Разработка системы подписки на торговые сигналы
Система подписки на торговые сигналы позволяет пользователям получать уведомления о торговых возможностях от аналитиков или алгоритмических систем. В отличие от copy trading, пользователь принимает решение самостоятельно — сигнал это рекомендация, не автоматическое исполнение.
Компоненты системы
Signal Providers — источники сигналов: аналитики-трейдеры, алгоритмические системы, on-chain аналитика.
Signal Format — структурированное сообщение: инструмент, направление, цена входа, take profit уровни, stop loss, timeframe, обоснование.
Distribution Engine — доставка сигнала всем подписчикам через разные каналы.
Subscription Management — управление подписками, тарифами, оплатой.
Performance Tracking — отслеживание результатов каждого сигнала для расчёта win rate провайдера.
Модель данных сигнала
from pydantic import BaseModel
from decimal import Decimal
from datetime import datetime
from typing import Optional
class TradingSignal(BaseModel):
id: str
provider_id: str
symbol: str # BTC/USDT
exchange: str # binance
direction: str # LONG / SHORT
entry_type: str # MARKET / LIMIT / ZONE
entry_price: Decimal # или None для market
entry_zone_low: Optional[Decimal]
entry_zone_high: Optional[Decimal]
take_profit_levels: list[Decimal] # [tp1, tp2, tp3]
stop_loss: Decimal
leverage: Optional[int] # для фьючерсов
risk_pct: Optional[float] # рекомендуемый % риска от капитала
timeframe: str # 4h, 1d
rationale: str # текстовое обоснование
chart_url: Optional[str] # скриншот разметки
expires_at: Optional[datetime]
created_at: datetime = datetime.utcnow()
Distribution Engine
class SignalDistributor:
def __init__(self, telegram_bot, email_service, push_service, websocket_hub):
self.channels = {
'telegram': telegram_bot,
'email': email_service,
'push': push_service,
'websocket': websocket_hub,
}
async def distribute(self, signal: TradingSignal):
# Получаем всех подписчиков этого провайдера
subscribers = await self.subscription_repo.get_active_subscribers(
provider_id=signal.provider_id
)
# Группируем по предпочтительным каналам уведомлений
by_channel: dict[str, list] = {}
for sub in subscribers:
for channel in sub.notification_channels:
by_channel.setdefault(channel, []).append(sub.user_id)
# Параллельно рассылаем по каналам
tasks = []
for channel, user_ids in by_channel.items():
handler = self.channels.get(channel)
if handler:
tasks.append(handler.send_signal(signal, user_ids))
await asyncio.gather(*tasks, return_exceptions=True)
# Логируем факт отправки
await self.signal_repo.mark_distributed(signal.id, len(subscribers))
Telegram доставка
class TelegramSignalBot:
def format_signal(self, signal: TradingSignal) -> str:
tp_lines = '\n'.join(
f" TP{i+1}: ${tp:,.2f}"
for i, tp in enumerate(signal.take_profit_levels)
)
return f"""
📊 **{signal.symbol}** — {signal.direction}
**Вход:** {'рыночный' if signal.entry_type == 'MARKET' else f'${signal.entry_price:,.2f}'}
**Stop Loss:** ${signal.stop_loss:,.2f}
**Take Profit:**
{tp_lines}
**Таймфрейм:** {signal.timeframe}
**Риск:** {signal.risk_pct or 1}% от депозита
📝 {signal.rationale}
""".strip()
async def send_signal(self, signal: TradingSignal, user_ids: list[str]):
text = self.format_signal(signal)
# Батчами по 30 (Telegram rate limit)
for batch in chunks(user_ids, 30):
tasks = [
self.bot.send_message(user_id, text, parse_mode='Markdown')
for user_id in batch
]
await asyncio.gather(*tasks, return_exceptions=True)
await asyncio.sleep(1) # rate limit
Performance Tracking
class SignalPerformanceTracker:
async def track_signal_outcome(self, signal: TradingSignal):
"""Отслеживаем результат сигнала по рыночным данным"""
entry_time = signal.created_at
# Проверяем достигнут ли вход
entry_price = await self.find_entry_price(signal)
if not entry_price:
await self.mark_signal_missed(signal.id)
return
# Отслеживаем TP и SL
outcome = await self.monitor_until_close(
symbol=signal.symbol,
direction=signal.direction,
entry=entry_price,
tp_levels=signal.take_profit_levels,
sl=signal.stop_loss,
)
await self.signal_repo.save_outcome(
signal_id=signal.id,
entry_price=entry_price,
exit_price=outcome.exit_price,
exit_reason=outcome.reason, # 'TP1', 'TP2', 'SL', 'EXPIRED'
pnl_pct=outcome.pnl_pct,
)
Накопленная статистика результатов — ключевой показатель для новых подписчиков. Win rate, средний R:R, P&L по времени, процент сработавших TP1/TP2/TP3 vs SL — всё это должно быть видно на странице провайдера сигналов.







