Интеграция Databricks для ML на больших данных
Databricks — managed Spark с добавленным слоем: Unity Catalog для governance, MLflow как встроенный трекер экспериментов, Feature Store для управления признаками и AutoML для быстрого прототипирования. Разница с ванильным Spark: меньше инфраструктурного overhead, больше ML-tooling из коробки.
Настройка рабочего пространства
# Databricks SDK: programmatic management of clusters and jobs.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import AutoScale, ClusterSpec, DataSecurityMode

# Credentials are resolved by the SDK's unified authentication
# (DATABRICKS_HOST / DATABRICKS_TOKEN environment variables or a
# ~/.databrickscfg profile). Never hard-code a personal access token in code.
w = WorkspaceClient()

# Create a GPU ML cluster.
cluster = w.clusters.create(
    cluster_name="ml-training-cluster",
    # Runtime keys have the form "<version>-gpu-ml-scala<ver>":
    # Databricks Runtime 14.3 LTS ML with GPU support.
    spark_version="14.3.x-gpu-ml-scala2.12",
    node_type_id="Standard_NC6s_v3",  # 1x V100 16GB
    autoscale=AutoScale(min_workers=2, max_workers=8),
    spark_conf={
        "spark.databricks.delta.preview.enabled": "true",
        "spark.sql.adaptive.enabled": "true",
    },
    custom_tags={"team": "ml", "env": "production"},
    # The SDK expects the enum member, not a plain string.
    data_security_mode=DataSecurityMode.SINGLE_USER,
    # SINGLE_USER access mode needs the user allowed to attach to the cluster;
    # here that is the identity the client is authenticated as.
    single_user_name=w.current_user.me().user_name,
)
# `create` returns a waiter; call `cluster.result()` to block until the
# cluster is running.
Delta Lake + Feature Store
from databricks.feature_store import FeatureStoreClient
from databricks.feature_store.entities.feature_lookup import FeatureLookup
import pyspark.sql.functions as F
# Shared Feature Store client; the snippets below call it for table writes,
# training-set construction, model logging and batch scoring.
fs = FeatureStoreClient()


# Build the per-user feature table that is stored in Delta Lake.
def compute_user_features(df):
    """Aggregate transaction rows into one feature row per ``user_id``.

    The ``_30d`` suffixes assume the caller already restricted ``df`` to the
    last 30 days; this function applies no date filter of its own.

    Args:
        df: Spark DataFrame with ``user_id``, ``transaction_id``, ``amount``,
            ``merchant_category`` and ``timestamp`` columns.

    Returns:
        Spark DataFrame with one row per ``user_id``. Columns: transaction
        count, sum / mean / standard deviation of ``amount``, number of
        distinct merchant categories, and the latest ``timestamp``.
    """
    per_user = df.groupBy("user_id")
    aggregations = [
        F.count("transaction_id").alias("tx_count_30d"),
        F.sum("amount").alias("tx_amount_30d"),
        F.avg("amount").alias("tx_avg_amount"),
        # Sample standard deviation: null for users with a single transaction.
        F.stddev("amount").alias("tx_std_amount"),
        F.countDistinct("merchant_category").alias("unique_categories"),
        F.max("timestamp").alias("last_transaction_ts"),
    ]
    return per_user.agg(*aggregations)
# Snapshot of the last 30 days of transactions, relative to the run date.
user_features_df = compute_user_features(
    spark.table("transactions").filter(
        F.col("date") >= F.date_sub(F.current_date(), 30)
    )
)

user_features_table = "ml_catalog.features.user_transaction_features"

if not spark.catalog.tableExists(user_features_table):
    # First run: register the table in Feature Store. Passing `df` also writes
    # the initial contents, so no separate write is needed.
    fs.create_table(
        name=user_features_table,
        primary_keys=["user_id"],
        df=user_features_df,
        description="User transaction features, 30-day rolling window",
    )
else:
    # Subsequent runs: incremental upsert by primary key (Delta MERGE).
    # NOTE: merge only inserts/updates users present in `user_features_df`;
    # users with no transactions in the last 30 days keep their previous
    # (stale) feature values. Use mode="overwrite" if that is not acceptable.
    fs.write_table(
        name=user_features_table,
        df=user_features_df,
        mode="merge",
    )
AutoML для быстрого прототипирования
from databricks import automl
from datetime import datetime

# Automated search for the best model.
# `fraud_labels` is the same table that the MLflow section joins with Feature
# Store lookups, so on its own it presumably holds only keys, `timestamp` and
# the label. The features are therefore pulled in via `feature_store_lookups`;
# otherwise AutoML would train on IDs alone.
summary = automl.classify(
    dataset=spark.table("ml_catalog.training.fraud_labels"),
    target_col="is_fraud",
    data_dir="dbfs:/automl/fraud_detection",
    timeout_minutes=60,
    experiment_dir="/Users/mlteam/experiments",
    primary_metric="f1",
    feature_store_lookups=[
        {
            "table_name": "ml_catalog.features.user_transaction_features",
            "lookup_key": "user_id",
        },
        {
            "table_name": "ml_catalog.features.merchant_features",
            "lookup_key": "merchant_id",
        },
    ],
    # Keep AutoML's inputs consistent with the hand-built model below.
    exclude_cols=["timestamp"],
)
print(f"Best model: {summary.best_trial.model_description}")
print(f"Best F1: {summary.best_trial.evaluation_metric_score:.4f}")
# Generated notebook that reproduces the best trial
print(f"Explore: {summary.best_trial.notebook_url}")
MLflow эксперименты и регистр моделей
import mlflow
import mlflow.lightgbm
import mlflow.pyfunc
from mlflow.models.signature import infer_signature
from datetime import datetime

from lightgbm import LGBMClassifier
from sklearn.model_selection import cross_val_score

mlflow.set_registry_uri("databricks")
mlflow.set_experiment("/ML/fraud_detection")

with mlflow.start_run(run_name=f"gbm_{datetime.now():%Y%m%d_%H%M}") as run:
    # Feature lookups: join Feature Store features onto the label table by key.
    feature_lookups = [
        FeatureLookup(
            table_name="ml_catalog.features.user_transaction_features",
            feature_names=["tx_count_30d", "tx_amount_30d", "tx_avg_amount"],
            lookup_key="user_id",
        ),
        FeatureLookup(
            table_name="ml_catalog.features.merchant_features",
            feature_names=["merchant_risk_score", "merchant_age_days"],
            lookup_key="merchant_id",
        ),
    ]
    training_set = fs.create_training_set(
        df=spark.table("ml_catalog.training.fraud_labels"),
        feature_lookups=feature_lookups,
        label="is_fraud",
        # Lookup keys are join keys, not features: dropping them keeps raw IDs
        # out of the model. They must still be present in DataFrames passed
        # to score_batch so the features can be looked up.
        exclude_columns=["timestamp", "user_id", "merchant_id"],
    )
    training_df = training_set.load_df().toPandas()
    # NOTE(review): any other non-numeric columns of fraud_labels would also
    # reach LightGBM here; add them to exclude_columns if present.
    X = training_df.drop(columns="is_fraud")
    y = training_df["is_fraud"]

    model = LGBMClassifier(n_estimators=300, learning_rate=0.05, random_state=42)
    # With a classifier and an integer cv, scikit-learn uses stratified folds.
    cv_auc = cross_val_score(model, X, y, cv=5, scoring="roc_auc")

    mlflow.log_params(model.get_params())
    mlflow.log_metric("cv_auc_mean", cv_auc.mean())
    mlflow.log_metric("cv_auc_std", cv_auc.std())

    model.fit(X, y)

    # Log through Feature Store so the packaged model records its feature
    # lookups and can fetch features automatically at scoring time.
    fs.log_model(
        model=model,
        artifact_path="model",
        flavor=mlflow.lightgbm,
        training_set=training_set,
        registered_model_name="fraud_detection_model",
    )
    print(f"Run ID: {run.info.run_id}")
Инференс через Model Serving
# Включение Model Serving через API
import requests
def deploy_model(
    model_name: str,
    model_version: int,
    workspace_url: str,
    token: str,
    *,
    timeout: float = 30.0,
) -> dict:
    """Create a Databricks Model Serving endpoint for one registered model version.

    The endpoint is named ``<model_name>_endpoint``. Any dots (as in Unity
    Catalog ``catalog.schema.model`` names) are replaced with underscores,
    because endpoint names only allow letters, digits, dashes and underscores.
    All traffic is routed to a single served entity named ``"primary"`` with
    the "Small" workload size and scale-to-zero enabled.

    Args:
        model_name: Registered model name.
        model_version: Model version to serve.
        workspace_url: Workspace base URL, e.g.
            ``https://<workspace>.azuredatabricks.net``; a trailing slash is
            tolerated.
        token: Databricks access token, sent as a Bearer token.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The decoded JSON body of a successful create-endpoint response.

    Raises:
        requests.HTTPError: The API returned a non-2xx status (for example,
            the endpoint already exists or the token lacks permission).
        requests.Timeout: No response arrived within ``timeout`` seconds.
    """
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    endpoint_name = f"{model_name.replace('.', '_')}_endpoint"

    endpoint_config = {
        "name": endpoint_name,
        "config": {
            "served_entities": [{
                "name": "primary",
                "entity_name": model_name,
                "entity_version": str(model_version),
                "workload_size": "Small",  # Small / Medium / Large
                "scale_to_zero_enabled": True,
            }],
            "traffic_config": {
                # Routes refer to served entities by their "name" above.
                "routes": [{"served_model_name": "primary", "traffic_percentage": 100}]
            },
        },
    }

    response = requests.post(
        f"{workspace_url.rstrip('/')}/api/2.0/serving-endpoints",
        headers=headers,
        json=endpoint_config,
        # Without a timeout, requests can block forever on a stalled connection.
        timeout=timeout,
    )
    # Surface API errors instead of returning the error body as if it succeeded.
    response.raise_for_status()
    return response.json()
def batch_inference_job(
    model_name: str,
    input_table: str,
    output_table: str,
    stage_or_version: str = "Production",
):
    """Score a table with a Feature Store-logged model and overwrite the output table.

    Meant to be the body of a Databricks Job task; it does not create or
    launch a job itself. It uses the module-level ``fs`` Feature Store client
    and the ambient ``spark`` session (a Databricks notebook/job global).

    Args:
        model_name: Registered model name.
        input_table: Table to score. It must contain the model's lookup-key
            columns so that ``score_batch`` can fetch the logged features.
        output_table: Destination table; it is overwritten on every run.
        stage_or_version: Registry stage (e.g. ``"Production"``,
            ``"Staging"``) or a version number such as ``"3"``. The default
            keeps the previous behavior. Note that a freshly registered model
            has no stage until it is transitioned, so pass a version in that
            case.
    """
    model_uri = f"models:/{model_name}/{stage_or_version}"
    predictions = fs.score_batch(model_uri, spark.table(input_table))
    predictions.write.mode("overwrite").saveAsTable(output_table)
Сравнение с self-managed Spark
| Аспект | Databricks | Self-managed Spark |
|---|---|---|
| Setup time | 30 минут | 1-2 недели |
| Cluster autoscaling | Авто | Ручная конфигурация |
| MLflow | Встроен | Отдельная установка |
| Delta Lake | Нативно | Отдельная конфигурация |
| Feature Store | Встроен | Feast / Tecton |
| Стоимость | VM облака + DBU (ориентировочно +20–30%) | Только стоимость VM (EC2 / Azure VM / GCE) |
| GPU поддержка | Нативно | NVIDIA plugin |
Databricks оптимален, когда в команде больше 5 ML-инженеров, одновременно ведётся больше 3 проектов и деплой идёт в облако (AWS/Azure/GCP). ROI: экономия 2–4 месяцев разработки инфраструктуры на старте.







