Разработка системы уведомлений для трейдеров
Система уведомлений — это инфраструктура доставки своевременных сообщений трейдерам: исполнение ордера, достижение ценового уровня, ликвидация, новый листинг. Задержка в уведомлении означает пропущенную возможность или реализованный риск. Архитектура должна гарантировать доставку при любой нагрузке.
Типы уведомлений
Классификация по приоритету
P0 — Критические (мгновенно):
- Ликвидация позиции
- Margin call (приближение к liquidation)
- Исполнение ордера
P1 — Важные (<5 секунд):
- Достижение ценового алерта
- Частичное заполнение ордера
- Новый депозит зачислен
P2 — Информационные (<1 минута):
- Новый листинг (анонс)
- Еженедельный отчёт P&L
- Реферальное начисление
Каналы доставки
| Канал | Latency | Надёжность | Лучше для |
|---|---|---|---|
| WebSocket (in-app) | <100ms | High (если онлайн) | P0, real-time |
| Push (FCM/APNs) | 1-5s | Medium | P0, P1 мобайл |
| Telegram Bot | 1-3s | High | P0, P1 |
| 1-60s | Very High | P2, отчёты | |
| SMS | 5-30s | High | P0 критические |
Архитектура системы
Event Bus → Notification Router
from enum import Enum
from dataclasses import dataclass
class NotificationPriority(Enum):
CRITICAL = 0
HIGH = 1
NORMAL = 2
@dataclass
class NotificationEvent:
user_id: str
event_type: str
priority: NotificationPriority
data: dict
channels: list[str] # ['websocket', 'push', 'telegram']
class NotificationRouter:
async def route(self, event: NotificationEvent):
# Получаем настройки пользователя
prefs = await self.db.get_notification_prefs(event.user_id)
# Фильтруем каналы по настройкам и приоритету
channels = self.select_channels(event, prefs)
tasks = []
for channel in channels:
handler = self.channel_handlers[channel]
tasks.append(handler.send(event))
# Критические уведомления — ждём подтверждения
if event.priority == NotificationPriority.CRITICAL:
results = await asyncio.gather(*tasks, return_exceptions=True)
await self.log_delivery(event, results)
else:
asyncio.gather(*tasks) # fire and forget для некритичных
WebSocket доставка
class WebSocketNotificationHandler:
def __init__(self, connection_manager):
self.connections = connection_manager
async def send(self, event: NotificationEvent):
connection = self.connections.get_user_connection(event.user_id)
if not connection:
# Пользователь оффлайн — сохраняем для последующей доставки
await self.store_pending(event)
return
try:
await connection.send_json({
'type': 'notification',
'event': event.event_type,
'data': event.data,
'priority': event.priority.value,
'timestamp': datetime.utcnow().isoformat()
})
except ConnectionClosed:
await self.store_pending(event)
async def deliver_pending_on_connect(self, user_id: str, connection):
"""При подключении доставляем накопленные уведомления"""
pending = await self.db.get_pending_notifications(user_id, limit=50)
for notif in pending:
await connection.send_json(notif.to_dict())
await self.db.mark_delivered(user_id, [n.id for n in pending])
Push уведомления (Firebase FCM)
import firebase_admin
from firebase_admin import messaging
class PushNotificationHandler:
def __init__(self):
firebase_admin.initialize_app()
async def send(self, event: NotificationEvent):
tokens = await self.db.get_fcm_tokens(event.user_id)
if not tokens:
return
message_data = self.format_push(event)
message = messaging.MulticastMessage(
tokens=tokens,
notification=messaging.Notification(
title=message_data['title'],
body=message_data['body']
),
data={k: str(v) for k, v in event.data.items()},
android=messaging.AndroidConfig(
priority='high' if event.priority == NotificationPriority.CRITICAL else 'normal'
),
apns=messaging.APNSConfig(
headers={'apns-priority': '10' if event.priority.value == 0 else '5'}
)
)
response = messaging.send_each_for_multicast(message)
# Чистим невалидные токены
for i, result in enumerate(response.responses):
if not result.success and 'registration-token-not-registered' in str(result.exception):
await self.db.remove_fcm_token(tokens[i])
Price Alerts
Price alert — пользователь устанавливает триггер на определённую цену:
class PriceAlertEngine:
def __init__(self, price_feed, notification_router):
self.price_feed = price_feed
self.router = notification_router
# Кэш алертов: symbol -> list[Alert] (sorted by price)
self.alert_cache: dict[str, list] = {}
async def check_alerts(self, symbol: str, current_price: float):
alerts = self.alert_cache.get(symbol, [])
triggered = []
for alert in alerts:
if alert.condition == 'above' and current_price >= alert.target_price:
triggered.append(alert)
elif alert.condition == 'below' and current_price <= alert.target_price:
triggered.append(alert)
for alert in triggered:
alerts.remove(alert)
await self.router.route(NotificationEvent(
user_id=alert.user_id,
event_type='price_alert',
priority=NotificationPriority.HIGH,
data={
'symbol': symbol,
'target_price': alert.target_price,
'current_price': current_price,
'condition': alert.condition
},
channels=['websocket', 'push', 'telegram']
))
if alert.is_recurring:
# Пересоздаём алерт для повторного срабатывания
await self.add_alert(alert)
Telegram Bot интеграция
from telegram import Bot
class TelegramNotificationHandler:
def __init__(self, bot_token: str):
self.bot = Bot(token=bot_token)
def format_order_fill_message(self, data: dict) -> str:
emoji = '🟢' if data['side'] == 'buy' else '🔴'
return (
f"{emoji} **Ордер исполнен**\n\n"
f"Пара: `{data['symbol']}`\n"
f"Сторона: {'Покупка' if data['side'] == 'buy' else 'Продажа'}\n"
f"Цена: `{data['price']} USDT`\n"
f"Количество: `{data['quantity']}`\n"
f"Сумма: `{data['total']} USDT`"
)
async def send(self, event: NotificationEvent):
telegram_id = await self.db.get_telegram_id(event.user_id)
if not telegram_id:
return
formatters = {
'order_filled': self.format_order_fill_message,
'liquidation': self.format_liquidation_message,
'price_alert': self.format_price_alert_message,
}
formatter = formatters.get(event.event_type, self.format_generic)
text = formatter(event.data)
await self.bot.send_message(
chat_id=telegram_id,
text=text,
parse_mode='Markdown'
)
Настройки пользователя
Детальный контроль над уведомлениями повышает UX и снижает отписки:
interface NotificationPreferences {
orderFilled: { enabled: boolean; channels: Channel[] };
priceAlert: { enabled: boolean; channels: Channel[] };
liquidationWarning: {
enabled: boolean;
channels: Channel[];
thresholdPercent: number; // предупреждать при margin ratio < X%
};
newListing: { enabled: boolean; minScore: number };
dailyReport: { enabled: boolean; time: string }; // "08:00"
quietHours: { enabled: boolean; from: string; to: string };
}
Quiet hours важны: никто не хочет уведомление в 3 ночи о новом листинге с score 5. P0 уведомления (ликвидация) игнорируют quiet hours всегда.







