Реализация рекомендательной системы товаров (Collaborative Filtering)
Collaborative Filtering — самый распространённый подход для товарных рекомендаций: "пользователи, похожие на тебя, покупали вот это". Не нужны описания товаров — только история взаимодействий. Работает для любых доменов, но требует достаточного объёма транзакций (>50K) и плохо справляется с холодным стартом.
ALS (Alternating Least Squares) матричная факторизация
import numpy as np
import scipy.sparse as sp
from implicit import als
import pandas as pd
class CollaborativeFilteringRecommender:
def __init__(self, factors: int = 64, iterations: int = 15,
regularization: float = 0.01):
self.model = als.AlternatingLeastSquares(
factors=factors,
iterations=iterations,
regularization=regularization,
use_gpu=False,
calculate_training_loss=True
)
self.user_map = {}
self.item_map = {}
self.reverse_item_map = {}
def fit(self, interactions_df: pd.DataFrame,
user_col: str = "user_id",
item_col: str = "item_id",
weight_col: str = "weight") -> None:
"""
interactions_df: user_id, item_id, weight (1=просмотр, 2=добавление в корзину, 5=покупка)
"""
# Энкодинг пользователей и товаров
unique_users = interactions_df[user_col].unique()
unique_items = interactions_df[item_col].unique()
self.user_map = {u: i for i, u in enumerate(unique_users)}
self.item_map = {it: i for i, it in enumerate(unique_items)}
self.reverse_item_map = {i: it for it, i in self.item_map.items()}
rows = interactions_df[item_col].map(self.item_map)
cols = interactions_df[user_col].map(self.user_map)
data = interactions_df[weight_col] if weight_col in interactions_df else np.ones(len(interactions_df))
# Sparse matrix: items × users
self.matrix = sp.csr_matrix(
(data, (rows, cols)),
shape=(len(unique_items), len(unique_users))
)
self.model.fit(self.matrix)
def recommend(self, user_id, n: int = 10,
exclude_purchased: bool = True) -> list[tuple]:
"""Top-N рекомендаций для пользователя"""
if user_id not in self.user_map:
return self._popular_items(n)
user_idx = self.user_map[user_id]
filter_items = None
if exclude_purchased:
# Индексы уже купленных товаров
user_items = self.matrix.T.getcol(user_idx)
filter_items = user_items.indices
item_indices, scores = self.model.recommend(
userid=user_idx,
user_items=self.matrix.T[user_idx],
N=n,
filter_already_liked_items=exclude_purchased
)
return [
(self.reverse_item_map[idx], float(score))
for idx, score in zip(item_indices, scores)
]
def similar_items(self, item_id, n: int = 10) -> list[tuple]:
"""Похожие товары (item2item)"""
if item_id not in self.item_map:
return []
item_idx = self.item_map[item_id]
similar_indices, scores = self.model.similar_items(item_idx, N=n + 1)
return [
(self.reverse_item_map[idx], float(score))
for idx, score in zip(similar_indices, scores)
if idx != item_idx
][:n]
def _popular_items(self, n: int) -> list[tuple]:
"""Fallback: популярные товары для новых пользователей"""
item_popularity = np.array(self.matrix.sum(axis=1)).flatten()
top_indices = np.argsort(item_popularity)[-n:][::-1]
return [
(self.reverse_item_map[idx], float(item_popularity[idx]))
for idx in top_indices
]
BPR (Bayesian Personalized Ranking) — для неявных данных
class BPRRecommender:
"""
BPR оптимизирует порядок, а не точность предсказания рейтинга.
Лучше подходит когда нет явных оценок — только clicks/views/purchases.
"""
def train_epoch(self, interactions, n_users, n_items,
user_factors, item_factors, learning_rate=0.01, reg=0.01):
"""SGD шаг обучения BPR"""
# Случайные триплеты (user, positive_item, negative_item)
user_idx = np.random.randint(n_users)
user_items = interactions[user_idx].indices
if len(user_items) == 0:
return 0
# Позитивный пример — то, с чем взаимодействовал
pos_item_idx = np.random.choice(user_items)
# Негативный пример — случайный товар без взаимодействия
neg_item_idx = np.random.randint(n_items)
while neg_item_idx in user_items:
neg_item_idx = np.random.randint(n_items)
# BPR loss: σ(u·i_pos - u·i_neg) → maximize
u = user_factors[user_idx]
i_pos = item_factors[pos_item_idx]
i_neg = item_factors[neg_item_idx]
diff = np.dot(u, i_pos - i_neg)
sigmoid = 1 / (1 + np.exp(-diff))
loss = -np.log(sigmoid + 1e-10)
# Градиентный шаг
grad = (1 - sigmoid)
user_factors[user_idx] += learning_rate * (grad * (i_pos - i_neg) - reg * u)
item_factors[pos_item_idx] += learning_rate * (grad * u - reg * i_pos)
item_factors[neg_item_idx] += learning_rate * (-grad * u - reg * i_neg)
return loss
Оценка качества
def evaluate_recommender(model, test_interactions: dict,
k_values: list = [5, 10, 20]) -> dict:
"""Precision@K, Recall@K, NDCG@K"""
metrics = {f"precision@{k}": [] for k in k_values}
metrics.update({f"recall@{k}": [] for k in k_values})
metrics.update({f"ndcg@{k}": [] for k in k_values})
for user_id, true_items in test_interactions.items():
if not true_items:
continue
recommendations = model.recommend(user_id, n=max(k_values))
rec_items = [item_id for item_id, _ in recommendations]
for k in k_values:
top_k = set(rec_items[:k])
true_set = set(true_items)
hits = len(top_k & true_set)
metrics[f"precision@{k}"].append(hits / k)
metrics[f"recall@{k}"].append(hits / len(true_set) if true_set else 0)
# NDCG@K
dcg = sum(
1 / np.log2(i + 2)
for i, item in enumerate(rec_items[:k])
if item in true_set
)
idcg = sum(1 / np.log2(i + 2) for i in range(min(k, len(true_set))))
metrics[f"ndcg@{k}"].append(dcg / idcg if idcg > 0 else 0)
return {k: np.mean(v) for k, v in metrics.items()}
ALS на 1M пользователей × 100K товаров обучается за 5-15 минут на CPU (8 потоков). NDCG@10 при достаточном объёме данных: 0.25-0.45. Ключевые гиперпараметры: factors=64-128, iterations=15-30, regularization=0.001-0.1. Взвешивание событий: просмотр=1, добавление в корзину=3, покупка=5, повторная покупка=8.







