Интеграция Databricks для ML на больших данных

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Интеграция Databricks для ML на больших данных
Средний
~1-2 недели
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Интеграция Databricks для ML на больших данных

Databricks — managed Spark с добавленным слоем: Unity Catalog для governance, MLflow как встроенный трекер экспериментов, Feature Store для управления признаками и AutoML для быстрого прототипирования. Разница с ванильным Spark: меньше инфраструктурного overhead, больше ML-tooling из коробки.

Настройка рабочего пространства

# Databricks SDK — управление кластерами и заданиями
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterSpec, AutoScale

w = WorkspaceClient(
    host="https://your-workspace.azuredatabricks.net",
    token="dapi..."
)

# Создание ML-кластера с GPU
cluster = w.clusters.create(
    cluster_name="ml-training-cluster",
    spark_version="14.3.x-ml-gpu-scala2.12",
    node_type_id="Standard_NC6s_v3",  # 1x V100 16GB
    autoscale=AutoScale(min_workers=2, max_workers=8),
    spark_conf={
        "spark.databricks.delta.preview.enabled": "true",
        "spark.sql.adaptive.enabled": "true",
    },
    custom_tags={"team": "ml", "env": "production"},
    data_security_mode="SINGLE_USER"
)

Delta Lake + Feature Store

from databricks.feature_store import FeatureStoreClient
from databricks.feature_store.entities.feature_lookup import FeatureLookup
import pyspark.sql.functions as F

fs = FeatureStoreClient()

# Создание feature table в Delta Lake
def compute_user_features(df):
    return df.groupBy("user_id").agg(
        F.count("transaction_id").alias("tx_count_30d"),
        F.sum("amount").alias("tx_amount_30d"),
        F.avg("amount").alias("tx_avg_amount"),
        F.stddev("amount").alias("tx_std_amount"),
        F.countDistinct("merchant_category").alias("unique_categories"),
        F.max("timestamp").alias("last_transaction_ts")
    )

user_features_df = compute_user_features(
    spark.table("transactions").filter("date >= current_date() - 30")
)

# Регистрация в Feature Store
fs.create_table(
    name="ml_catalog.features.user_transaction_features",
    primary_keys=["user_id"],
    df=user_features_df,
    description="User transaction features, 30-day rolling window"
)

# Обновление (инкрементальное через Delta Merge)
fs.write_table(
    name="ml_catalog.features.user_transaction_features",
    df=user_features_df,
    mode="merge"
)

AutoML для быстрого прототипирования

from databricks import automl
from datetime import datetime

# Автоматический поиск лучшей модели
summary = automl.classify(
    dataset=spark.table("ml_catalog.training.fraud_labels"),
    target_col="is_fraud",
    data_dir="dbfs:/automl/fraud_detection",
    timeout_minutes=60,
    experiment_dir="/Users/mlteam/experiments",
    primary_metric="f1"
)

print(f"Best model: {summary.best_trial.model_description}")
print(f"Best F1: {summary.best_trial.evaluation_metric_score:.4f}")

# Ноутбук с лучшей моделью
print(f"Explore: {summary.best_trial.notebook_url}")

MLflow эксперименты и регистр моделей

import mlflow
import mlflow.pyfunc
from mlflow.models.signature import infer_signature

mlflow.set_registry_uri("databricks")
mlflow.set_experiment("/ML/fraud_detection")

with mlflow.start_run(run_name=f"gbm_{datetime.now():%Y%m%d_%H%M}") as run:
    # Feature Lookups — получение признаков из Feature Store
    feature_lookups = [
        FeatureLookup(
            table_name="ml_catalog.features.user_transaction_features",
            feature_names=["tx_count_30d", "tx_amount_30d", "tx_avg_amount"],
            lookup_key="user_id"
        ),
        FeatureLookup(
            table_name="ml_catalog.features.merchant_features",
            feature_names=["merchant_risk_score", "merchant_age_days"],
            lookup_key="merchant_id"
        )
    ]

    training_set = fs.create_training_set(
        df=spark.table("ml_catalog.training.fraud_labels"),
        feature_lookups=feature_lookups,
        label="is_fraud",
        exclude_columns=["timestamp"]
    )

    training_df = training_set.load_df().toPandas()

    from lightgbm import LGBMClassifier
    from sklearn.model_selection import cross_val_score

    model = LGBMClassifier(n_estimators=300, learning_rate=0.05, random_state=42)
    cv_auc = cross_val_score(model, training_df.drop("is_fraud", axis=1),
                              training_df["is_fraud"], cv=5, scoring="roc_auc")

    mlflow.log_params(model.get_params())
    mlflow.log_metric("cv_auc_mean", cv_auc.mean())
    mlflow.log_metric("cv_auc_std", cv_auc.std())

    model.fit(training_df.drop("is_fraud", axis=1), training_df["is_fraud"])

    # Логирование вместе с Feature Store для автоматического lookup
    fs.log_model(
        model=model,
        artifact_path="model",
        flavor=mlflow.lightgbm,
        training_set=training_set,
        registered_model_name="fraud_detection_model"
    )

print(f"Run ID: {run.info.run_id}")

Инференс через Model Serving

# Включение Model Serving через API
import requests

def deploy_model(model_name: str, model_version: int, workspace_url: str, token: str):
    """Деплой модели в Databricks Model Serving"""
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    # Создание endpoint
    endpoint_config = {
        "name": f"{model_name}_endpoint",
        "config": {
            "served_entities": [{
                "name": "primary",
                "entity_name": model_name,
                "entity_version": str(model_version),
                "workload_size": "Small",  # Small / Medium / Large
                "scale_to_zero_enabled": True
            }],
            "traffic_config": {
                "routes": [{"served_model_name": "primary", "traffic_percentage": 100}]
            }
        }
    }

    response = requests.post(
        f"{workspace_url}/api/2.0/serving-endpoints",
        headers=headers,
        json=endpoint_config
    )
    return response.json()

def batch_inference_job(model_name: str, input_table: str, output_table: str):
    """Запуск batch inference через Databricks Job"""
    # Scoring batch через Feature Store
    predictions = fs.score_batch(
        f"models:/{model_name}/Production",
        spark.table(input_table)
    )
    predictions.write.mode("overwrite").saveAsTable(output_table)

Сравнение с self-managed Spark

Аспект Databricks Self-managed Spark
Setup time 30 минут 1-2 недели
Cluster autoscaling Авто Ручная конфигурация
MLflow Встроен Отдельная установка
Delta Lake Нативно Отдельная конфигурация
Feature Store Встроен Feast / Tecton
Стоимость +20-30% к EC2 EC2 стоимость
GPU поддержка Нативно NVIDIA plugin

Оптимальный выбор Databricks: команды > 5 ML-инженеров, > 3 активных проектов, облачный деплой (AWS/Azure/GCP). ROI: экономия 2-4 месяцев разработки инфраструктуры на старте.