AI-система programmatic-рекламы
Programmatic — автоматическая покупка рекламных показов через аукционы в реальном времени. Каждый показ баннера — это аукцион, занимающий 100 миллисекунд от запроса до победы и отображения. AI решает: участвовать ли в этом аукционе, и если да — по какой цене.
Архитектура programmatic экосистемы
Publisher → SSP → Ad Exchange → DSP → Advertiser
↕ RTB (100ms)
Bid Request/Response
Полный стек: Supply Side Platform (SSP) управляет инвентарём издателя. Demand Side Platform (DSP) управляет закупкой для рекламодателя. Ad Exchange — биржа, соединяющая стороны.
Модель предсказания ставки (Bid Prediction)
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
from sklearn.calibration import CalibratedClassifierCV
import lightgbm as lgb
import json
class BidRequestFeaturizer:
"""Извлечение признаков из bid request за < 5ms"""
def featurize(self, bid_request: dict) -> np.ndarray:
"""
bid_request: стандартный OpenRTB 2.5 объект
Возвращает признаковый вектор для модели за < 1ms
"""
return np.array([
# Пользователь
self._hash_encode(bid_request.get('user', {}).get('id', ''), 100),
bid_request.get('user', {}).get('yob', 1990),
int(bid_request.get('user', {}).get('gender') == 'M'),
len(bid_request.get('user', {}).get('segments', [])),
# Устройство
self._device_type_encode(bid_request.get('device', {}).get('devicetype')),
int(bid_request.get('device', {}).get('os', '') in ['iOS', 'Android']),
self._hash_encode(bid_request.get('device', {}).get('model', ''), 50),
# Контекст показа
bid_request.get('imp', [{}])[0].get('banner', {}).get('w', 300),
bid_request.get('imp', [{}])[0].get('banner', {}).get('h', 250),
int(bid_request.get('imp', [{}])[0].get('instl') == 1), # Interstitial
# Площадка
self._hash_encode(bid_request.get('site', {}).get('domain', ''), 200),
self._hash_encode(bid_request.get('site', {}).get('cat', ['IAB1'])[0], 20),
# Временной контекст
pd.Timestamp.now().hour,
pd.Timestamp.now().weekday(),
int(pd.Timestamp.now().weekday() >= 5),
# Floor price
bid_request.get('imp', [{}])[0].get('bidfloor', 0),
], dtype=np.float32)
def _hash_encode(self, value: str, n_buckets: int) -> int:
return hash(value) % n_buckets
def _device_type_encode(self, device_type) -> int:
mapping = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5}
return mapping.get(device_type, 0)
class CTRPredictor:
"""
Предсказание CTR (Click-Through Rate) для bid.
LightGBM обычно лучше нейросетей для tabular bid data.
"""
def __init__(self):
self.model = lgb.LGBMClassifier(
n_estimators=500,
learning_rate=0.05,
num_leaves=127,
min_child_samples=50,
subsample=0.8,
colsample_bytree=0.8,
random_state=42,
n_jobs=-1
)
def train(self, X: np.ndarray, y: np.ndarray,
X_val: np.ndarray, y_val: np.ndarray):
"""Обучение с ранней остановкой"""
self.model.fit(
X, y,
eval_set=[(X_val, y_val)],
eval_metric='auc',
callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)]
)
def predict_ctr(self, X: np.ndarray) -> np.ndarray:
return self.model.predict_proba(X)[:, 1]
class ConversionRatePredictor:
"""CVR: вероятность конверсии при клике"""
def __init__(self):
# CVR обычно более разреженные данные → меньше деревьев
self.model = lgb.LGBMClassifier(
n_estimators=200,
learning_rate=0.05,
num_leaves=63,
min_child_samples=100,
random_state=42
)
def predict_cvr(self, X: np.ndarray) -> np.ndarray:
return self.model.predict_proba(X)[:, 1]
class BiddingEngine:
"""Движок принятия решений о ставках"""
def __init__(self, ctr_model: CTRPredictor,
cvr_model: ConversionRatePredictor,
featurizer: BidRequestFeaturizer):
self.ctr_model = ctr_model
self.cvr_model = cvr_model
self.featurizer = featurizer
def compute_bid(self, bid_request: dict,
campaign_config: dict) -> dict:
"""
Вычисление оптимальной ставки.
За < 10ms (latency constraint RTB).
"""
features = self.featurizer.featurize(bid_request)
# Предсказание CTR и CVR
ctr = float(self.ctr_model.predict_ctr(features.reshape(1, -1))[0])
cvr = float(self.cvr_model.predict_cvr(features.reshape(1, -1))[0])
# pCTCVR = P(click) × P(conversion|click)
pctcvr = ctr * cvr
# Ожидаемая ценность = pCTCVR × ценность конверсии
target_cpa = campaign_config.get('target_cpa_usd', 10)
expected_value = pctcvr * target_cpa
# Добавляем budget pacing adjustment
pacing_factor = self._compute_pacing_factor(campaign_config)
bid_price = expected_value * pacing_factor
# Ограничения
floor_price = bid_request.get('imp', [{}])[0].get('bidfloor', 0)
max_bid = campaign_config.get('max_bid_cpm', 10)
if bid_price < floor_price:
return {'bid': 0, 'reason': 'below_floor', 'predicted_ctr': ctr}
final_bid = min(bid_price, max_bid)
return {
'bid': round(final_bid, 4),
'predicted_ctr': round(ctr, 5),
'predicted_cvr': round(cvr, 5),
'predicted_pctcvr': round(pctcvr, 6),
'pacing_factor': round(pacing_factor, 3),
'auction_win_probability': self._estimate_win_prob(final_bid, floor_price)
}
def _compute_pacing_factor(self, campaign: dict) -> float:
"""
Budget pacing: корректируем ставки, чтобы равномерно расходовать бюджет.
Если тратим слишком быстро → снижаем, слишком медленно → повышаем.
"""
budget_total = campaign.get('daily_budget_usd', 1000)
spent_today = campaign.get('spent_today_usd', 0)
hours_elapsed = campaign.get('hours_elapsed_today', 12)
total_hours = 24
expected_spent_ratio = hours_elapsed / total_hours
actual_spent_ratio = spent_today / max(budget_total, 1)
if actual_spent_ratio > expected_spent_ratio * 1.1:
return 0.8 # Тратим слишком быстро — снижаем ставки
elif actual_spent_ratio < expected_spent_ratio * 0.9:
return 1.2 # Тратим слишком медленно — повышаем
return 1.0
def _estimate_win_prob(self, bid: float, floor: float) -> float:
"""Упрощённая оценка вероятности победы в аукционе"""
if bid < floor:
return 0.0
margin = (bid - floor) / max(floor, 0.01)
return min(0.95, 0.3 + margin * 0.5)
class BudgetPacingController:
"""Управление равномерностью расходования бюджета"""
def throttle_bid_rate(self, campaign_stats: dict,
current_qps: float) -> float:
"""
Throttling: сколько % bid requests обрабатываем.
Если тратим слишком быстро — часть запросов игнорируем.
"""
budget = campaign_stats.get('daily_budget', 1000)
spent = campaign_stats.get('spent', 0)
hours = campaign_stats.get('hours_elapsed', 12)
target_spend_rate = budget / 24 # Равномерный расход
actual_spend_rate = spent / max(hours, 0.1)
if actual_spend_rate > target_spend_rate * 1.2:
throttle = target_spend_rate / actual_spend_rate
return float(np.clip(throttle, 0.1, 1.0))
return 1.0
def compute_optimal_frequency_cap(self, user_stats: dict,
campaign_config: dict) -> dict:
"""Ограничение частоты показов одному пользователю"""
base_cap = campaign_config.get('frequency_cap', {'hour': 2, 'day': 5, 'week': 15})
# Если пользователь уже кликал → снижаем частоту (не надо давить)
if user_stats.get('has_clicked'):
return {'hour': 1, 'day': 2, 'week': 5}
# Если пользователь видел много показов без клика → снижаем
impressions_without_click = user_stats.get('impressions_no_click', 0)
if impressions_without_click > 20:
return {'hour': 0, 'day': 1, 'week': 3}
return base_cap
Аукционная механика и оптимизация
class AuctionOptimizer:
"""Оптимизация стратегии в аукционе первой и второй цены"""
def optimal_bid_second_price(self, valuation: float,
bid_landscape: np.ndarray) -> float:
"""
В аукционе второй цены (Vickrey) оптимально ставить свою истинную ценность.
Bid shading для first-price auctions: bid < valuation.
"""
return valuation # Second price: truthful bidding is optimal
def bid_shading_first_price(self, valuation: float,
historical_clearing_prices: np.ndarray) -> float:
"""
Bid shading для аукциона первой цены.
Оптимальная ставка < valuation, основанная на распределении побеждающих ставок.
"""
if len(historical_clearing_prices) == 0:
return valuation * 0.8 # Консервативное значение
# Оцениваем вероятность победы при разных ставках
best_bid = valuation * 0.5
best_profit = -float('inf')
for bid_pct in np.arange(0.5, 1.0, 0.05):
bid = valuation * bid_pct
win_prob = (historical_clearing_prices < bid).mean()
expected_profit = win_prob * (valuation - bid)
if expected_profit > best_profit:
best_profit = expected_profit
best_bid = bid
return round(best_bid, 4)
def evaluate_campaign_performance(self, impressions: pd.DataFrame) -> dict:
"""Сводные метрики кампании"""
return {
'impressions': len(impressions),
'clicks': impressions['clicked'].sum(),
'conversions': impressions['converted'].sum(),
'spend_usd': impressions['bid_price'].sum(),
'ctr': impressions['clicked'].mean(),
'cvr': impressions['converted'].sum() / max(impressions['clicked'].sum(), 1),
'cpa_usd': impressions['bid_price'].sum() / max(impressions['converted'].sum(), 1),
'roas': impressions.get('revenue', pd.Series([0])).sum() / max(impressions['bid_price'].sum(), 1),
'effective_cpm': impressions['bid_price'].mean() * 1000,
}
Latency требования и инфраструктура
RTB требует ответа за 100ms в большинстве бирж (Google, OpenX) и 50ms в Google Display Network. Это жёсткий SLA — любая задержка = потеря аукциона.
| Компонент | Latency budget |
|---|---|
| Сетевые задержки | ~20ms |
| Feature extraction | ~5ms |
| CTR/CVR предсказание | ~3ms |
| Bid price calculation | ~1ms |
| Ответ в биржу | ~1ms |
| Итого | ~30ms (запас) |
Для выдерживания latency: модели на ONNX Runtime (5-10x быстрее sklearn), feature serving через Redis (<1ms), горизонтальное масштабирование bid service на K8s с autoscaling по QPS.







