Разработка системы объединения балансов на нескольких биржах

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка системы объединения балансов на нескольких биржах
Средний
~1-2 недели
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка системы объединения балансов на нескольких биржах

Система агрегации балансов даёт трейдеру единый view на все его активы распределённые по биржам, кошелькам и аккаунтам. Это фундамент для портфельного учёта, оптимизации размещения капитала и налоговой отчётности.

Структура данных

from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime

@dataclass
class AssetBalance:
    asset: str
    exchange: str
    account_type: str  # spot, margin, futures, earn
    available: Decimal
    locked: Decimal    # заморожено в ордерах
    total: Decimal

@dataclass
class PortfolioSnapshot:
    timestamp: datetime
    balances: list[AssetBalance]
    total_usd: Decimal
    by_exchange: dict[str, Decimal]  # USD-эквивалент по биржам
    by_asset: dict[str, Decimal]     # USD-эквивалент по активам

Параллельный сбор балансов

import asyncio
from decimal import Decimal

class BalanceAggregator:
    def __init__(self, exchange_clients: dict, price_feed):
        self.exchanges = exchange_clients
        self.price_feed = price_feed

    async def get_portfolio_snapshot(self) -> PortfolioSnapshot:
        # Параллельно получаем балансы со всех бирж
        balance_tasks = {
            name: asyncio.create_task(self._get_exchange_balances(name, client))
            for name, client in self.exchanges.items()
        }

        results = await asyncio.gather(
            *balance_tasks.values(),
            return_exceptions=True
        )

        all_balances = []
        for exchange_name, result in zip(balance_tasks.keys(), results):
            if isinstance(result, Exception):
                # Логируем ошибку, продолжаем с остальными
                logger.error(f"Failed to get balances from {exchange_name}: {result}")
                continue
            all_balances.extend(result)

        # Рассчитываем USD-эквиваленты
        prices = await self.price_feed.get_prices(
            {b.asset for b in all_balances} - {'USDT', 'USDC', 'BUSD'}
        )

        return self._build_snapshot(all_balances, prices)

    def _build_snapshot(self, balances: list[AssetBalance], prices: dict) -> PortfolioSnapshot:
        total_usd = Decimal(0)
        by_exchange: dict[str, Decimal] = {}
        by_asset: dict[str, Decimal] = {}

        for b in balances:
            price = prices.get(b.asset, Decimal(1))  # стейблкоины = 1
            usd_value = b.total * price

            total_usd += usd_value
            by_exchange[b.exchange] = by_exchange.get(b.exchange, Decimal(0)) + usd_value
            by_asset[b.asset] = by_asset.get(b.asset, Decimal(0)) + usd_value

        return PortfolioSnapshot(
            timestamp=datetime.utcnow(),
            balances=balances,
            total_usd=total_usd,
            by_exchange=by_exchange,
            by_asset=by_asset,
        )

Кэширование и обновление

Полный refresh балансов делается не чаще 1 раза в минуту (REST API limits). Для real-time обновлений используем WebSocket User Data Stream (Binance) или аналоги:

async def subscribe_balance_updates(self, exchange: str):
    """Подписка на обновления баланса через User Data Stream"""
    listen_key = await self.get_listen_key(exchange)

    async with websockets.connect(f"wss://stream.binance.com:9443/ws/{listen_key}") as ws:
        async for message in ws:
            data = json.loads(message)

            if data.get("e") == "outboundAccountPosition":
                # Обновляем кэш баланса
                for balance in data["B"]:
                    await self.update_cached_balance(
                        exchange=exchange,
                        asset=balance["a"],
                        free=Decimal(balance["f"]),
                        locked=Decimal(balance["l"]),
                    )

Аллокация капитала

Имея агрегированный view, можно строить инструменты оптимизации размещения:

def suggest_rebalancing(portfolio: PortfolioSnapshot, target_allocation: dict[str, float]) -> list[str]:
    """Предлагает переводы для приближения к целевой аллокации"""
    suggestions = []

    for asset, target_pct in target_allocation.items():
        current_usd = portfolio.by_asset.get(asset, Decimal(0))
        current_pct = float(current_usd / portfolio.total_usd * 100)
        diff = target_pct - current_pct

        if abs(diff) > 2:  # порог ребалансировки
            action = "BUY" if diff > 0 else "SELL"
            usd_amount = abs(diff / 100) * float(portfolio.total_usd)
            suggestions.append(f"{action} ${usd_amount:.0f} of {asset} (current {current_pct:.1f}% → target {target_pct:.1f}%)")

    return suggestions

Исторические снапшоты портфеля сохраняются в TimescaleDB — это позволяет строить график роста портфеля во времени, рассчитывать доходность по периодам, генерировать отчёты для налоговых целей.