Разработка AI-системы автоскейлинга на основе нагрузки
Predictive autoscaling для AI-сервисов — масштабирование ресурсов на основе прогноза нагрузки, а не реактивного ответа на текущие метрики. Решает основную проблему реактивного скейлинга: к моменту добавления ресурсов нагрузка уже привела к деградации.
Проблемы реактивного autoscaling для LLM
- Cold start: запуск нового GPU pod с загрузкой модели занимает 3–10 минут
- Latency cliff: при перегрузке очередь растёт экспоненциально, качество деградирует резко
- Cost spike: реактивный скейл часто приводит к overprovision — ресурсы выделяются, но нагрузка уже спала
Predictive scaling решает это: видим приближающийся пик за 15–30 минут → запускаем ресурсы заранее.
Прогнозирование нагрузки
from prophet import Prophet
import pandas as pd
import numpy as np
class LoadForecaster:
def __init__(self):
self.model = None
self.last_trained = None
def train(self, historical_load: pd.DataFrame):
"""
historical_load: DataFrame с колонками 'ds' (datetime) и 'y' (requests_per_minute)
"""
self.model = Prophet(
seasonality_mode="multiplicative",
weekly_seasonality=True,
daily_seasonality=True,
changepoint_prior_scale=0.05 # сглаживание резких изменений
)
# Добавляем кастомные события (праздники, планируемые маркетинг-кампании)
self.model.add_country_holidays(country_name="RU")
self.model.fit(historical_load)
self.last_trained = datetime.utcnow()
def forecast(self, horizon_minutes: int = 60) -> pd.DataFrame:
"""Прогноз нагрузки на horizon_minutes вперёд."""
future = self.model.make_future_dataframe(
periods=horizon_minutes, freq="T" # поминутно
)
forecast = self.model.predict(future)
return forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail(horizon_minutes)
def get_required_replicas(self, forecast: pd.DataFrame, capacity_per_replica: float) -> int:
peak_load = forecast["yhat_upper"].max() # берём верхнюю границу (conservative)
return max(1, math.ceil(peak_load / capacity_per_replica))
Принятие решений о скейлинге
class PredictiveScalingController:
def __init__(
self,
forecaster: LoadForecaster,
lead_time_minutes: int = 15, # заранее до ожидаемого пика
scale_up_buffer: float = 1.2, # +20% запас
scale_down_delay_minutes: int = 30
):
self.forecaster = forecaster
self.lead_time = lead_time_minutes
self.buffer = scale_up_buffer
self.scale_down_delay = scale_down_delay_minutes
def get_scaling_decision(
self,
current_replicas: int,
current_load: float
) -> ScalingDecision:
# Прогноз на следующие 30 минут
forecast = self.forecaster.forecast(horizon_minutes=30)
peak_in_lead_time = forecast.head(self.lead_time)["yhat_upper"].max()
required = math.ceil(peak_in_lead_time * self.buffer / CAPACITY_PER_REPLICA)
# Решение
if required > current_replicas:
return ScalingDecision(
action="scale_up",
target_replicas=required,
reason=f"Predictive: peak {peak_in_lead_time:.0f} req/min in {self.lead_time}min"
)
elif required < current_replicas - 1:
# Scale down только если нагрузка снижается стабильно
recent_trend = self._is_load_decreasing(minutes=self.scale_down_delay)
if recent_trend:
return ScalingDecision(
action="scale_down",
target_replicas=max(1, required),
reason="Load decreasing trend confirmed"
)
return ScalingDecision(action="no_change", target_replicas=current_replicas)
Интеграция с Kubernetes
from kubernetes import client, config
class K8sScaler:
def __init__(self):
config.load_incluster_config()
self.apps_v1 = client.AppsV1Api()
def scale(self, namespace: str, deployment: str, replicas: int):
body = {"spec": {"replicas": replicas}}
self.apps_v1.patch_namespaced_deployment_scale(
name=deployment,
namespace=namespace,
body=body
)
logger.info(f"Scaled {namespace}/{deployment} to {replicas} replicas")
def get_current_replicas(self, namespace: str, deployment: str) -> int:
deployment_obj = self.apps_v1.read_namespaced_deployment(deployment, namespace)
return deployment_obj.spec.replicas
Обучение на исторических данных
class ContinuousLearner:
def update_model(self):
"""Переобучаем модель на новых данных каждые 24 часа."""
historical = self.metrics_db.get_load_history(days=90)
df = pd.DataFrame(historical, columns=["ds", "y"])
self.forecaster.train(df)
logger.info(f"Model retrained on {len(df)} data points")
# Оценка точности прогноза
accuracy = self.evaluate_forecast_accuracy()
if accuracy.mape > 0.20: # > 20% ошибка → алерт
logger.warning(f"Forecast accuracy degraded: MAPE={accuracy.mape:.1%}")
Сроки внедрения
Неделя 1–2: Сбор исторических метрик нагрузки, первая Prophet модель, backtesting
Неделя 3–4: Интеграция с K8s Deployment, shadow mode (прогнозируем но не скейлим)
Месяц 2: Перевод в production режим, мониторинг cost savings, continuous learning
Месяц 3: Тюнинг параметров, multi-service координация, circuit breakers для аномальных прогнозов







