Разработка системы нормализации парсинг-данных в единый стандарт
Когда данные приходят из пяти бирж, трёх блокчейн-сетей и двух социальных платформ — каждый источник присылает их в своём формате. Binance возвращает timestamps в миллисекундах, OKX — в секундах, Telegram — в UTC datetime, on-chain данные — в Unix секундах из блока. Суммы везде разные: где-то wei, где-то Gwei, где-то string с плавающей точкой. Нормализация — это не скучная задача, это то, что делает данные использованием, а не болью.
Проблемы гетерогенных данных
Перечислим конкретные расхождения, которые встречаются в реальных проектах:
Временные метки:
- Unix milliseconds (Binance, most CEX)
- Unix seconds (Ethereum blocks, Chainlink)
- ISO 8601 strings (некоторые REST API)
- Relative ("2 hours ago") — в social data scraping
- Timezone-aware vs naive datetimes
Суммы и цены:
- Wei (10^-18 ETH) — on-chain Ethereum
- Lamports (10^-9 SOL) — on-chain Solana
- String с decimals ("1234.567890") — Binance REST
- Integer с fixed decimals (100000000 = 1 BTC у некоторых бирж)
- Float64 — потеря точности на больших числах
Идентификаторы активов:
-
BTCUSDT(Binance),BTC-USDT(OKX),BTC/USDT(ccxt standard),tBTCUST(Bitfinex) - ERC-20 address (0x2260fac...) vs ticker (WBTC)
- CoinGecko ID ("bitcoin") vs CMC ID (1)
Числовые форматы:
-
nullvs"0"vs0vs отсутствие поля — для нулевых объёмов -
-0.0— валидное значение в Python/JS float, неочевидное поведение при сравнении - NaN — иногда встречается в JSON от сторонних API
Архитектура нормализационного слоя
Система состоит из трёх частей:
Raw Data (from scrapers)
↓
[Validation Layer] — отбрасываем невалидные записи, логируем ошибки
↓
[Transformation Layer] — приводим к единому формату
↓
[Enrichment Layer] — добавляем derived поля (USD-стоимость, нормализованный тикер)
↓
Normalized Storage
Validation Layer
Перед трансформацией — явная валидация входных данных. Pydantic v2 для Python:
from pydantic import BaseModel, field_validator, model_validator
from decimal import Decimal
from datetime import datetime
from typing import Optional
class RawTradeEvent(BaseModel):
"""Схема для сырых trade событий от любой биржи"""
exchange: str
raw_symbol: str
raw_price: str | float | int
raw_quantity: str | float | int
raw_timestamp: int | str | float
side: str # 'buy'/'sell' или 'BUY'/'SELL' или 1/2
raw_trade_id: str | int
@field_validator('raw_price', 'raw_quantity', mode='before')
@classmethod
def coerce_to_string(cls, v):
# Всегда конвертируем в string перед Decimal — избегаем float precision loss
if isinstance(v, float):
return f"{v:.10f}"
return str(v)
@field_validator('side', mode='before')
@classmethod
def normalize_side(cls, v):
s = str(v).lower()
if s in ('buy', 'b', '1', 'true'):
return 'buy'
if s in ('sell', 's', '2', 'false'):
return 'sell'
raise ValueError(f"Unknown side value: {v}")
Невалидные записи не падают весь pipeline — они логируются в отдельную таблицу validation_errors для разбора:
async def process_batch(raw_records: list[dict]) -> tuple[list, list]:
valid, errors = [], []
for record in raw_records:
try:
validated = RawTradeEvent(**record)
valid.append(validated)
except ValidationError as e:
errors.append({
"raw": record,
"error": e.errors(),
"timestamp": datetime.utcnow(),
"source": record.get("exchange", "unknown"),
})
return valid, errors
Transformation Layer
Приведение к каноническому формату:
from dataclasses import dataclass
from decimal import Decimal, ROUND_DOWN
from datetime import datetime, timezone
@dataclass
class NormalizedTrade:
exchange: str
symbol: str # canonical: "BTC/USDT"
price: Decimal # всегда Decimal, никаких float
quantity: Decimal
quote_quantity: Decimal # price * quantity
side: str # 'buy' или 'sell'
timestamp: datetime # UTC timezone-aware
trade_id: str # строка, уникальна в рамках биржи
def normalize_trade(raw: RawTradeEvent) -> NormalizedTrade:
return NormalizedTrade(
exchange=raw.exchange,
symbol=normalize_symbol(raw.raw_symbol, raw.exchange),
price=parse_decimal(raw.raw_price),
quantity=parse_decimal(raw.raw_quantity),
quote_quantity=parse_decimal(raw.raw_price) * parse_decimal(raw.raw_quantity),
side=raw.side,
timestamp=normalize_timestamp(raw.raw_timestamp),
trade_id=str(raw.raw_trade_id),
)
def normalize_timestamp(raw: int | str | float) -> datetime:
"""Приводит любой timestamp к UTC datetime"""
if isinstance(raw, str):
# ISO 8601 string
dt = datetime.fromisoformat(raw.replace('Z', '+00:00'))
return dt.astimezone(timezone.utc)
ts = float(raw)
# Автоопределение: ms vs seconds
# timestamps в ms > 1e12 (секунды в 2033 году = ~2e9)
if ts > 1e12:
ts = ts / 1000
return datetime.fromtimestamp(ts, tz=timezone.utc)
def parse_decimal(value: str) -> Decimal:
"""Безопасная конвертация в Decimal"""
try:
d = Decimal(str(value))
if d.is_nan() or d.is_infinite():
raise ValueError(f"Non-finite decimal: {value}")
return d
except Exception as e:
raise ValueError(f"Cannot parse decimal from '{value}': {e}")
Symbol normalization
Маппинг тикеров между биржами — отдельная задача. Используем ccxt-совместимый формат BASE/QUOTE:
SYMBOL_MAPPINGS = {
"binance": {
"BTCUSDT": "BTC/USDT",
"ETHUSDT": "ETH/USDT",
"BTCBUSD": "BTC/BUSD",
# ...
},
"okx": {
"BTC-USDT": "BTC/USDT",
"BTC-USDT-SWAP": "BTC/USDT:USDT", # perpetual
},
"bybit": {
"BTCUSDT": "BTC/USDT",
"BTCPERP": "BTC/USDT:USDT",
},
}
def normalize_symbol(raw_symbol: str, exchange: str) -> str:
exchange_map = SYMBOL_MAPPINGS.get(exchange, {})
if raw_symbol in exchange_map:
return exchange_map[raw_symbol]
# Fallback: пробуем split по общим разделителям
for sep in ['-', '_', '']:
if sep in raw_symbol or sep == '':
# Эвристика: последние 4 символа = quote (USDT, USDC, BTC)
for quote in ['USDT', 'USDC', 'BTC', 'ETH', 'BNB']:
if raw_symbol.endswith(quote):
base = raw_symbol[:-len(quote)]
return f"{base}/{quote}"
raise ValueError(f"Cannot normalize symbol '{raw_symbol}' for exchange '{exchange}'")
On-chain специфика: decimal normalization
Для токенов с разным количеством decimals:
async def normalize_token_amount(
raw_amount: int, # wei или аналог
token_address: str,
network: str,
) -> Decimal:
decimals = await token_decimals_cache.get(token_address, network)
# Используем Decimal для точного деления
divisor = Decimal(10) ** decimals
return Decimal(raw_amount) / divisor
# Кэш decimals — они immutable, агрессивно кэшируем
class TokenDecimalsCache:
def __init__(self):
self._cache: dict[tuple, int] = {}
async def get(self, address: str, network: str) -> int:
key = (address.lower(), network)
if key not in self._cache:
decimals = await self._fetch_decimals(address, network)
self._cache[key] = decimals
return self._cache[key]
Schema Registry: версионирование форматов
Источники данных меняются. Binance обновил API — добавилось поле, изменился формат timestamp. Без версионирования схем — сломается вся нормализация.
Решение: schema registry (аналог Confluent Schema Registry для Kafka, или кастомный):
SCHEMA_VERSIONS = {
"binance_trade": {
"v1": BinanceTradeV1Schema, # до 2023-01
"v2": BinanceTradeV2Schema, # после 2023-01: добавлен quoteQty
}
}
def get_schema(source: str, version: str):
return SCHEMA_VERSIONS[source][version]
# Каждая запись хранит версию схемы источника
# raw_data: {"_schema": "binance_trade:v2", "T": 1234567890123, "p": "50000.00", ...}
Мониторинг качества данных
Нормализация без мониторинга — это иллюзия качества. Ключевые метрики:
-- Процент ошибок валидации по источнику за последний час
SELECT
source,
COUNT(*) FILTER (WHERE status = 'error') AS errors,
COUNT(*) AS total,
ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error') / COUNT(*), 2) AS error_rate_pct
FROM normalization_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY source
ORDER BY error_rate_pct DESC;
Алерт при error_rate > 5% для любого источника — значит изменился формат данных и нужно обновить схему.
Cross-source consistency check: одна и та же цена BTC в одно время не должна расходиться между биржами более чем на 0.5%. Если расходится — либо ошибка нормализации, либо реальный арбитраж.
Технологический стек
| Компонент | Выбор |
|---|---|
| Валидация схем | Pydantic v2 (Python) или Zod (TypeScript) |
| Обработка числовых значений | Python decimal.Decimal, PostgreSQL numeric |
| Очередь | Redis Streams или Kafka |
| Хранение | PostgreSQL (normalized) + raw backup в S3 |
| Schema registry | Custom или Confluent Schema Registry |
| Мониторинг качества | dbt tests + Prometheus метрики |
Сырые данные всегда сохраняем в S3 до нормализации. Если обнаружена ошибка в логике нормализации — можно перепрогнать по исходным данным без повторного сбора.
Разработка нормализационного слоя для 5-7 источников — 1-2 недели. Сложность растёт нелинейно с числом источников: каждый новый источник добавляет свои edge cases.







