Разработка арбитражного бота
Арбитраж — это использование ценовых расхождений между рынками для получения прибыли без (теоретически) рыночного риска. На практике риск есть: execution risk, latency risk, funding risk. Разберём реальные арбитражные стратегии и их техническую реализацию.
Типы арбитражных стратегий
Простой арбитраж (межбиржевой)
Одинаковый актив торгуется на двух биржах по разной цене. BTC на Binance — $42,100, на OKX — $42,150. Покупаем на Binance, продаём на OKX, разница $50 — наша прибыль.
Главная проблема: к моменту исполнения обеих ног цены выровняются. Нужна максимально низкая latency и предварительно размещённые балансы на обеих биржах.
import asyncio
import aiohttp
from decimal import Decimal
class SimpleArbitrageBot:
def __init__(self):
self.binance = ccxt.binance({'apiKey': BINANCE_KEY, 'secret': BINANCE_SECRET})
self.okx = ccxt.okx({'apiKey': OKX_KEY, 'secret': OKX_SECRET})
self.min_profit_pct = Decimal('0.15') # минимум 0.15% после комиссий
async def check_opportunity(self, symbol: str) -> ArbitrageOpportunity | None:
# Параллельный запрос цен с обеих бирж
binance_ticker, okx_ticker = await asyncio.gather(
self.binance.fetch_ticker(symbol),
self.okx.fetch_ticker(symbol),
)
binance_bid = Decimal(str(binance_ticker['bid']))
binance_ask = Decimal(str(binance_ticker['ask']))
okx_bid = Decimal(str(okx_ticker['bid']))
okx_ask = Decimal(str(okx_ticker['ask']))
# Вариант 1: покупаем на Binance, продаём на OKX
if okx_bid > binance_ask:
spread = (okx_bid - binance_ask) / binance_ask * 100
net_spread = spread - BINANCE_TAKER_FEE - OKX_TAKER_FEE
if net_spread > self.min_profit_pct:
return ArbitrageOpportunity(
buy_exchange='binance', buy_price=binance_ask,
sell_exchange='okx', sell_price=okx_bid,
net_profit_pct=net_spread
)
# Вариант 2: покупаем на OKX, продаём на Binance
if binance_bid > okx_ask:
spread = (binance_bid - okx_ask) / okx_ask * 100
net_spread = spread - OKX_TAKER_FEE - BINANCE_TAKER_FEE
if net_spread > self.min_profit_pct:
return ArbitrageOpportunity(
buy_exchange='okx', buy_price=okx_ask,
sell_exchange='binance', sell_price=binance_bid,
net_profit_pct=net_spread
)
return None
async def execute_arbitrage(self, opp: ArbitrageOpportunity, quantity: Decimal):
# Исполняем обе ноги одновременно
buy_task = self.place_order(opp.buy_exchange, 'buy', quantity, opp.buy_price)
sell_task = self.place_order(opp.sell_exchange, 'sell', quantity, opp.sell_price)
buy_result, sell_result = await asyncio.gather(buy_task, sell_task,
return_exceptions=True)
# Обрабатываем частичное исполнение
if isinstance(buy_result, Exception) or isinstance(sell_result, Exception):
await self.handle_partial_execution(buy_result, sell_result, opp)
Triangular Arbitrage (внутрибиржевой)
На одной бирже: BTC → ETH → USDT → BTC. Если произведение курсов > 1 + комиссии — есть возможность.
def find_triangular_opportunity(tickers: dict) -> TriangularPath | None:
"""
Ищем путь A → B → C → A где финальная сумма > начальной
"""
currencies = ['BTC', 'ETH', 'BNB', 'XRP', 'SOL']
for a, b, c in permutations(currencies, 3):
pair_ab = f"{a}/{b}"
pair_bc = f"{b}/{c}"
pair_ca = f"{c}/{a}"
if not all(p in tickers for p in [pair_ab, pair_bc, pair_ca]):
continue
# Рассчитываем эффективность цикла
# Покупаем A->B: платим ask A/B
rate_ab = Decimal(str(tickers[pair_ab]['ask']))
# Покупаем B->C: платим ask B/C
rate_bc = Decimal(str(tickers[pair_bc]['ask']))
# Продаём C->A: получаем bid C/A
rate_ca = Decimal(str(tickers[pair_ca]['bid']))
# Из 1 unit A получаем:
result = (1 / rate_ab) * (1 / rate_bc) * rate_ca
# Вычитаем 3 комиссии (taker каждый шаг)
after_fees = result * ((1 - TAKER_FEE) ** 3)
profit_pct = (after_fees - 1) * 100
if profit_pct > 0.05: # минимум 0.05% прибыли
return TriangularPath(
a=a, b=b, c=c,
rates=(rate_ab, rate_bc, rate_ca),
profit_pct=profit_pct,
)
return None
Statistical Arbitrage (pairs trading)
Более сложный подход: поиск статистически коинтегрированных пар (BTC/ETH исторически двигаются вместе). При расхождении спреда сверх порога — long отстающего, short обгоняющего.
import numpy as np
from statsmodels.tsa.stattools import coint
def find_cointegrated_pairs(prices: pd.DataFrame, threshold: float = 0.05) -> list:
"""
Тест на коинтеграцию: p-value < threshold → пара коинтегрирована
"""
n = prices.shape[1]
pairs = []
for i in range(n):
for j in range(i+1, n):
_, pvalue, _ = coint(prices.iloc[:, i], prices.iloc[:, j])
if pvalue < threshold:
pairs.append((prices.columns[i], prices.columns[j], pvalue))
return sorted(pairs, key=lambda x: x[2])
class PairsTrader:
def __init__(self, asset1: str, asset2: str, lookback: int = 60):
self.asset1 = asset1
self.asset2 = asset2
self.lookback = lookback
def calculate_zscore(self, prices: pd.DataFrame) -> float:
"""Z-score спреда за lookback период"""
spread = prices[self.asset1] - self.hedge_ratio * prices[self.asset2]
mean = spread.rolling(self.lookback).mean().iloc[-1]
std = spread.rolling(self.lookback).std().iloc[-1]
current = spread.iloc[-1]
return (current - mean) / std
def get_signal(self, zscore: float) -> str:
if zscore > 2.0:
return 'SHORT_1_LONG_2' # asset1 дорогой, asset2 дешёвый
elif zscore < -2.0:
return 'LONG_1_SHORT_2' # asset1 дешёвый, asset2 дорогой
elif abs(zscore) < 0.5:
return 'CLOSE' # спред вернулся к среднему — закрываем
return 'HOLD'
Execution риски и их митигация
Execution Risk
Между обнаружением возможности и исполнением цена может уйти. На Binance latency через WebSocket — 10–50 мс. За это время другой бот может "съесть" возможность.
Митигация:
- Colocation: сервер в том же дата-центре что и биржа (AWS Tokyo для Binance, AWS Frankfurt для OKX)
- WebSocket вместо REST: подписка на orderbook updates вместо polling
- Pre-placed orders: заранее выставленные limit ордера близко к рынку
class LowLatencyArbitrageBot:
def __init__(self):
# Подписываемся на WebSocket вместо polling
self.price_cache = {} # актуальные цены без задержки запроса
async def subscribe_prices(self, symbol: str):
"""WebSocket подписка — обновляем кэш при каждом тике"""
async with websockets.connect(BINANCE_WS_URL) as ws:
await ws.send(json.dumps({
'method': 'SUBSCRIBE',
'params': [f'{symbol.lower()}@bookTicker'],
'id': 1
}))
async for msg in ws:
data = json.loads(msg)
# bookTicker даёт лучший bid/ask без задержки
self.price_cache[symbol] = {
'bid': Decimal(data['b']),
'ask': Decimal(data['a']),
'ts': time.time_ns(),
}
await self.check_opportunity_fast(symbol)
Inventory Risk
Если одна нога исполнилась, а другая нет — остаётся открытая позиция с рыночным риском. Это называется "leg risk".
async def handle_partial_execution(self, buy_result, sell_result, opp):
"""Хеджирование при неполном исполнении одной ноги"""
if isinstance(sell_result, Exception) and not isinstance(buy_result, Exception):
# Купили, но не продали — немедленно продаём по рынку
filled_qty = buy_result['filled']
await self.emergency_sell(opp.sell_exchange, filled_qty)
elif isinstance(buy_result, Exception) and not isinstance(sell_result, Exception):
# Продали, но не купили — немедленно покупаем по рынку
filled_qty = sell_result['filled']
await self.emergency_buy(opp.buy_exchange, filled_qty)
Transfer риск (межбиржевой)
При межбиржевом арбитраже нужно поддерживать балансы на обеих биржах. Перевод между биржами занимает 10–60 минут (несколько подтверждений блокчейна). За это время курс может измениться.
Решение: капиталоёмкая модель — держим достаточный баланс на каждой бирже заранее. Ребалансировка раз в несколько часов через авто-трансфер при дисбалансе.
async def check_balance_rebalance(self):
"""Если дисбаланс > threshold — автоматически переводим"""
for exchange, balance in self.get_all_balances().items():
for currency, amount in balance.items():
target = self.target_balances[exchange][currency]
deviation = abs(amount - target) / target * 100
if deviation > 20: # дисбаланс более 20%
await self.initiate_transfer(exchange, currency, target - amount)
P&L и мониторинг
class ArbitragePnL:
def record_trade(self, trade: ArbitrageTrade):
gross_profit = trade.sell_amount - trade.buy_amount
fees = trade.buy_fee + trade.sell_fee + trade.transfer_fee
net_profit = gross_profit - fees
self.daily_pnl += net_profit
self.total_volume += trade.quantity
if net_profit < 0:
self.losing_trades += 1
log.warning(f"Losing arbitrage: net {net_profit:.4f} USDT")
def get_stats(self) -> dict:
return {
'daily_pnl': self.daily_pnl,
'win_rate': self.winning_trades / max(self.total_trades, 1),
'avg_profit_per_trade': self.daily_pnl / max(self.total_trades, 1),
'volume': self.total_volume,
}
| Тип арбитража | Требуемый капитал | Сложность | Конкуренция |
|---|---|---|---|
| Межбиржевой spot | Высокий | Средняя | Очень высокая |
| Triangular (внутри биржи) | Средний | Средняя | Высокая |
| Statistical / Pairs | Средний | Высокая | Умеренная |
| Cross-chain (DeFi) | Средний | Очень высокая | Умеренная |
| Funding rate | Средний | Низкая | Умеренная |
Сроки разработки
- Простой межбиржевой арбитраж с 2 биржами: 4–6 недель
- Triangular arbitrage: 3–4 недели
- Statistical arbitrage: 6–10 недель (включая research коинтеграции)
- Production-ready система с мониторингом и авто-ребалансировкой: 3–4 месяца







