Интеграция Apache Spark MLlib для больших данных
Spark MLlib используется когда данные не помещаются в RAM одной машины, а значит pandas и sklearn не работают. Типичный порог: датасеты > 100GB или > 100M строк. MLlib предоставляет те же алгоритмы (логистическая регрессия, градиентный бустинг, k-means), но распределённо на кластере.
Настройка Spark ML пайплайна
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (VectorAssembler, StringIndexer,
StandardScaler, Imputer)
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
spark = SparkSession.builder \
.appName("ML Pipeline") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.config("spark.executor.instances", "10") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.ml.param.maxParallelism", "4") \
.getOrCreate()
# Загрузка данных
df = spark.read.parquet("s3://data/training/*.parquet")
df = df.repartition(200) # Оптимальное число партиций
# Feature engineering
numeric_cols = ['amount', 'age', 'days_since_last_tx', 'tx_count_30d']
categorical_cols = ['category', 'country', 'device_type']
# Imputer для числовых
imputer = Imputer(
inputCols=numeric_cols,
outputCols=[f"{c}_imputed" for c in numeric_cols],
strategy="median"
)
# Кодирование категориальных
indexers = [
StringIndexer(inputCol=col, outputCol=f"{col}_idx",
handleInvalid="keep")
for col in categorical_cols
]
# Сборка вектора признаков
all_feature_cols = (
[f"{c}_imputed" for c in numeric_cols] +
[f"{c}_idx" for c in categorical_cols]
)
assembler = VectorAssembler(
inputCols=all_feature_cols,
outputCol="features_raw",
handleInvalid="keep"
)
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features",
withMean=True,
withStd=True
)
# Модель
gbt = GBTClassifier(
labelCol="label",
featuresCol="features",
maxIter=100,
maxDepth=5,
stepSize=0.05,
subsamplingRate=0.8,
seed=42
)
# Pipeline
pipeline = Pipeline(stages=[
imputer,
*indexers,
assembler,
scaler,
gbt
])
# Train/test split
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Обучение
model = pipeline.fit(train_df)
predictions = model.transform(test_df)
# Оценка
evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.4f}")
Гиперпараметрическая оптимизация
# Cross-validation на кластере
param_grid = ParamGridBuilder() \
.addGrid(gbt.maxDepth, [4, 6, 8]) \
.addGrid(gbt.maxIter, [50, 100]) \
.addGrid(gbt.stepSize, [0.05, 0.1]) \
.build()
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3,
parallelism=4, # Параллельный запуск фолдов
seed=42
)
cv_model = cv.fit(train_df)
best_model = cv_model.bestModel
print(f"Best params: {cv_model.bestModel.stages[-1].extractParamMap()}")
Feature Importance и интерпретация
# Извлечение feature importance
gbt_model = best_model.stages[-1]
importance = gbt_model.featureImportances
# Маппинг на имена признаков
feature_names = all_feature_cols
importance_df = spark.createDataFrame(
[(name, float(imp)) for name, imp in zip(feature_names, importance.toArray())],
["feature", "importance"]
).orderBy("importance", ascending=False)
importance_df.show(20)
# SHAP через pandas на выборке
sample_pandas = predictions.sample(fraction=0.01).toPandas()
# ... далее стандартный TreeExplainer
Сохранение и деплой модели
import mlflow
import mlflow.spark
# Логирование в MLflow
with mlflow.start_run():
mlflow.log_param("max_depth", gbt.getMaxDepth())
mlflow.log_param("max_iter", gbt.getMaxIter())
mlflow.log_metric("auc", auc)
# Сохранение Spark модели
mlflow.spark.log_model(best_model, "spark_model")
# Экспорт в ONNX для быстрого инференса
from onnxmltools import convert_sparkml
onnx_model = convert_sparkml(best_model, "GBT Model", test_df.limit(5))
mlflow.onnx.log_model(onnx_model, "onnx_model")
# Загрузка для предсказаний
loaded_model = mlflow.spark.load_model("runs:/RUN_ID/spark_model")
batch_predictions = loaded_model.transform(new_data_df)
Оптимизация производительности
| Параметр | Default | Рекомендованное | Эффект |
|---|---|---|---|
| spark.sql.shuffle.partitions | 200 | 2x cores | Избежать skew |
| executor.memory | 1g | 4-8g | Кэш датасета |
| spark.ml.param.maxParallelism | 1 | 4-8 | CV параллелизм |
| repartition перед fit | нет | 200-400 | Равномерная нагрузка |
| caching train_df | нет | да | 3-5x ускорение CV |
Обучение GBT на 500M строк в типичной конфигурации (10 executor × 4 CPU × 8GB): 15-40 минут в зависимости от числа признаков и итераций. Cross-validation с 6 конфигурациями × 3 фолда: 4-6 часов без параллелизма, 1-2 часа с parallelism=4.







