Разработка арбитражного бота

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1 услугВсе 1306 услуг
Разработка арбитражного бота
Средняя
~1-2 недели
Часто задаваемые вопросы
Направления блокчейн-разработки
Этапы блокчейн-разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1221
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1163
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    855
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1060
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    828

Разработка арбитражного бота

Арбитраж — это использование ценовых расхождений между рынками для получения прибыли без (теоретически) рыночного риска. На практике риск есть: execution risk, latency risk, funding risk. Разберём реальные арбитражные стратегии и их техническую реализацию.

Типы арбитражных стратегий

Простой арбитраж (межбиржевой)

Одинаковый актив торгуется на двух биржах по разной цене. BTC на Binance — $42,100, на OKX — $42,150. Покупаем на Binance, продаём на OKX, разница $50 — наша прибыль.

Главная проблема: к моменту исполнения обеих ног цены выровняются. Нужна максимально низкая latency и предварительно размещённые балансы на обеих биржах.

import asyncio
import aiohttp
from decimal import Decimal

class SimpleArbitrageBot:
    def __init__(self):
        self.binance = ccxt.binance({'apiKey': BINANCE_KEY, 'secret': BINANCE_SECRET})
        self.okx = ccxt.okx({'apiKey': OKX_KEY, 'secret': OKX_SECRET})
        self.min_profit_pct = Decimal('0.15')  # минимум 0.15% после комиссий
    
    async def check_opportunity(self, symbol: str) -> ArbitrageOpportunity | None:
        # Параллельный запрос цен с обеих бирж
        binance_ticker, okx_ticker = await asyncio.gather(
            self.binance.fetch_ticker(symbol),
            self.okx.fetch_ticker(symbol),
        )
        
        binance_bid = Decimal(str(binance_ticker['bid']))
        binance_ask = Decimal(str(binance_ticker['ask']))
        okx_bid = Decimal(str(okx_ticker['bid']))
        okx_ask = Decimal(str(okx_ticker['ask']))
        
        # Вариант 1: покупаем на Binance, продаём на OKX
        if okx_bid > binance_ask:
            spread = (okx_bid - binance_ask) / binance_ask * 100
            net_spread = spread - BINANCE_TAKER_FEE - OKX_TAKER_FEE
            if net_spread > self.min_profit_pct:
                return ArbitrageOpportunity(
                    buy_exchange='binance', buy_price=binance_ask,
                    sell_exchange='okx', sell_price=okx_bid,
                    net_profit_pct=net_spread
                )
        
        # Вариант 2: покупаем на OKX, продаём на Binance
        if binance_bid > okx_ask:
            spread = (binance_bid - okx_ask) / okx_ask * 100
            net_spread = spread - OKX_TAKER_FEE - BINANCE_TAKER_FEE
            if net_spread > self.min_profit_pct:
                return ArbitrageOpportunity(
                    buy_exchange='okx', buy_price=okx_ask,
                    sell_exchange='binance', sell_price=binance_bid,
                    net_profit_pct=net_spread
                )
        
        return None
    
    async def execute_arbitrage(self, opp: ArbitrageOpportunity, quantity: Decimal):
        # Исполняем обе ноги одновременно
        buy_task = self.place_order(opp.buy_exchange, 'buy', quantity, opp.buy_price)
        sell_task = self.place_order(opp.sell_exchange, 'sell', quantity, opp.sell_price)
        
        buy_result, sell_result = await asyncio.gather(buy_task, sell_task, 
                                                        return_exceptions=True)
        
        # Обрабатываем частичное исполнение
        if isinstance(buy_result, Exception) or isinstance(sell_result, Exception):
            await self.handle_partial_execution(buy_result, sell_result, opp)

Triangular Arbitrage (внутрибиржевой)

На одной бирже: BTC → ETH → USDT → BTC. Если произведение курсов > 1 + комиссии — есть возможность.

def find_triangular_opportunity(tickers: dict) -> TriangularPath | None:
    """
    Ищем путь A → B → C → A где финальная сумма > начальной
    """
    currencies = ['BTC', 'ETH', 'BNB', 'XRP', 'SOL']
    
    for a, b, c in permutations(currencies, 3):
        pair_ab = f"{a}/{b}"
        pair_bc = f"{b}/{c}"
        pair_ca = f"{c}/{a}"
        
        if not all(p in tickers for p in [pair_ab, pair_bc, pair_ca]):
            continue
        
        # Рассчитываем эффективность цикла
        # Покупаем A->B: платим ask A/B
        rate_ab = Decimal(str(tickers[pair_ab]['ask']))
        # Покупаем B->C: платим ask B/C
        rate_bc = Decimal(str(tickers[pair_bc]['ask']))
        # Продаём C->A: получаем bid C/A
        rate_ca = Decimal(str(tickers[pair_ca]['bid']))
        
        # Из 1 unit A получаем:
        result = (1 / rate_ab) * (1 / rate_bc) * rate_ca
        
        # Вычитаем 3 комиссии (taker каждый шаг)
        after_fees = result * ((1 - TAKER_FEE) ** 3)
        
        profit_pct = (after_fees - 1) * 100
        
        if profit_pct > 0.05:  # минимум 0.05% прибыли
            return TriangularPath(
                a=a, b=b, c=c,
                rates=(rate_ab, rate_bc, rate_ca),
                profit_pct=profit_pct,
            )
    
    return None

Statistical Arbitrage (pairs trading)

Более сложный подход: поиск статистически коинтегрированных пар (BTC/ETH исторически двигаются вместе). При расхождении спреда сверх порога — long отстающего, short обгоняющего.

import numpy as np
from statsmodels.tsa.stattools import coint

def find_cointegrated_pairs(prices: pd.DataFrame, threshold: float = 0.05) -> list:
    """
    Тест на коинтеграцию: p-value < threshold → пара коинтегрирована
    """
    n = prices.shape[1]
    pairs = []
    
    for i in range(n):
        for j in range(i+1, n):
            _, pvalue, _ = coint(prices.iloc[:, i], prices.iloc[:, j])
            if pvalue < threshold:
                pairs.append((prices.columns[i], prices.columns[j], pvalue))
    
    return sorted(pairs, key=lambda x: x[2])

class PairsTrader:
    def __init__(self, asset1: str, asset2: str, lookback: int = 60):
        self.asset1 = asset1
        self.asset2 = asset2
        self.lookback = lookback
    
    def calculate_zscore(self, prices: pd.DataFrame) -> float:
        """Z-score спреда за lookback период"""
        spread = prices[self.asset1] - self.hedge_ratio * prices[self.asset2]
        
        mean = spread.rolling(self.lookback).mean().iloc[-1]
        std = spread.rolling(self.lookback).std().iloc[-1]
        current = spread.iloc[-1]
        
        return (current - mean) / std
    
    def get_signal(self, zscore: float) -> str:
        if zscore > 2.0:
            return 'SHORT_1_LONG_2'  # asset1 дорогой, asset2 дешёвый
        elif zscore < -2.0:
            return 'LONG_1_SHORT_2'  # asset1 дешёвый, asset2 дорогой
        elif abs(zscore) < 0.5:
            return 'CLOSE'  # спред вернулся к среднему — закрываем
        return 'HOLD'

Execution риски и их митигация

Execution Risk

Между обнаружением возможности и исполнением цена может уйти. На Binance latency через WebSocket — 10–50 мс. За это время другой бот может "съесть" возможность.

Митигация:

  • Colocation: сервер в том же дата-центре что и биржа (AWS Tokyo для Binance, AWS Frankfurt для OKX)
  • WebSocket вместо REST: подписка на orderbook updates вместо polling
  • Pre-placed orders: заранее выставленные limit ордера близко к рынку
class LowLatencyArbitrageBot:
    def __init__(self):
        # Подписываемся на WebSocket вместо polling
        self.price_cache = {}  # актуальные цены без задержки запроса
    
    async def subscribe_prices(self, symbol: str):
        """WebSocket подписка — обновляем кэш при каждом тике"""
        async with websockets.connect(BINANCE_WS_URL) as ws:
            await ws.send(json.dumps({
                'method': 'SUBSCRIBE',
                'params': [f'{symbol.lower()}@bookTicker'],
                'id': 1
            }))
            async for msg in ws:
                data = json.loads(msg)
                # bookTicker даёт лучший bid/ask без задержки
                self.price_cache[symbol] = {
                    'bid': Decimal(data['b']),
                    'ask': Decimal(data['a']),
                    'ts': time.time_ns(),
                }
                await self.check_opportunity_fast(symbol)

Inventory Risk

Если одна нога исполнилась, а другая нет — остаётся открытая позиция с рыночным риском. Это называется "leg risk".

async def handle_partial_execution(self, buy_result, sell_result, opp):
    """Хеджирование при неполном исполнении одной ноги"""
    if isinstance(sell_result, Exception) and not isinstance(buy_result, Exception):
        # Купили, но не продали — немедленно продаём по рынку
        filled_qty = buy_result['filled']
        await self.emergency_sell(opp.sell_exchange, filled_qty)
        
    elif isinstance(buy_result, Exception) and not isinstance(sell_result, Exception):
        # Продали, но не купили — немедленно покупаем по рынку
        filled_qty = sell_result['filled']
        await self.emergency_buy(opp.buy_exchange, filled_qty)

Transfer риск (межбиржевой)

При межбиржевом арбитраже нужно поддерживать балансы на обеих биржах. Перевод между биржами занимает 10–60 минут (несколько подтверждений блокчейна). За это время курс может измениться.

Решение: капиталоёмкая модель — держим достаточный баланс на каждой бирже заранее. Ребалансировка раз в несколько часов через авто-трансфер при дисбалансе.

async def check_balance_rebalance(self):
    """Если дисбаланс > threshold — автоматически переводим"""
    for exchange, balance in self.get_all_balances().items():
        for currency, amount in balance.items():
            target = self.target_balances[exchange][currency]
            deviation = abs(amount - target) / target * 100
            
            if deviation > 20:  # дисбаланс более 20%
                await self.initiate_transfer(exchange, currency, target - amount)

P&L и мониторинг

class ArbitragePnL:
    def record_trade(self, trade: ArbitrageTrade):
        gross_profit = trade.sell_amount - trade.buy_amount
        fees = trade.buy_fee + trade.sell_fee + trade.transfer_fee
        net_profit = gross_profit - fees
        
        self.daily_pnl += net_profit
        self.total_volume += trade.quantity
        
        if net_profit < 0:
            self.losing_trades += 1
            log.warning(f"Losing arbitrage: net {net_profit:.4f} USDT")
        
    def get_stats(self) -> dict:
        return {
            'daily_pnl': self.daily_pnl,
            'win_rate': self.winning_trades / max(self.total_trades, 1),
            'avg_profit_per_trade': self.daily_pnl / max(self.total_trades, 1),
            'volume': self.total_volume,
        }
Тип арбитража Требуемый капитал Сложность Конкуренция
Межбиржевой spot Высокий Средняя Очень высокая
Triangular (внутри биржи) Средний Средняя Высокая
Statistical / Pairs Средний Высокая Умеренная
Cross-chain (DeFi) Средний Очень высокая Умеренная
Funding rate Средний Низкая Умеренная

Сроки разработки

  • Простой межбиржевой арбитраж с 2 биржами: 4–6 недель
  • Triangular arbitrage: 3–4 недели
  • Statistical arbitrage: 6–10 недель (включая research коинтеграции)
  • Production-ready система с мониторингом и авто-ребалансировкой: 3–4 месяца