Разработка модели обнаружения wash trading

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1 услугВсе 1306 услуг
Разработка модели обнаружения wash trading
Сложная
от 2 недель до 3 месяцев
Часто задаваемые вопросы
Направления блокчейн-разработки
Этапы блокчейн-разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1221
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1163
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    855
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1056
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    828

Разработка модели обнаружения wash trading

Wash trading — искусственное создание объёма торгов путём одновременной покупки и продажи одного актива одним лицом или группой координированных лиц. По данным Chainalysis, в 2023 году более 50% объёма торгов на ряде NFT маркетплейсов был wash trading. На централизованных биржах масштаб ещё больше. Модель обнаружения должна работать с on-chain данными, выявлять паттерны и выдавать интерпретируемые результаты.

Типы wash trading в Web3

Понимание разновидностей определяет выбор признаков модели:

Self-trading: один и тот же кошелёк покупает и продаёт сам себе или через цепочку аффилированных адресов.

Circular trading: A продаёт B, B продаёт C, C продаёт A. Актив возвращается к исходному владельцу.

Layered wash trading: сложные цепочки через 5-10 адресов для скрытия связей. Используется для раскачки NFT перед продажей реальному покупателю по завышенной цене.

Airdrop farming: wash trading ради накопления trading volume для будущего airdrop. Именно это было массовым на Blur в 2023 году.

Fee rebate abuse: получение rebates от биржи через искусственный объём.

Граф-анализ: основной инструмент

Ключевой метод обнаружения — построение transaction graph и поиск циклов и кластеров:

Построение графа транзакций

import networkx as nx
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Set, Tuple
import pandas as pd

@dataclass
class Transfer:
    tx_hash: str
    from_address: str
    to_address: str
    token_id: int  # для NFT
    price: float
    timestamp: int
    block_number: int

def build_transaction_graph(transfers: List[Transfer]) -> nx.DiGraph:
    """
    Строим направленный граф: узлы = адреса, рёбра = трансферы.
    Вес ребра = объём торгов между адресами.
    """
    G = nx.DiGraph()

    for t in transfers:
        if G.has_edge(t.from_address, t.to_address):
            G[t.from_address][t.to_address]['volume'] += t.price
            G[t.from_address][t.to_address]['count'] += 1
            G[t.from_address][t.to_address]['txs'].append(t.tx_hash)
        else:
            G.add_edge(
                t.from_address,
                t.to_address,
                volume=t.price,
                count=1,
                txs=[t.tx_hash]
            )

    return G

def detect_cycles(G: nx.DiGraph, max_length: int = 6) -> List[List[str]]:
    """
    Находим циклы в графе — признак wash trading.
    max_length ограничивает глубину поиска для производительности.
    """
    cycles = []

    # simple_cycles из NetworkX — алгоритм Джонсона, O((n+e)(c+1))
    for cycle in nx.simple_cycles(G):
        if len(cycle) <= max_length:
            cycles.append(cycle)

    return cycles

Кластеризация аффилированных адресов

Адреса из одного кластера (управляемые одним лицом) можно выявить через анализ:

  • Одинаковый funding source (получили ETH с одного адреса)
  • Паттерны синхронизации активности по времени
  • Общие gas price стратегии
def cluster_addresses(
    addresses: List[str],
    funding_map: Dict[str, str],  # address -> funding source
    time_correlations: Dict[Tuple[str, str], float]
) -> List[Set[str]]:
    """
    Union-Find для объединения аффилированных адресов в кластеры.
    """
    parent = {addr: addr for addr in addresses}

    def find(x):
        if parent[x] != x:
            parent[x] = find(parent[x])
        return parent[x]

    def union(x, y):
        parent[find(x)] = find(y)

    # Объединяем адреса с одним источником финансирования
    funding_groups = defaultdict(list)
    for addr, source in funding_map.items():
        funding_groups[source].append(addr)

    for source, addrs in funding_groups.items():
        for i in range(1, len(addrs)):
            union(addrs[0], addrs[i])

    # Объединяем по высокой корреляции активности
    CORRELATION_THRESHOLD = 0.85
    for (addr1, addr2), corr in time_correlations.items():
        if corr >= CORRELATION_THRESHOLD:
            union(addr1, addr2)

    # Собираем кластеры
    clusters = defaultdict(set)
    for addr in addresses:
        clusters[find(addr)].add(addr)

    return [cluster for cluster in clusters.values() if len(cluster) > 1]

Признаки для ML модели

Помимо граф-анализа строим feature vector для каждой торговой пары или адреса:

Временные признаки

def compute_temporal_features(
    trades: pd.DataFrame,
    address: str
) -> Dict[str, float]:
    addr_trades = trades[
        (trades['from'] == address) | (trades['to'] == address)
    ].sort_values('timestamp')

    features = {}

    # Средний интервал между сделками (в секундах)
    if len(addr_trades) > 1:
        intervals = addr_trades['timestamp'].diff().dropna()
        features['mean_trade_interval'] = intervals.mean()
        features['std_trade_interval'] = intervals.std()
        # Маленький STD = подозрительно регулярные интервалы
        features['regularity_score'] = 1 / (1 + features['std_trade_interval'])
    else:
        features['mean_trade_interval'] = 0
        features['std_trade_interval'] = 0
        features['regularity_score'] = 0

    # Доля сделок в нерабочее время (2-6 AM UTC)
    addr_trades['hour'] = pd.to_datetime(
        addr_trades['timestamp'], unit='s'
    ).dt.hour
    off_hours = addr_trades[addr_trades['hour'].between(2, 6)]
    features['off_hours_ratio'] = len(off_hours) / max(len(addr_trades), 1)

    return features

Экономические признаки

def compute_economic_features(
    trades: pd.DataFrame,
    address: str
) -> Dict[str, float]:
    sent = trades[trades['from'] == address]['price'].sum()
    received = trades[trades['to'] == address]['price'].sum()

    features = {}

    # Чистый P&L: wash trader обычно имеет P&L близкий к 0 (только gas)
    features['net_pnl'] = received - sent
    features['total_volume'] = sent + received
    features['pnl_to_volume_ratio'] = abs(features['net_pnl']) / max(features['total_volume'], 1)
    # Низкое значение = подозрительно (нет реального profit/loss)

    # Количество уникальных контрагентов
    counterparts = set(trades[trades['from'] == address]['to'].tolist() +
                       trades[trades['to'] == address]['from'].tolist())
    features['unique_counterparts'] = len(counterparts)

    # Концентрация объёма с одним контрагентом
    if len(counterparts) > 0:
        volumes_by_counterpart = trades.groupby('to')['price'].sum()
        max_concentration = volumes_by_counterpart.max() / max(sent, 1)
        features['max_counterpart_concentration'] = max_concentration

    return features

NFT-специфичные признаки

def compute_nft_features(
    trades: pd.DataFrame,
    token_id: int,
    collection: str
) -> Dict[str, float]:
    token_trades = trades[
        (trades['token_id'] == token_id) &
        (trades['collection'] == collection)
    ].sort_values('timestamp')

    features = {}

    # Количество смен владельца
    features['ownership_changes'] = len(token_trades)

    # Возврат к предыдущим владельцам
    owners_seen = set()
    revisits = 0
    for _, row in token_trades.iterrows():
        if row['to'] in owners_seen:
            revisits += 1
        owners_seen.add(row['to'])
    features['ownership_revisit_rate'] = revisits / max(len(token_trades), 1)

    # Рост цены vs market benchmark
    if len(token_trades) >= 2:
        price_growth = (token_trades.iloc[-1]['price'] /
                        token_trades.iloc[0]['price'] - 1)
        features['price_growth'] = price_growth
        # Аномальный рост цены при высоком объёме = подозрительно
    else:
        features['price_growth'] = 0

    return features

Модель классификации

Собираем признаки и обучаем Gradient Boosting:

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve, roc_auc_score
import shap

def train_wash_trading_model(
    features_df: pd.DataFrame,
    labels: pd.Series  # 1 = wash trading, 0 = legitimate
):
    X_train, X_test, y_train, y_test = train_test_split(
        features_df, labels, test_size=0.2, stratify=labels
    )

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    model = GradientBoostingClassifier(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.8,
        random_state=42
    )
    model.fit(X_train_scaled, y_train)

    # SHAP для интерпретируемости
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test_scaled)

    # Precision-Recall важнее ROC для несбалансированных данных
    y_proba = model.predict_proba(X_test_scaled)[:, 1]
    auc = roc_auc_score(y_test, y_proba)
    print(f"ROC-AUC: {auc:.3f}")

    return model, scaler, explainer

Оценка уверенности и интерпретация

Модель выдаёт не бинарный результат, а score с объяснением:

@dataclass
class WashTradingAssessment:
    address: str
    wash_probability: float    # 0.0 - 1.0
    risk_level: str           # LOW / MEDIUM / HIGH / CRITICAL
    contributing_factors: List[str]  # SHAP-based объяснение
    flagged_transactions: List[str]  # конкретные подозрительные tx

def assess_address(
    address: str,
    model,
    scaler,
    explainer,
    features: Dict
) -> WashTradingAssessment:
    X = pd.DataFrame([features])
    X_scaled = scaler.transform(X)

    probability = model.predict_proba(X_scaled)[0][1]

    if probability < 0.3:
        risk_level = "LOW"
    elif probability < 0.6:
        risk_level = "MEDIUM"
    elif probability < 0.85:
        risk_level = "HIGH"
    else:
        risk_level = "CRITICAL"

    # SHAP объяснение — какие признаки повлияли больше всего
    shap_vals = explainer.shap_values(X_scaled)[0]
    top_factors = sorted(
        zip(X.columns, shap_vals),
        key=lambda x: abs(x[1]),
        reverse=True
    )[:5]

    contributing_factors = [
        f"{feat}: {'+' if val > 0 else '-'}{abs(val):.3f}"
        for feat, val in top_factors
    ]

    return WashTradingAssessment(
        address=address,
        wash_probability=probability,
        risk_level=risk_level,
        contributing_factors=contributing_factors,
        flagged_transactions=[]
    )

Источники данных

Источник Данные Обновление
The Graph On-chain события DEX/NFT Real-time
Dune Analytics Исторические данные, SQL-доступ Несколько минут
Transpose Transaction graph data Real-time API
Flipside Crypto On-chain analytics Ежедневно
Нативный indexer Собственные события Real-time

Для production модели на DEX — собственный indexer через WebSocket RPC обеспечивает наименьшую задержку и полный контроль над данными. Dune Analytics хорош для разработки и исследования, но слишком медленный для real-time мониторинга.

Модель не является абсолютным детектором. Высокое значение score — это сигнал для ручного анализа, не автоматический бан. Особенно важно это для NFT маркетплейсов, где маркет-мейкеры и автоматизированные трейдеры могут давать ложные срабатывания.