Разработка движка бэктестинга на Python
Python — доминирующий язык для разработки бэктест-движков благодаря богатой экосистеме: NumPy, pandas, scipy для вычислений, ccxt для биржевой интеграции, matplotlib/plotly для визуализации. Разработка собственного движка оправдана когда готовые решения (Backtrader, Freqtrade) не закрывают специфические требования.
Архитектура кастомного движка
# Основные абстракции
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Optional
import pandas as pd
@dataclass
class Bar:
timestamp: pd.Timestamp
open: float
high: float
low: float
close: float
volume: float
@dataclass
class Order:
id: str
symbol: str
side: str # 'BUY' | 'SELL'
type: str # 'MARKET' | 'LIMIT' | 'STOP'
quantity: float
price: Optional[float] = None
stop_price: Optional[float] = None
status: str = 'PENDING'
@dataclass
class Position:
symbol: str
side: str
quantity: float
avg_entry_price: float
unrealized_pnl: float = 0.0
realized_pnl: float = 0.0
class Strategy(ABC):
def __init__(self, context: 'BacktestContext'):
self.ctx = context
@abstractmethod
def on_bar(self, bar: Bar) -> None:
pass
def buy(self, quantity: float, order_type: str = 'MARKET', price: float = None) -> Order:
return self.ctx.submit_order(Order(
id=self.ctx.generate_id(),
symbol=self.ctx.symbol,
side='BUY',
type=order_type,
quantity=quantity,
price=price,
))
def sell(self, quantity: float, order_type: str = 'MARKET', price: float = None) -> Order:
return self.ctx.submit_order(Order(
id=self.ctx.generate_id(),
symbol=self.ctx.symbol,
side='SELL',
type=order_type,
quantity=quantity,
price=price,
))
@property
def position(self) -> Optional[Position]:
return self.ctx.get_position(self.ctx.symbol)
@property
def cash(self) -> float:
return self.ctx.portfolio.cash
Портфель и учёт позиций
class Portfolio:
def __init__(self, initial_cash: float):
self.initial_cash = initial_cash
self.cash = initial_cash
self.positions: dict[str, Position] = {}
self.trades: list[dict] = []
self.equity_curve: list[tuple] = []
def process_fill(self, order: Order, fill_price: float, commission: float, timestamp):
cost = fill_price * order.quantity
if order.side == 'BUY':
self.cash -= (cost + commission)
symbol = order.symbol
if symbol in self.positions:
pos = self.positions[symbol]
# FIFO: усредняем вход
total_qty = pos.quantity + order.quantity
pos.avg_entry_price = (
pos.avg_entry_price * pos.quantity + fill_price * order.quantity
) / total_qty
pos.quantity = total_qty
else:
self.positions[symbol] = Position(
symbol=symbol,
side='LONG',
quantity=order.quantity,
avg_entry_price=fill_price,
)
elif order.side == 'SELL':
self.cash += (cost - commission)
pos = self.positions.get(order.symbol)
if pos:
realized_pnl = (fill_price - pos.avg_entry_price) * order.quantity - commission
pos.quantity -= order.quantity
pos.realized_pnl += realized_pnl
self.trades.append({
'timestamp': timestamp,
'symbol': order.symbol,
'entry': pos.avg_entry_price,
'exit': fill_price,
'quantity': order.quantity,
'pnl': realized_pnl,
})
if pos.quantity <= 0:
del self.positions[order.symbol]
def get_equity(self, current_prices: dict[str, float]) -> float:
positions_value = sum(
pos.quantity * current_prices.get(symbol, pos.avg_entry_price)
for symbol, pos in self.positions.items()
)
return self.cash + positions_value
Симулятор исполнения с реализмом
class RealisticBroker:
def __init__(
self,
commission_pct: float = 0.001, # 0.1%
slippage_pct: float = 0.0005, # 0.05%
partial_fill_prob: float = 0.0, # 0 = всегда полное исполнение
):
self.commission_pct = commission_pct
self.slippage_pct = slippage_pct
self.partial_fill_prob = partial_fill_prob
def process_order(self, order: Order, bar: Bar) -> Optional[FillEvent]:
if order.type == 'MARKET':
# Market order исполняется на открытии следующего бара
base_price = bar.open
slippage = base_price * self.slippage_pct
fill_price = base_price + slippage if order.side == 'BUY' else base_price - slippage
elif order.type == 'LIMIT':
if order.side == 'BUY' and bar.low <= order.price:
fill_price = min(order.price, bar.open)
elif order.side == 'SELL' and bar.high >= order.price:
fill_price = max(order.price, bar.open)
else:
return None # Не исполнен
elif order.type == 'STOP':
if order.side == 'SELL' and bar.low <= order.stop_price:
# Stop triggered, исполняем как market с gap down риском
fill_price = min(order.stop_price, bar.open)
# Slippage на stop — реалистично больше
fill_price -= fill_price * self.slippage_pct * 2
else:
return None
commission = fill_price * order.quantity * self.commission_pct
return FillEvent(
order_id=order.id,
fill_price=fill_price,
quantity=order.quantity,
commission=commission,
timestamp=bar.timestamp,
)
Основной loop бэктеста
class Backtester:
def run(
self,
strategy_class,
strategy_params: dict,
ohlcv_data: pd.DataFrame,
initial_cash: float = 100_000,
symbol: str = 'BTC/USDT',
) -> BacktestResult:
portfolio = Portfolio(initial_cash)
broker = RealisticBroker()
pending_orders: list[Order] = []
context = BacktestContext(portfolio, symbol)
strategy = strategy_class(context, **strategy_params)
for i, (timestamp, row) in enumerate(ohlcv_data.iterrows()):
bar = Bar(timestamp=timestamp, **row.to_dict())
# Сначала обрабатываем pending ордера на текущем баре
still_pending = []
for order in pending_orders:
fill = broker.process_order(order, bar)
if fill:
portfolio.process_fill(order, fill.fill_price, fill.commission, timestamp)
else:
still_pending.append(order)
pending_orders = still_pending
# Обновляем unrealized P&L
for pos in portfolio.positions.values():
pos.unrealized_pnl = (bar.close - pos.avg_entry_price) * pos.quantity
# Вызываем стратегию
context.current_bar = bar
strategy.on_bar(bar)
# Новые ордера от стратегии в pending
pending_orders.extend(context.pop_new_orders())
# Записываем equity
equity = portfolio.get_equity({symbol: bar.close})
portfolio.equity_curve.append((timestamp, equity))
equity_series = pd.Series(
[e for _, e in portfolio.equity_curve],
index=[t for t, _ in portfolio.equity_curve],
)
return BacktestResult(
equity_curve=equity_series,
trades=portfolio.trades,
metrics=calculate_metrics(equity_series, portfolio.trades),
)
Производительность
Для оптимизации параметров (тысячи прогонов) важна скорость:
- NumPy vectorization для технических индикаторов вместо Python loops
-
Numba JIT (
@numba.jit) для hot-path вычислений - Multiprocessing для параллельного перебора параметров
- Chunked data loading — загружаем данные частями, не всё в память
from multiprocessing import Pool
import itertools
def optimize_parameters(strategy_class, data, param_grid: dict) -> pd.DataFrame:
combinations = list(itertools.product(*param_grid.values()))
param_names = list(param_grid.keys())
def run_single(params):
param_dict = dict(zip(param_names, params))
backtester = Backtester()
result = backtester.run(strategy_class, param_dict, data)
return {**param_dict, **result.metrics.__dict__}
with Pool(processes=8) as pool:
results = pool.map(run_single, combinations)
return pd.DataFrame(results).sort_values('sharpe_ratio', ascending=False)
На 8 ядрах перебор 1000 комбинаций параметров с 1-летним датасетом занимает 10–30 минут в зависимости от сложности стратегии.







