Разработка модели обнаружения wash trading
Wash trading — искусственное создание объёма торгов путём одновременной покупки и продажи одного актива одним лицом или группой координированных лиц. По данным Chainalysis, в 2023 году более 50% объёма торгов на ряде NFT маркетплейсов был wash trading. На централизованных биржах масштаб ещё больше. Модель обнаружения должна работать с on-chain данными, выявлять паттерны и выдавать интерпретируемые результаты.
Типы wash trading в Web3
Понимание разновидностей определяет выбор признаков модели:
Self-trading: один и тот же кошелёк покупает и продаёт сам себе или через цепочку аффилированных адресов.
Circular trading: A продаёт B, B продаёт C, C продаёт A. Актив возвращается к исходному владельцу.
Layered wash trading: сложные цепочки через 5-10 адресов для скрытия связей. Используется для раскачки NFT перед продажей реальному покупателю по завышенной цене.
Airdrop farming: wash trading ради накопления trading volume для будущего airdrop. Именно это было массовым на Blur в 2023 году.
Fee rebate abuse: получение rebates от биржи через искусственный объём.
Граф-анализ: основной инструмент
Ключевой метод обнаружения — построение transaction graph и поиск циклов и кластеров:
Построение графа транзакций
import networkx as nx
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Set, Tuple
import pandas as pd
@dataclass
class Transfer:
tx_hash: str
from_address: str
to_address: str
token_id: int # для NFT
price: float
timestamp: int
block_number: int
def build_transaction_graph(transfers: List[Transfer]) -> nx.DiGraph:
"""
Строим направленный граф: узлы = адреса, рёбра = трансферы.
Вес ребра = объём торгов между адресами.
"""
G = nx.DiGraph()
for t in transfers:
if G.has_edge(t.from_address, t.to_address):
G[t.from_address][t.to_address]['volume'] += t.price
G[t.from_address][t.to_address]['count'] += 1
G[t.from_address][t.to_address]['txs'].append(t.tx_hash)
else:
G.add_edge(
t.from_address,
t.to_address,
volume=t.price,
count=1,
txs=[t.tx_hash]
)
return G
def detect_cycles(G: nx.DiGraph, max_length: int = 6) -> List[List[str]]:
"""
Находим циклы в графе — признак wash trading.
max_length ограничивает глубину поиска для производительности.
"""
cycles = []
# simple_cycles из NetworkX — алгоритм Джонсона, O((n+e)(c+1))
for cycle in nx.simple_cycles(G):
if len(cycle) <= max_length:
cycles.append(cycle)
return cycles
Кластеризация аффилированных адресов
Адреса из одного кластера (управляемые одним лицом) можно выявить через анализ:
- Одинаковый funding source (получили ETH с одного адреса)
- Паттерны синхронизации активности по времени
- Общие gas price стратегии
def cluster_addresses(
addresses: List[str],
funding_map: Dict[str, str], # address -> funding source
time_correlations: Dict[Tuple[str, str], float]
) -> List[Set[str]]:
"""
Union-Find для объединения аффилированных адресов в кластеры.
"""
parent = {addr: addr for addr in addresses}
def find(x):
if parent[x] != x:
parent[x] = find(parent[x])
return parent[x]
def union(x, y):
parent[find(x)] = find(y)
# Объединяем адреса с одним источником финансирования
funding_groups = defaultdict(list)
for addr, source in funding_map.items():
funding_groups[source].append(addr)
for source, addrs in funding_groups.items():
for i in range(1, len(addrs)):
union(addrs[0], addrs[i])
# Объединяем по высокой корреляции активности
CORRELATION_THRESHOLD = 0.85
for (addr1, addr2), corr in time_correlations.items():
if corr >= CORRELATION_THRESHOLD:
union(addr1, addr2)
# Собираем кластеры
clusters = defaultdict(set)
for addr in addresses:
clusters[find(addr)].add(addr)
return [cluster for cluster in clusters.values() if len(cluster) > 1]
Признаки для ML модели
Помимо граф-анализа строим feature vector для каждой торговой пары или адреса:
Временные признаки
def compute_temporal_features(
trades: pd.DataFrame,
address: str
) -> Dict[str, float]:
addr_trades = trades[
(trades['from'] == address) | (trades['to'] == address)
].sort_values('timestamp')
features = {}
# Средний интервал между сделками (в секундах)
if len(addr_trades) > 1:
intervals = addr_trades['timestamp'].diff().dropna()
features['mean_trade_interval'] = intervals.mean()
features['std_trade_interval'] = intervals.std()
# Маленький STD = подозрительно регулярные интервалы
features['regularity_score'] = 1 / (1 + features['std_trade_interval'])
else:
features['mean_trade_interval'] = 0
features['std_trade_interval'] = 0
features['regularity_score'] = 0
# Доля сделок в нерабочее время (2-6 AM UTC)
addr_trades['hour'] = pd.to_datetime(
addr_trades['timestamp'], unit='s'
).dt.hour
off_hours = addr_trades[addr_trades['hour'].between(2, 6)]
features['off_hours_ratio'] = len(off_hours) / max(len(addr_trades), 1)
return features
Экономические признаки
def compute_economic_features(
trades: pd.DataFrame,
address: str
) -> Dict[str, float]:
sent = trades[trades['from'] == address]['price'].sum()
received = trades[trades['to'] == address]['price'].sum()
features = {}
# Чистый P&L: wash trader обычно имеет P&L близкий к 0 (только gas)
features['net_pnl'] = received - sent
features['total_volume'] = sent + received
features['pnl_to_volume_ratio'] = abs(features['net_pnl']) / max(features['total_volume'], 1)
# Низкое значение = подозрительно (нет реального profit/loss)
# Количество уникальных контрагентов
counterparts = set(trades[trades['from'] == address]['to'].tolist() +
trades[trades['to'] == address]['from'].tolist())
features['unique_counterparts'] = len(counterparts)
# Концентрация объёма с одним контрагентом
if len(counterparts) > 0:
volumes_by_counterpart = trades.groupby('to')['price'].sum()
max_concentration = volumes_by_counterpart.max() / max(sent, 1)
features['max_counterpart_concentration'] = max_concentration
return features
NFT-специфичные признаки
def compute_nft_features(
trades: pd.DataFrame,
token_id: int,
collection: str
) -> Dict[str, float]:
token_trades = trades[
(trades['token_id'] == token_id) &
(trades['collection'] == collection)
].sort_values('timestamp')
features = {}
# Количество смен владельца
features['ownership_changes'] = len(token_trades)
# Возврат к предыдущим владельцам
owners_seen = set()
revisits = 0
for _, row in token_trades.iterrows():
if row['to'] in owners_seen:
revisits += 1
owners_seen.add(row['to'])
features['ownership_revisit_rate'] = revisits / max(len(token_trades), 1)
# Рост цены vs market benchmark
if len(token_trades) >= 2:
price_growth = (token_trades.iloc[-1]['price'] /
token_trades.iloc[0]['price'] - 1)
features['price_growth'] = price_growth
# Аномальный рост цены при высоком объёме = подозрительно
else:
features['price_growth'] = 0
return features
Модель классификации
Собираем признаки и обучаем Gradient Boosting:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve, roc_auc_score
import shap
def train_wash_trading_model(
features_df: pd.DataFrame,
labels: pd.Series # 1 = wash trading, 0 = legitimate
):
X_train, X_test, y_train, y_test = train_test_split(
features_df, labels, test_size=0.2, stratify=labels
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
model = GradientBoostingClassifier(
n_estimators=200,
max_depth=5,
learning_rate=0.05,
subsample=0.8,
random_state=42
)
model.fit(X_train_scaled, y_train)
# SHAP для интерпретируемости
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test_scaled)
# Precision-Recall важнее ROC для несбалансированных данных
y_proba = model.predict_proba(X_test_scaled)[:, 1]
auc = roc_auc_score(y_test, y_proba)
print(f"ROC-AUC: {auc:.3f}")
return model, scaler, explainer
Оценка уверенности и интерпретация
Модель выдаёт не бинарный результат, а score с объяснением:
@dataclass
class WashTradingAssessment:
address: str
wash_probability: float # 0.0 - 1.0
risk_level: str # LOW / MEDIUM / HIGH / CRITICAL
contributing_factors: List[str] # SHAP-based объяснение
flagged_transactions: List[str] # конкретные подозрительные tx
def assess_address(
address: str,
model,
scaler,
explainer,
features: Dict
) -> WashTradingAssessment:
X = pd.DataFrame([features])
X_scaled = scaler.transform(X)
probability = model.predict_proba(X_scaled)[0][1]
if probability < 0.3:
risk_level = "LOW"
elif probability < 0.6:
risk_level = "MEDIUM"
elif probability < 0.85:
risk_level = "HIGH"
else:
risk_level = "CRITICAL"
# SHAP объяснение — какие признаки повлияли больше всего
shap_vals = explainer.shap_values(X_scaled)[0]
top_factors = sorted(
zip(X.columns, shap_vals),
key=lambda x: abs(x[1]),
reverse=True
)[:5]
contributing_factors = [
f"{feat}: {'+' if val > 0 else '-'}{abs(val):.3f}"
for feat, val in top_factors
]
return WashTradingAssessment(
address=address,
wash_probability=probability,
risk_level=risk_level,
contributing_factors=contributing_factors,
flagged_transactions=[]
)
Источники данных
| Источник | Данные | Обновление |
|---|---|---|
| The Graph | On-chain события DEX/NFT | Real-time |
| Dune Analytics | Исторические данные, SQL-доступ | Несколько минут |
| Transpose | Transaction graph data | Real-time API |
| Flipside Crypto | On-chain analytics | Ежедневно |
| Нативный indexer | Собственные события | Real-time |
Для production модели на DEX — собственный indexer через WebSocket RPC обеспечивает наименьшую задержку и полный контроль над данными. Dune Analytics хорош для разработки и исследования, но слишком медленный для real-time мониторинга.
Модель не является абсолютным детектором. Высокое значение score — это сигнал для ручного анализа, не автоматический бан. Особенно важно это для NFT маркетплейсов, где маркет-мейкеры и автоматизированные трейдеры могут давать ложные срабатывания.







