Разработка системы уведомлений для трейдеров

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1 услугВсе 1306 услуг
Разработка системы уведомлений для трейдеров
Средняя
~3-5 рабочих дней
Часто задаваемые вопросы
Направления блокчейн-разработки
Этапы блокчейн-разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1221
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1163
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    855
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1056
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    828

Разработка системы уведомлений для трейдеров

Система уведомлений — это инфраструктура доставки своевременных сообщений трейдерам: исполнение ордера, достижение ценового уровня, ликвидация, новый листинг. Задержка в уведомлении означает пропущенную возможность или реализованный риск. Архитектура должна гарантировать доставку при любой нагрузке.

Типы уведомлений

Классификация по приоритету

P0 — Критические (мгновенно):

  • Ликвидация позиции
  • Margin call (приближение к liquidation)
  • Исполнение ордера

P1 — Важные (<5 секунд):

  • Достижение ценового алерта
  • Частичное заполнение ордера
  • Новый депозит зачислен

P2 — Информационные (<1 минута):

  • Новый листинг (анонс)
  • Еженедельный отчёт P&L
  • Реферальное начисление

Каналы доставки

Канал Latency Надёжность Лучше для
WebSocket (in-app) <100ms High (если онлайн) P0, real-time
Push (FCM/APNs) 1-5s Medium P0, P1 мобайл
Telegram Bot 1-3s High P0, P1
Email 1-60s Very High P2, отчёты
SMS 5-30s High P0 критические

Архитектура системы

Event Bus → Notification Router

from enum import Enum
from dataclasses import dataclass

class NotificationPriority(Enum):
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2

@dataclass
class NotificationEvent:
    user_id: str
    event_type: str
    priority: NotificationPriority
    data: dict
    channels: list[str]  # ['websocket', 'push', 'telegram']

class NotificationRouter:
    async def route(self, event: NotificationEvent):
        # Получаем настройки пользователя
        prefs = await self.db.get_notification_prefs(event.user_id)

        # Фильтруем каналы по настройкам и приоритету
        channels = self.select_channels(event, prefs)

        tasks = []
        for channel in channels:
            handler = self.channel_handlers[channel]
            tasks.append(handler.send(event))

        # Критические уведомления — ждём подтверждения
        if event.priority == NotificationPriority.CRITICAL:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            await self.log_delivery(event, results)
        else:
            asyncio.gather(*tasks)  # fire and forget для некритичных

WebSocket доставка

class WebSocketNotificationHandler:
    def __init__(self, connection_manager):
        self.connections = connection_manager

    async def send(self, event: NotificationEvent):
        connection = self.connections.get_user_connection(event.user_id)

        if not connection:
            # Пользователь оффлайн — сохраняем для последующей доставки
            await self.store_pending(event)
            return

        try:
            await connection.send_json({
                'type': 'notification',
                'event': event.event_type,
                'data': event.data,
                'priority': event.priority.value,
                'timestamp': datetime.utcnow().isoformat()
            })
        except ConnectionClosed:
            await self.store_pending(event)

    async def deliver_pending_on_connect(self, user_id: str, connection):
        """При подключении доставляем накопленные уведомления"""
        pending = await self.db.get_pending_notifications(user_id, limit=50)
        for notif in pending:
            await connection.send_json(notif.to_dict())
        await self.db.mark_delivered(user_id, [n.id for n in pending])

Push уведомления (Firebase FCM)

import firebase_admin
from firebase_admin import messaging

class PushNotificationHandler:
    def __init__(self):
        firebase_admin.initialize_app()

    async def send(self, event: NotificationEvent):
        tokens = await self.db.get_fcm_tokens(event.user_id)
        if not tokens:
            return

        message_data = self.format_push(event)

        message = messaging.MulticastMessage(
            tokens=tokens,
            notification=messaging.Notification(
                title=message_data['title'],
                body=message_data['body']
            ),
            data={k: str(v) for k, v in event.data.items()},
            android=messaging.AndroidConfig(
                priority='high' if event.priority == NotificationPriority.CRITICAL else 'normal'
            ),
            apns=messaging.APNSConfig(
                headers={'apns-priority': '10' if event.priority.value == 0 else '5'}
            )
        )

        response = messaging.send_each_for_multicast(message)

        # Чистим невалидные токены
        for i, result in enumerate(response.responses):
            if not result.success and 'registration-token-not-registered' in str(result.exception):
                await self.db.remove_fcm_token(tokens[i])

Price Alerts

Price alert — пользователь устанавливает триггер на определённую цену:

class PriceAlertEngine:
    def __init__(self, price_feed, notification_router):
        self.price_feed = price_feed
        self.router = notification_router
        # Кэш алертов: symbol -> list[Alert] (sorted by price)
        self.alert_cache: dict[str, list] = {}

    async def check_alerts(self, symbol: str, current_price: float):
        alerts = self.alert_cache.get(symbol, [])
        triggered = []

        for alert in alerts:
            if alert.condition == 'above' and current_price >= alert.target_price:
                triggered.append(alert)
            elif alert.condition == 'below' and current_price <= alert.target_price:
                triggered.append(alert)

        for alert in triggered:
            alerts.remove(alert)
            await self.router.route(NotificationEvent(
                user_id=alert.user_id,
                event_type='price_alert',
                priority=NotificationPriority.HIGH,
                data={
                    'symbol': symbol,
                    'target_price': alert.target_price,
                    'current_price': current_price,
                    'condition': alert.condition
                },
                channels=['websocket', 'push', 'telegram']
            ))

            if alert.is_recurring:
                # Пересоздаём алерт для повторного срабатывания
                await self.add_alert(alert)

Telegram Bot интеграция

from telegram import Bot

class TelegramNotificationHandler:
    def __init__(self, bot_token: str):
        self.bot = Bot(token=bot_token)

    def format_order_fill_message(self, data: dict) -> str:
        emoji = '🟢' if data['side'] == 'buy' else '🔴'
        return (
            f"{emoji} **Ордер исполнен**\n\n"
            f"Пара: `{data['symbol']}`\n"
            f"Сторона: {'Покупка' if data['side'] == 'buy' else 'Продажа'}\n"
            f"Цена: `{data['price']} USDT`\n"
            f"Количество: `{data['quantity']}`\n"
            f"Сумма: `{data['total']} USDT`"
        )

    async def send(self, event: NotificationEvent):
        telegram_id = await self.db.get_telegram_id(event.user_id)
        if not telegram_id:
            return

        formatters = {
            'order_filled': self.format_order_fill_message,
            'liquidation': self.format_liquidation_message,
            'price_alert': self.format_price_alert_message,
        }

        formatter = formatters.get(event.event_type, self.format_generic)
        text = formatter(event.data)

        await self.bot.send_message(
            chat_id=telegram_id,
            text=text,
            parse_mode='Markdown'
        )

Настройки пользователя

Детальный контроль над уведомлениями повышает UX и снижает отписки:

interface NotificationPreferences {
  orderFilled: { enabled: boolean; channels: Channel[] };
  priceAlert: { enabled: boolean; channels: Channel[] };
  liquidationWarning: {
    enabled: boolean;
    channels: Channel[];
    thresholdPercent: number;  // предупреждать при margin ratio < X%
  };
  newListing: { enabled: boolean; minScore: number };
  dailyReport: { enabled: boolean; time: string };  // "08:00"
  quietHours: { enabled: boolean; from: string; to: string };
}

Quiet hours важны: никто не хочет уведомление в 3 ночи о новом листинге с score 5. P0 уведомления (ликвидация) игнорируют quiet hours всегда.