Обучение модели рекомендательной системы
Обучение рекомендательной модели — это не просто model.fit(). Нужно правильно сформировать обучающие данные (implicit feedback vs explicit ratings), выбрать отрицательные примеры, настроить функцию потерь и оценить на правильных метриках. Ошибки на каждом из этих этапов обнуляют точность даже хорошей архитектуры.
Формирование обучающей выборки
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from dataclasses import dataclass
@dataclass
class InteractionSample:
user_id: int
item_id: int
label: float # 0/1 или 0-5
weight: float # важность примера
class RecommendationDataset(Dataset):
"""Dataset с negative sampling для implicit feedback"""
def __init__(self, positive_interactions: pd.DataFrame,
all_item_ids: list,
n_negatives: int = 4,
negative_sampling: str = 'uniform'):
"""
positive_interactions: user_id, item_id, weight
n_negatives: отрицательных примеров на каждый позитивный
negative_sampling: 'uniform' | 'popularity' | 'hard'
"""
self.positives = positive_interactions
self.all_items = np.array(all_item_ids)
self.n_neg = n_negatives
self.sampling = negative_sampling
# Построение индекса для быстрой проверки
self.user_items = (
positive_interactions.groupby('user_id')['item_id']
.apply(set).to_dict()
)
# Для popularity-based sampling
if negative_sampling == 'popularity':
item_counts = positive_interactions['item_id'].value_counts()
total = item_counts.sum()
# Вероятность пропорциональна корню популярности
probs = np.sqrt(item_counts.reindex(all_item_ids, fill_value=1).values)
self.item_probs = probs / probs.sum()
def __len__(self):
return len(self.positives)
def __getitem__(self, idx):
row = self.positives.iloc[idx]
user_id = row['user_id']
pos_item = row['item_id']
weight = row.get('weight', 1.0)
# Positive sample
samples = [{'user': user_id, 'item': pos_item, 'label': 1.0, 'weight': weight}]
# Negative samples
user_known_items = self.user_items.get(user_id, set())
negatives = []
attempts = 0
while len(negatives) < self.n_neg and attempts < self.n_neg * 10:
if self.sampling == 'popularity':
neg_item = np.random.choice(self.all_items, p=self.item_probs)
else:
neg_item = np.random.choice(self.all_items)
if neg_item not in user_known_items:
negatives.append(neg_item)
attempts += 1
for neg_item in negatives:
samples.append({'user': user_id, 'item': neg_item, 'label': 0.0, 'weight': 1.0})
return samples
def collate_recommendation_batch(batch):
"""Collate function для DataLoader"""
flat_samples = [s for sublist in batch for s in sublist]
return {
'user_ids': torch.tensor([s['user'] for s in flat_samples], dtype=torch.long),
'item_ids': torch.tensor([s['item'] for s in flat_samples], dtype=torch.long),
'labels': torch.tensor([s['label'] for s in flat_samples], dtype=torch.float32),
'weights': torch.tensor([s['weight'] for s in flat_samples], dtype=torch.float32)
}
Временное разделение выборки
def temporal_train_val_test_split(interactions: pd.DataFrame,
val_days: int = 7,
test_days: int = 7) -> tuple:
"""
Правильное разделение для рекомендательных систем:
Не random split! Обучаем на прошлом, тестируем на будущем.
"""
interactions['timestamp'] = pd.to_datetime(interactions['timestamp'])
max_date = interactions['timestamp'].max()
test_start = max_date - pd.Timedelta(days=test_days)
val_start = test_start - pd.Timedelta(days=val_days)
train = interactions[interactions['timestamp'] < val_start]
val = interactions[
(interactions['timestamp'] >= val_start) &
(interactions['timestamp'] < test_start)
]
test = interactions[interactions['timestamp'] >= test_start]
# Только пользователи из обучающей выборки
train_users = set(train['user_id'])
val = val[val['user_id'].isin(train_users)]
test = test[test['user_id'].isin(train_users)]
print(f"Train: {len(train):,} interactions, {train['user_id'].nunique():,} users")
print(f"Val: {len(val):,} interactions, {val['user_id'].nunique():,} users")
print(f"Test: {len(test):,} interactions, {test['user_id'].nunique():,} users")
return train, val, test
Полный цикл обучения с early stopping
class RecommenderTrainingPipeline:
def __init__(self, model, optimizer, loss_fn):
self.model = model
self.optimizer = optimizer
self.loss_fn = loss_fn
self.best_metric = 0
self.patience_counter = 0
def train(self, train_loader: DataLoader,
val_loader: DataLoader,
epochs: int = 20,
patience: int = 5,
eval_metric: str = 'ndcg@10') -> dict:
history = {'train_loss': [], 'val_metric': []}
for epoch in range(epochs):
# Training
self.model.train()
train_loss = 0
for batch in train_loader:
user_ids = batch['user_ids']
item_ids = batch['item_ids']
labels = batch['labels']
weights = batch['weights']
# Forward pass
scores = self.model(user_ids, item_ids)
# Weighted BCE loss
loss = (weights * self.loss_fn(scores, labels)).mean()
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
self.optimizer.step()
train_loss += loss.item()
# Validation
val_metric = self.evaluate(val_loader, metric=eval_metric)
history['train_loss'].append(train_loss / len(train_loader))
history['val_metric'].append(val_metric)
print(f"Epoch {epoch+1}/{epochs}: loss={train_loss/len(train_loader):.4f}, {eval_metric}={val_metric:.4f}")
# Early stopping
if val_metric > self.best_metric:
self.best_metric = val_metric
self.patience_counter = 0
torch.save(self.model.state_dict(), 'best_model.pt')
else:
self.patience_counter += 1
if self.patience_counter >= patience:
print(f"Early stopping at epoch {epoch+1}")
break
# Загрузка лучшей модели
self.model.load_state_dict(torch.load('best_model.pt'))
return history
def evaluate(self, data_loader: DataLoader,
metric: str = 'ndcg@10',
k: int = 10) -> float:
"""Оценка на validation/test наборе"""
self.model.eval()
user_scores = {}
with torch.no_grad():
for batch in data_loader:
scores = self.model(batch['user_ids'], batch['item_ids'])
for u, i, s, l in zip(batch['user_ids'], batch['item_ids'], scores, batch['labels']):
uid = u.item()
if uid not in user_scores:
user_scores[uid] = {'pred': [], 'true': []}
user_scores[uid]['pred'].append(s.item())
user_scores[uid]['true'].append(l.item())
# Вычисление NDCG@K
ndcg_scores = []
for uid, data in user_scores.items():
pred = np.array(data['pred'])
true = np.array(data['true'])
ranked_indices = np.argsort(pred)[::-1][:k]
dcg = sum(
true[i] / np.log2(rank + 2)
for rank, i in enumerate(ranked_indices)
)
ideal_indices = np.argsort(true)[::-1][:k]
idcg = sum(
true[i] / np.log2(rank + 2)
for rank, i in enumerate(ideal_indices)
)
ndcg_scores.append(dcg / idcg if idcg > 0 else 0)
return np.mean(ndcg_scores)
Типичное расписание обучения
| Этап | Время | Примечание |
|---|---|---|
| Подготовка данных | 2-4 часа | Формирование пар, split |
| Базовая модель (ALS) | 15-30 мин | Хороший baseline |
| Two-tower обучение (CPU) | 4-8 часов | 10-20 эпох |
| Two-tower обучение (GPU A100) | 30-60 мин | То же |
| Hyperparameter tuning | 1-3 дня | Optuna, 50-100 trials |
| A/B тест в продакшне | 2-4 недели | Статистическая значимость |
Без GPU обучение двухбашенной модели на 5M взаимодействий занимает 12-24 часа. С A100 — 1-2 часа. Минимальный объём для успешного обучения: 50K уникальных пар user-item.







