Разработка AI-системы автоматического масштабирования приложений на основе прогноза нагрузки

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
Разработка AI-системы автоматического масштабирования приложений на основе прогноза нагрузки
Сложная
~1-2 недели
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    854
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

Разработка AI-системы автоскейлинга на основе нагрузки

Predictive autoscaling для AI-сервисов — масштабирование ресурсов на основе прогноза нагрузки, а не реактивного ответа на текущие метрики. Решает основную проблему реактивного скейлинга: к моменту добавления ресурсов нагрузка уже привела к деградации.

Проблемы реактивного autoscaling для LLM

  • Cold start: запуск нового GPU pod с загрузкой модели занимает 3–10 минут
  • Latency cliff: при перегрузке очередь растёт экспоненциально, качество деградирует резко
  • Cost spike: реактивный скейл часто приводит к overprovision — ресурсы выделяются, но нагрузка уже спала

Predictive scaling решает это: видим приближающийся пик за 15–30 минут → запускаем ресурсы заранее.

Прогнозирование нагрузки

from prophet import Prophet
import pandas as pd
import numpy as np

class LoadForecaster:
    def __init__(self):
        self.model = None
        self.last_trained = None

    def train(self, historical_load: pd.DataFrame):
        """
        historical_load: DataFrame с колонками 'ds' (datetime) и 'y' (requests_per_minute)
        """
        self.model = Prophet(
            seasonality_mode="multiplicative",
            weekly_seasonality=True,
            daily_seasonality=True,
            changepoint_prior_scale=0.05  # сглаживание резких изменений
        )
        # Добавляем кастомные события (праздники, планируемые маркетинг-кампании)
        self.model.add_country_holidays(country_name="RU")
        self.model.fit(historical_load)
        self.last_trained = datetime.utcnow()

    def forecast(self, horizon_minutes: int = 60) -> pd.DataFrame:
        """Прогноз нагрузки на horizon_minutes вперёд."""
        future = self.model.make_future_dataframe(
            periods=horizon_minutes, freq="T"  # поминутно
        )
        forecast = self.model.predict(future)
        return forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail(horizon_minutes)

    def get_required_replicas(self, forecast: pd.DataFrame, capacity_per_replica: float) -> int:
        peak_load = forecast["yhat_upper"].max()  # берём верхнюю границу (conservative)
        return max(1, math.ceil(peak_load / capacity_per_replica))

Принятие решений о скейлинге

class PredictiveScalingController:
    def __init__(
        self,
        forecaster: LoadForecaster,
        lead_time_minutes: int = 15,   # заранее до ожидаемого пика
        scale_up_buffer: float = 1.2,  # +20% запас
        scale_down_delay_minutes: int = 30
    ):
        self.forecaster = forecaster
        self.lead_time = lead_time_minutes
        self.buffer = scale_up_buffer
        self.scale_down_delay = scale_down_delay_minutes

    def get_scaling_decision(
        self,
        current_replicas: int,
        current_load: float
    ) -> ScalingDecision:

        # Прогноз на следующие 30 минут
        forecast = self.forecaster.forecast(horizon_minutes=30)
        peak_in_lead_time = forecast.head(self.lead_time)["yhat_upper"].max()

        required = math.ceil(peak_in_lead_time * self.buffer / CAPACITY_PER_REPLICA)

        # Решение
        if required > current_replicas:
            return ScalingDecision(
                action="scale_up",
                target_replicas=required,
                reason=f"Predictive: peak {peak_in_lead_time:.0f} req/min in {self.lead_time}min"
            )
        elif required < current_replicas - 1:
            # Scale down только если нагрузка снижается стабильно
            recent_trend = self._is_load_decreasing(minutes=self.scale_down_delay)
            if recent_trend:
                return ScalingDecision(
                    action="scale_down",
                    target_replicas=max(1, required),
                    reason="Load decreasing trend confirmed"
                )

        return ScalingDecision(action="no_change", target_replicas=current_replicas)

Интеграция с Kubernetes

from kubernetes import client, config

class K8sScaler:
    def __init__(self):
        config.load_incluster_config()
        self.apps_v1 = client.AppsV1Api()

    def scale(self, namespace: str, deployment: str, replicas: int):
        body = {"spec": {"replicas": replicas}}
        self.apps_v1.patch_namespaced_deployment_scale(
            name=deployment,
            namespace=namespace,
            body=body
        )
        logger.info(f"Scaled {namespace}/{deployment} to {replicas} replicas")

    def get_current_replicas(self, namespace: str, deployment: str) -> int:
        deployment_obj = self.apps_v1.read_namespaced_deployment(deployment, namespace)
        return deployment_obj.spec.replicas

Обучение на исторических данных

class ContinuousLearner:
    def update_model(self):
        """Переобучаем модель на новых данных каждые 24 часа."""
        historical = self.metrics_db.get_load_history(days=90)
        df = pd.DataFrame(historical, columns=["ds", "y"])

        self.forecaster.train(df)
        logger.info(f"Model retrained on {len(df)} data points")

        # Оценка точности прогноза
        accuracy = self.evaluate_forecast_accuracy()
        if accuracy.mape > 0.20:  # > 20% ошибка → алерт
            logger.warning(f"Forecast accuracy degraded: MAPE={accuracy.mape:.1%}")

Сроки внедрения

Неделя 1–2: Сбор исторических метрик нагрузки, первая Prophet модель, backtesting

Неделя 3–4: Интеграция с K8s Deployment, shadow mode (прогнозируем но не скейлим)

Месяц 2: Перевод в production режим, мониторинг cost savings, continuous learning

Месяц 3: Тюнинг параметров, multi-service координация, circuit breakers для аномальных прогнозов