Разработка системы алертов для трейдеров (цена, объём, ликвидация)
Система алертов — это уши трейдера на рынке. Без неё нужно постоянно смотреть на экран; с ней — можно жить нормально и реагировать только на значимые события. Хорошая система алертов покрывает три категории: ценовые события, объёмные аномалии и ликвидационные данные.
Типы алертов
Ценовые алерты:
- Цена достигла X (above/below)
- Цена изменилась на N% за период
- Цена пересекла скользящую среднюю
- Новый ATH / ATL за период
Объёмные алерты:
- Объём за свечу превысил N × средний
- Крупная сделка (whale) > $X в одной транзакции
- Резкий рост open interest
Ликвидационные алерты:
- Крупная ликвидация (> $1M за 1 минуту)
- Кумулятивные ликвидации за период
- Liquidation heatmap — цены с большим количеством ожидаемых ликвидаций
Архитектура
class AlertRule(BaseModel):
id: str
user_id: str
type: str # 'price_above', 'price_below', 'volume_spike', 'liquidation'
symbol: str
exchange: str
# Параметры в зависимости от типа
price_threshold: Optional[Decimal]
volume_multiplier: Optional[float] # N × avg volume
liquidation_usd: Optional[float]
# Доставка
channels: list[str] # ['telegram', 'email', 'push', 'webhook']
webhook_url: Optional[str]
# Поведение
one_time: bool = True # деактивировать после срабатывания
cooldown_minutes: int = 60 # минимум между повторными срабатываниями
last_triggered: Optional[datetime] = None
is_active: bool = True
Alert Engine
class AlertEngine:
def __init__(self, rule_repo, notifier):
self.rules = {} # symbol → list[AlertRule]
self.rule_repo = rule_repo
self.notifier = notifier
async def on_ticker_update(self, ticker: NormalizedTicker):
rules = self.rules.get(f"{ticker.exchange}:{ticker.symbol}", [])
for rule in rules:
if not rule.is_active:
continue
if self.is_in_cooldown(rule):
continue
if await self.evaluate_rule(rule, ticker):
await self.trigger_alert(rule, ticker)
async def evaluate_rule(self, rule: AlertRule, ticker: NormalizedTicker) -> bool:
if rule.type == 'price_above':
return ticker.last >= rule.price_threshold
elif rule.type == 'price_below':
return ticker.last <= rule.price_threshold
elif rule.type == 'price_change_pct':
change = await self.compute_price_change(rule.symbol, rule.period_minutes)
return abs(change) >= rule.change_pct_threshold
return False
async def trigger_alert(self, rule: AlertRule, ticker: NormalizedTicker):
message = self.format_alert_message(rule, ticker)
# Доставляем по всем каналам
for channel in rule.channels:
await self.notifier.send(channel, rule.user_id, message)
# Обновляем состояние правила
rule.last_triggered = datetime.utcnow()
if rule.one_time:
rule.is_active = False
await self.rule_repo.save(rule)
def is_in_cooldown(self, rule: AlertRule) -> bool:
if not rule.last_triggered:
return False
elapsed = (datetime.utcnow() - rule.last_triggered).total_seconds() / 60
return elapsed < rule.cooldown_minutes
Объёмные алерты
class VolumeAnomalyDetector:
WINDOW_PERIODS = 20 # свечей для расчёта среднего
async def check_volume_spike(self, symbol: str, current_volume: Decimal) -> float:
"""Возвращает множитель относительно среднего объёма"""
recent_volumes = await self.candle_repo.get_recent_volumes(
symbol, count=self.WINDOW_PERIODS
)
if len(recent_volumes) < 5:
return 1.0
avg_volume = sum(recent_volumes) / len(recent_volumes)
if avg_volume == 0:
return 1.0
return float(current_volume / avg_volume)
Ликвидационные алерты
Данные о ликвидациях получаем с бирж (Binance forceOrder stream, Bybit liquidation) или агрегаторов (Coinalyze, CoinGlass API):
class LiquidationMonitor:
async def monitor_binance_liquidations(self):
async with websockets.connect("wss://fstream.binance.com/ws/!forceOrder@arr") as ws:
async for message in ws:
data = json.loads(message)
order = data["o"]
liquidation = Liquidation(
symbol=order["s"],
side=order["S"],
quantity=Decimal(order["q"]),
price=Decimal(order["p"]),
usd_value=Decimal(order["q"]) * Decimal(order["p"]),
timestamp=data["T"],
)
await self.process_liquidation(liquidation)
async def process_liquidation(self, liq: Liquidation):
# Обновляем rolling 1-minute total
await self.redis.incrbyfloat(
f"liq_total:{liq.symbol}:1m",
float(liq.usd_value)
)
await self.redis.expire(f"liq_total:{liq.symbol}:1m", 60)
# Проверяем алерты по крупным разовым ликвидациям
if liq.usd_value >= 1_000_000: # > $1M
await self.alert_engine.fire_liquidation_alert(liq)
Webhook доставка
class WebhookDelivery:
async def send(self, webhook_url: str, alert: AlertMessage):
payload = {
"type": alert.type,
"symbol": alert.symbol,
"message": alert.text,
"timestamp": alert.timestamp.isoformat(),
"data": alert.raw_data,
}
# Подписываем payload для верификации на стороне получателя
signature = hmac.new(
alert.rule.webhook_secret.encode(),
json.dumps(payload).encode(),
hashlib.sha256
).hexdigest()
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json=payload,
headers={"X-Alert-Signature": f"sha256={signature}"},
timeout=10.0,
)
Webhook-алерты позволяют интегрировать систему с внешними ботами, торговыми системами, CRM. По webhook можно триггерить автоматические действия — например, открытие ордера при ценовом алерте.







