Разработка кастомизированного блокчейн-индексатора
The Graph решает 80% задач индексации. Оставшиеся 20% — это когда нужна сложная агрегация данных на лету, кросс-чейн индексация с объединением состояния, доступ к данным которые не попали в события (storage slots, trace calls), субсекундная латентность, или полный контроль над инфраструктурой без vendor lock-in. Именно для этих случаев строится кастомный индексатор.
Архитектурные решения перед началом
Прежде чем писать код, нужно ответить на три вопроса:
Источник данных. Logs/events — самый дешёвый способ, но только то что контракт явно эмитирует. Traces (internal transactions) — нужен archive node с trace_ namespace или Erigon с --tracing. Storage proofs — для состояния которое никогда не эмитировалось в события. Выбор источника определяет требования к ноде и сложность парсинга.
Модель консистентности. Нужна ли точная консистентность (обработка реорганизаций) или eventual достаточно? Для финансовых данных реорганизация — не редкость. На Ethereum mainnet реорги глубиной 1-2 блока случаются несколько раз в день. Глубина финальности для safe confirmation — 12-15 блоков на PoS Ethereum.
Требования к латентности. Реалтайм (< 1 сек от блока) — нужен WebSocket + streaming. Аналитика (минуты/часы) — batch processing достаточен.
Компоненты системы
Ингестия данных (Data Ingestion Layer)
Три паттерна получения данных из ноды:
JSON-RPC polling — простейший вариант. eth_getLogs с фильтром по address и topics, eth_getBlockByNumber. Задержка = polling interval (обычно 500мс-2сек). Проблема: при высоком RPS нода начинает throttle.
WebSocket subscriptions — eth_subscribe("newHeads") и eth_subscribe("logs", filter). Latency близка к времени блока. Проблема: при реконнекте можно пропустить блоки, нужна логика catch-up.
Direct P2P — подключение к Ethereum P2P сети через devp2p/libp2p, получение блоков напрямую без RPC. Минимальная латентность, но высокая сложность реализации. Практично только если индексатор физически близко к валидаторам.
Для production рекомендую WebSocket + catch-up механизм:
async def subscribe_blocks(ws_url: str, from_block: int):
# Сначала догоняем до текущего блока
current = await rpc.eth_block_number()
for block_num in range(from_block, current):
block = await rpc.eth_get_block_by_number(block_num)
await process_block(block)
# Затем подписываемся на новые
async with websockets.connect(ws_url) as ws:
await ws.send(json.dumps({
"method": "eth_subscribe",
"params": ["newHeads"]
}))
async for message in ws:
head = json.loads(message)
await process_new_head(head)
Декодирование ABI
Raw log — это массив topics (bytes32) и data (bytes). Декодирование через ABI:
import { decodeEventLog, parseAbi } from 'viem';
const abi = parseAbi([
'event Transfer(address indexed from, address indexed to, uint256 value)'
]);
const decoded = decodeEventLog({
abi,
data: log.data,
topics: log.topics,
});
// decoded.args.from, decoded.args.to, decoded.args.value
Indexed параметры кодируются в topics (topic[0] = keccak256 сигнатуры, topic[1..3] = indexed args). Non-indexed — в data через ABI encoding.
Проблемы при декодировании:
- Anonymous events — нет topic[0], matching только по address
-
Proxy контракты — ABI implementation, а не proxy. Нужно резолвить через
implementation()slot (EIP-1967: slot0x360894...) - Upgrade события — после апгрейда ABI меняется, нужна версионированность
Обработка реорганизаций
Это самая неприятная часть кастомного индексатора. Реорг означает что блоки которые вы уже обработали — более не каноничные.
Паттерн: каждая запись в базе содержит block_hash и block_number. При обработке нового блока проверяем родителя:
-- Обнаружение реорга
SELECT block_hash, block_number
FROM processed_blocks
WHERE block_number = $1 AND block_hash != $2;
-- Если есть строки — это реорг
При обнаружении реорга:
- Найти точку расхождения (общий предок)
- Откатить все записи от точки расхождения до текущего блока (
DELETE WHERE block_number >= fork_point) - Переобработать канонические блоки
Для этого нужна атомарная обработка блока — все изменения от одного блока применяются в одной транзакции БД с block_hash как идентификатором.
BEGIN;
DELETE FROM events WHERE block_hash = $orphaned_hash;
DELETE FROM processed_blocks WHERE block_hash = $orphaned_hash;
-- вставка канонических данных
INSERT INTO processed_blocks (block_number, block_hash, ...) VALUES (...);
INSERT INTO events (...) VALUES (...);
COMMIT;
Слой хранения
Выбор БД определяется паттернами запросов:
| Сценарий | Технология | Причина |
|---|---|---|
| Time-series данные (цены, объёмы) | TimescaleDB | Гипертаблицы, автокомпрессия, continuous aggregates |
| Граф-запросы (связи между аккаунтами) | PostgreSQL + ltree или Neo4j | Рекурсивные запросы или граф-нативная БД |
| Полнотекстовый поиск по метаданным NFT | PostgreSQL + GIN index | jsonb + GIN индексы для JSON-полей |
| OLAP аналитика | ClickHouse | Колоночное хранение, векторизованное выполнение |
| Кэш актуального состояния | Redis | Hash-структуры для балансов, pub/sub для стриминга |
Для большинства DeFi индексаторов: PostgreSQL для основных данных + Redis для hot cache.
API слой
GraphQL через Hasura (автогенерация из PostgreSQL схемы) или вручную через Apollo Server. REST для простых случаев.
Критичная оптимизация: DataLoader для батчинга запросов. Если GraphQL-запрос просит transfers { from { balance } } — без DataLoader получим N+1 запросов к БД. DataLoader группирует запросы за один tick event loop.
Subscriptions для реалтайм данных: PostgreSQL LISTEN/NOTIFY → WebSocket → GraphQL subscription.
Производительность и масштабирование
Узкие места в порядке частоты встречаемости:
Параллельная обработка блоков. Блоки независимы если нет cross-block состояния (обычно нет). Worker pool по N потоков, каждый обрабатывает свой диапазон блоков. Осторожно: порядок записи в БД должен быть детерминированным.
Batch insert. Не INSERT каждое событие отдельно. PostgreSQL COPY или INSERT ... VALUES (…),(…),(…) — разница в 10-50x по throughput.
# Плохо: N отдельных INSERT
for event in events:
await db.execute("INSERT INTO events VALUES ($1, $2, ...)", event)
# Хорошо: один batch INSERT
await db.executemany(
"INSERT INTO events VALUES ($1, $2, ...)",
[(e.block, e.tx_hash, ...) for e in events]
)
Индексы vs insert speed. Каждый индекс замедляет INSERT. Для исторической синхронизации: создать таблицу без индексов, загрузить данные, затем CREATE INDEX CONCURRENTLY. Ускорение в 3-10x по сравнению с индексами во время загрузки.
Технологический стек
| Компонент | Варианты |
|---|---|
| Язык ингестии | TypeScript/Node.js (экосистема viem/ethers), Python (web3.py), Rust (ethers-rs/alloy) |
| Очередь | Redis Streams, Apache Kafka (при > 10k событий/сек) |
| База данных | PostgreSQL 16 + TimescaleDB |
| API | Hasura (быстрый старт) или custom GraphQL |
| Мониторинг | Prometheus + Grafana, alerting по lag метрике |
| Деплой | Docker Compose (dev), Kubernetes (prod) |
Rust (alloy crate) дает наилучшую производительность для высоконагруженных индексаторов: парсинг ABI, десериализация блоков, работа с bytes — все это быстрее чем в Node.js в 5-20x.
Мониторинг и операционка
Ключевые метрики:
-
Indexer lag — разница между
latest_blockв БД иeth_blockNumber. Алерт при > 10 блоков. - Reorg count — количество реорганизаций за период. Резкий рост = проблемы с нодой или RPC.
- Events per block — аномалии указывают на нестандартную активность или баги в парсинге.
- DB write latency — деградация означает нужен vacuum, bloat, или шардирование.
# Пример Prometheus alert
- alert: IndexerLagHigh
expr: eth_latest_block - indexer_processed_block > 50
for: 2m
annotations:
summary: "Indexer is falling behind by {{ $value }} blocks"
Процесс работы
Проектирование (3-5 дней). Определение источников данных, схемы БД, требований к реалтайму. Выбор между кастомной разработкой и расширением существующих решений (Ponder, Substreams).
Разработка ядра (5-10 дней). Ингестия + декодирование + обработка реорганизаций + хранение. Это критический путь, тестируется на исторических данных.
API и интеграции (3-5 дней). GraphQL/REST схема, subscriptions, документация.
Нагрузочное тестирование и оптимизация (2-3 дня). Синхронизация с genesis, нагрузочное тестирование API, настройка пулов соединений, индексов.
Деплой и мониторинг (1-2 дня). Docker Compose / Kubernetes, настройка алертов, runbook для дежурных.
Итого: 1-2 недели для индексатора одного протокола на одной сети. Мультичейн с агрегацией — ближе к 3-4 неделям.







