Разработка системы объединения балансов на нескольких биржах
Система агрегации балансов даёт трейдеру единый view на все его активы распределённые по биржам, кошелькам и аккаунтам. Это фундамент для портфельного учёта, оптимизации размещения капитала и налоговой отчётности.
Структура данных
from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime
@dataclass
class AssetBalance:
asset: str
exchange: str
account_type: str # spot, margin, futures, earn
available: Decimal
locked: Decimal # заморожено в ордерах
total: Decimal
@dataclass
class PortfolioSnapshot:
timestamp: datetime
balances: list[AssetBalance]
total_usd: Decimal
by_exchange: dict[str, Decimal] # USD-эквивалент по биржам
by_asset: dict[str, Decimal] # USD-эквивалент по активам
Параллельный сбор балансов
import asyncio
from decimal import Decimal
class BalanceAggregator:
def __init__(self, exchange_clients: dict, price_feed):
self.exchanges = exchange_clients
self.price_feed = price_feed
async def get_portfolio_snapshot(self) -> PortfolioSnapshot:
# Параллельно получаем балансы со всех бирж
balance_tasks = {
name: asyncio.create_task(self._get_exchange_balances(name, client))
for name, client in self.exchanges.items()
}
results = await asyncio.gather(
*balance_tasks.values(),
return_exceptions=True
)
all_balances = []
for exchange_name, result in zip(balance_tasks.keys(), results):
if isinstance(result, Exception):
# Логируем ошибку, продолжаем с остальными
logger.error(f"Failed to get balances from {exchange_name}: {result}")
continue
all_balances.extend(result)
# Рассчитываем USD-эквиваленты
prices = await self.price_feed.get_prices(
{b.asset for b in all_balances} - {'USDT', 'USDC', 'BUSD'}
)
return self._build_snapshot(all_balances, prices)
def _build_snapshot(self, balances: list[AssetBalance], prices: dict) -> PortfolioSnapshot:
total_usd = Decimal(0)
by_exchange: dict[str, Decimal] = {}
by_asset: dict[str, Decimal] = {}
for b in balances:
price = prices.get(b.asset, Decimal(1)) # стейблкоины = 1
usd_value = b.total * price
total_usd += usd_value
by_exchange[b.exchange] = by_exchange.get(b.exchange, Decimal(0)) + usd_value
by_asset[b.asset] = by_asset.get(b.asset, Decimal(0)) + usd_value
return PortfolioSnapshot(
timestamp=datetime.utcnow(),
balances=balances,
total_usd=total_usd,
by_exchange=by_exchange,
by_asset=by_asset,
)
Кэширование и обновление
Полный refresh балансов делается не чаще 1 раза в минуту (REST API limits). Для real-time обновлений используем WebSocket User Data Stream (Binance) или аналоги:
async def subscribe_balance_updates(self, exchange: str):
"""Подписка на обновления баланса через User Data Stream"""
listen_key = await self.get_listen_key(exchange)
async with websockets.connect(f"wss://stream.binance.com:9443/ws/{listen_key}") as ws:
async for message in ws:
data = json.loads(message)
if data.get("e") == "outboundAccountPosition":
# Обновляем кэш баланса
for balance in data["B"]:
await self.update_cached_balance(
exchange=exchange,
asset=balance["a"],
free=Decimal(balance["f"]),
locked=Decimal(balance["l"]),
)
Аллокация капитала
Имея агрегированный view, можно строить инструменты оптимизации размещения:
def suggest_rebalancing(portfolio: PortfolioSnapshot, target_allocation: dict[str, float]) -> list[str]:
"""Предлагает переводы для приближения к целевой аллокации"""
suggestions = []
for asset, target_pct in target_allocation.items():
current_usd = portfolio.by_asset.get(asset, Decimal(0))
current_pct = float(current_usd / portfolio.total_usd * 100)
diff = target_pct - current_pct
if abs(diff) > 2: # порог ребалансировки
action = "BUY" if diff > 0 else "SELL"
usd_amount = abs(diff / 100) * float(portfolio.total_usd)
suggestions.append(f"{action} ${usd_amount:.0f} of {asset} (current {current_pct:.1f}% → target {target_pct:.1f}%)")
return suggestions
Исторические снапшоты портфеля сохраняются в TimescaleDB — это позволяет строить график роста портфеля во времени, рассчитывать доходность по периодам, генерировать отчёты для налоговых целей.







