Разработка кастомного фреймворка бэктестинга
Кастомный фреймворк оправдан когда готовые решения (Backtrader, Freqtrade) не закрывают специфические требования: нестандартные типы активов, сложные multi-asset стратегии, tick-данные, специфические модели исполнения ордеров или требования к производительности.
Принципы проектирования
Разделение ответственности: стратегия не должна знать ничего о механике исполнения ордеров. Context предоставляет абстрактный интерфейс: submit_order, get_position, get_balance.
Детерминизм: одни и те же данные + параметры = один и тот же результат. Никаких random seed без явного управления.
Отсутствие look-ahead: данные, доступные стратегии в момент T, не должны содержать информацию о T+1 и далее.
Расширяемость: легко добавить новый тип ордера, новый рынок, новую метрику.
Ядро: Event System
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable
from enum import Enum
class EventType(Enum):
BAR = "BAR"
TICK = "TICK"
ORDER_FILL = "ORDER_FILL"
ORDER_REJECT = "ORDER_REJECT"
POSITION_UPDATE = "POSITION_UPDATE"
@dataclass
class BarEvent:
type: EventType = EventType.BAR
symbol: str = ""
timestamp: int = 0
open: float = 0.0
high: float = 0.0
low: float = 0.0
close: float = 0.0
volume: float = 0.0
@dataclass
class FillEvent:
type: EventType = EventType.ORDER_FILL
order_id: str = ""
symbol: str = ""
side: str = ""
fill_price: float = 0.0
quantity: float = 0.0
commission: float = 0.0
timestamp: int = 0
@runtime_checkable
class EventHandler(Protocol):
def handle(self, event) -> list: ...
class EventBus:
def __init__(self):
self._handlers: dict[EventType, list[EventHandler]] = {}
self._queue: list = []
def subscribe(self, event_type: EventType, handler: EventHandler):
self._handlers.setdefault(event_type, []).append(handler)
def publish(self, event):
self._queue.append(event)
def process_queue(self):
while self._queue:
event = self._queue.pop(0)
for handler in self._handlers.get(event.type, []):
new_events = handler.handle(event)
if new_events:
self._queue.extend(new_events)
Data Feed абстракция
from abc import ABC, abstractmethod
from typing import Iterator
class DataFeed(ABC):
@abstractmethod
def __iter__(self) -> Iterator[BarEvent]:
pass
class CSVDataFeed(DataFeed):
def __init__(self, filepath: str, symbol: str):
self.filepath = filepath
self.symbol = symbol
def __iter__(self) -> Iterator[BarEvent]:
import csv
with open(self.filepath) as f:
reader = csv.DictReader(f)
for row in reader:
yield BarEvent(
symbol=self.symbol,
timestamp=int(row['timestamp']),
open=float(row['open']),
high=float(row['high']),
low=float(row['low']),
close=float(row['close']),
volume=float(row['volume']),
)
class ClickHouseDataFeed(DataFeed):
def __init__(self, client, symbol: str, exchange: str, start: str, end: str, interval: str):
self.client = client
self.symbol = symbol
self.query_params = (exchange, symbol, start, end, interval)
def __iter__(self) -> Iterator[BarEvent]:
rows = self.client.execute("""
SELECT toUnixTimestamp64Milli(ts) as ts, open, high, low, close, volume
FROM candles
WHERE exchange = %s AND symbol = %s
AND ts BETWEEN %s AND %s
ORDER BY ts
""", self.query_params)
for row in rows:
yield BarEvent(
symbol=self.symbol,
timestamp=row[0],
open=row[1], high=row[2], low=row[3], close=row[4], volume=row[5],
)
Multi-Asset поддержка
class MultiAssetBacktester:
def __init__(
self,
feeds: dict[str, DataFeed],
portfolio: Portfolio,
broker: SimulatedBroker,
):
self.feeds = feeds
self.portfolio = portfolio
self.broker = broker
def run(self, strategy: Strategy) -> BacktestResult:
# Мёрджим все фиды по timestamp
merged_bars = self._merge_feeds()
for timestamp, symbol, bar in merged_bars:
# Обрабатываем ордера на этом баре
self._process_pending_orders(symbol, bar)
# Обновляем unrealized PnL
self.portfolio.update_market_prices({symbol: bar.close})
# Вызываем стратегию
strategy.on_bar(symbol, bar)
return self._build_result()
def _merge_feeds(self):
"""Объединяем фиды в хронологическом порядке"""
import heapq
heap = []
iterators = {}
for symbol, feed in self.feeds.items():
it = iter(feed)
iterators[symbol] = it
try:
bar = next(it)
heapq.heappush(heap, (bar.timestamp, symbol, bar))
except StopIteration:
pass
while heap:
timestamp, symbol, bar = heapq.heappop(heap)
yield timestamp, symbol, bar
try:
next_bar = next(iterators[symbol])
heapq.heappush(heap, (next_bar.timestamp, symbol, next_bar))
except StopIteration:
pass
Модульное тестирование компонентов
Кастомный фреймворк требует хорошего покрытия тестами:
import pytest
from decimal import Decimal
def test_portfolio_long_trade():
portfolio = Portfolio(initial_cash=100_000.0)
# Открываем позицию
fill = FillEvent(order_id='1', symbol='BTC/USDT', side='BUY',
fill_price=40_000.0, quantity=0.1, commission=4.0)
portfolio.process_fill(fill)
assert portfolio.cash == pytest.approx(100_000 - 40_000 * 0.1 - 4.0, rel=1e-6)
assert portfolio.positions['BTC/USDT'].quantity == pytest.approx(0.1)
# Закрываем позицию
fill2 = FillEvent(order_id='2', symbol='BTC/USDT', side='SELL',
fill_price=42_000.0, quantity=0.1, commission=4.2)
portfolio.process_fill(fill2)
# PnL = (42000 - 40000) * 0.1 - 4.0 - 4.2 = 200 - 8.2 = 191.8
assert portfolio.trades[-1]['pnl'] == pytest.approx(191.8, rel=1e-4)
assert 'BTC/USDT' not in portfolio.positions
def test_no_lookahead_bias():
"""Проверяем что стратегия не видит будущих данных"""
seen_bars = []
class TrackingStrategy(Strategy):
def on_bar(self, symbol: str, bar: BarEvent):
seen_bars.append(bar.close)
# Текущий close не должен влиять на сигнал,
# используем только предыдущие bars
if len(seen_bars) >= 2:
assert seen_bars[-1] != seen_bars[-2] or True # просто проверяем доступ
# Запускаем и проверяем что bars приходят в хронологическом порядке
backtester = Backtester(...)
backtester.run(TrackingStrategy(), data)
timestamps = [b.timestamp for b in all_received_bars]
assert timestamps == sorted(timestamps)
Кастомный фреймворк — значительные инвестиции времени. Оправданы при серьёзных требованиях к точности симуляции, нестандартных активах или необходимости глубокой оптимизации производительности.







