Разработка потоковой обработки блокчейн-данных (Kafka/Flink)
Нода Ethereum в режиме real-time генерирует около 2–5 МБ данных в секунду в периоды высокой активности сети. Это события Transfer, вызовы контрактов, изменения состояния. Если ваша аналитическая система или торговый движок получают эти данные через периодический polling RPC-ноды — вы работаете с устаревшими данными и пропускаете события. Для задач, где задержка в 1–2 блока критична (арбитраж, liquidation monitoring, fraud detection), нужна потоковая архитектура с гарантиями доставки.
Источники данных: от ноды до Kafka
WebSocket подписки vs polling
Стандартный eth_subscribe("newHeads") через WebSocket даёт уведомление о новом блоке без задержки polling. Но WebSocket-соединение нестабильно на длинных периодах — нужен reconnect с catchup логикой:
func (s *NodeSubscriber) subscribeWithRecovery(ctx context.Context) error {
for {
lastBlock, _ := s.db.GetLastProcessedBlock()
// Догнать пропущенные блоки при reconnect
if err := s.catchUpFromBlock(ctx, lastBlock+1); err != nil {
return err
}
// Подписаться на новые блоки
sub, err := s.client.SubscribeNewHead(ctx, s.headers)
if err != nil {
time.Sleep(backoffDuration)
continue
}
select {
case err := <-sub.Err():
log.Warnf("subscription error: %v, reconnecting", err)
case <-ctx.Done():
return nil
}
}
}
Firehose protocol (StreamingFast/Pinax)
Для Ethereum и других EVM-сетей наиболее эффективный способ получения raw данных — Firehose, разработанный StreamingFast (теперь часть The Graph Foundation). Протокол инструментирует ноду на уровне бинарника и экспортирует блоки в бинарном formате (protobuf) с минимальной задержкой. Throughput на порядок выше чем через JSON-RPC.
# Подключение к Firehose endpoint
grpcurl -plaintext mainnet.eth.streamingfast.io:443 \
sf.ethereum.type.v2.Fetch/Block
Для проектов с требованием полной исторической прокрутки — Firehose + хранение flat files в S3/GCS позволяет воспроизводить любой диапазон блоков без повторной синхронизации ноды.
Kafka как транспортный слой
Kafka — очередь с сохранением (log-based). В отличие от RabbitMQ/Redis Streams, Kafka хранит все сообщения в настроенный retention период (дни, недели), что позволяет consumers перечитывать данные. Это критично для блокчейн-аналитики: новый consumer group может прочитать всю историю событий без обращения к ноде.
Топологии топиков для blockchain pipeline:
raw.blocks → сырые блоки (partitioned by block_number % N)
raw.transactions → все транзакции
raw.logs → все event logs
decoded.transfers → декодированные ERC-20 Transfer события
decoded.swaps → декодированные Swap события (Uniswap, Curve, etc.)
alerts.large-txns → транзакции > threshold
analytics.prices → агрегированные ценовые данные
Partitioning strategy важна: для событий конкретного контракта — partition by contractAddress (гарантирует ordering). Для транзакций — partition by from address или blockNumber.
Apache Flink: stateful stream processing
Flink — правильный инструмент для задач, которые требуют состояния: скользящие агрегаты, join потоков, обнаружение паттернов во времени. Spark Streaming — батчинг под видом стриминга (micro-batches). Flink — настоящий event-time processing.
Декодирование ABI on-the-fly
Входящие logs — сырые hex данные. Flink job должен декодировать их в типизированные события:
public class LogDecoderFunction extends RichFlatMapFunction<RawLog, DecodedEvent> {
private Map<String, ContractABI> abiRegistry;
@Override
public void flatMap(RawLog log, Collector<DecodedEvent> out) {
String contractAddress = log.getAddress().toLowerCase();
ContractABI abi = abiRegistry.get(contractAddress);
if (abi == null) return; // неизвестный контракт
String topic0 = log.getTopics().get(0);
EventDefinition eventDef = abi.findEventBySignatureHash(topic0);
if (eventDef != null) {
DecodedEvent decoded = AbiDecoder.decode(eventDef, log);
out.collect(decoded);
}
}
}
ABI реестр загружается из PostgreSQL/Redis при старте job и обновляется через Broadcast State pattern — без перезапуска job при добавлении новых контрактов.
Временные окна и агрегации
Задача: вычислять 5-минутный VWAP (Volume Weighted Average Price) по свапам Uniswap V3 в режиме реального времени.
DataStream<SwapEvent> swaps = source
.filter(e -> e.getType().equals("Swap"))
.map(e -> (SwapEvent) e);
DataStream<VWAPResult> vwap = swaps
.keyBy(SwapEvent::getPoolAddress)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new VWAPAggregator(), new VWAPWindowFunction());
Event time vs processing time — принципиальный выбор. Event time (время блока) даёт детерминированные результаты при переигрывании истории. Processing time быстрее, но даёт разные результаты при replay.
Watermarks для обработки late events — блокчейн транзакции могут приходить в Kafka с небольшой задержкой:
WatermarkStrategy.<RawLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((log, ts) -> log.getBlockTimestamp() * 1000L)
Сложные паттерны: CEP для обнаружения аномалий
Flink CEP (Complex Event Processing) позволяет описывать последовательности событий. Задача: детектировать sandwich attack — front-run транзакция, жертва, back-run транзакция в одном блоке.
Pattern<DecodedEvent, ?> sandwichPattern = Pattern
.<DecodedEvent>begin("frontrun")
.where(e -> e.isSwap() && e.getGasPrice() > threshold)
.next("victim")
.where(e -> e.isSwap() && samePool(e, "frontrun"))
.next("backrun")
.where(e -> e.isSwap() && samePool(e, "frontrun")
&& e.getSender().equals(frontrunSender(e)))
.within(Time.seconds(12)); // в пределах одного блока
State backend и отказоустойчивость
Flink checkpoint — снапшот состояния всех операторов в S3/HDFS. При сбое — восстановление с последнего checkpoint, Kafka consumer offset сохраняется атомарно с state. Это гарантирует exactly-once семантику для большинства операторов.
RocksDB state backend — обязателен для production при большом состоянии (миллионы ключей). In-memory backend не масштабируется.
# flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: s3://your-bucket/flink-checkpoints
execution.checkpointing.interval: 60000 # каждую минуту
execution.checkpointing.mode: EXACTLY_ONCE
Мониторинг и dead letter queues
Необработанные события (неизвестный ABI, ошибка парсинга, неожиданный формат) нельзя просто дропать. Dead letter queue (DLQ) в отдельный Kafka топик с сохранением оригинального сообщения и stack trace ошибки — стандартный паттерн.
Метрики Flink + Prometheus + Grafana: lag по каждому топику, throughput операторов, backpressure по графу job. Backpressure — первый индикатор, что downstream не справляется.
Типовые use cases и задержки
| Use case | Допустимая задержка | Инструмент |
|---|---|---|
| MEV bot / арбитраж | < 100мс | WebSocket → in-process |
| Liquidation monitoring | < 1 сек | Kafka + Flink CEP |
| DeFi аналитика real-time | 1–5 сек | Kafka + Flink aggregations |
| Ончейн аналитика/BI | < 1 мин | Kafka + Flink → ClickHouse |
| Исторический анализ | без ограничений | Firehose → S3 → Spark/dbt |
Инфраструктура и стек
Минимальный production кластер: 3 Kafka брокера (3 реплики для durability), Flink cluster с 1 JobManager + 3–5 TaskManager pods в Kubernetes. Хранилище результатов — ClickHouse для аналитических запросов (колоночное, быстрые aggregations на больших объёмах) или PostgreSQL + TimescaleDB для метрик временных рядов.
Управляемые сервисы сокращают операционную нагрузку: Confluent Cloud (Kafka), Amazon Kinesis (альтернатива для AWS-native стека). Для on-premise или compliance требований — собственный кластер.
Разработка первой production pipeline с декодированием событий, базовыми агрегациями и мониторингом — 4–6 недель. Сложные CEP паттерны и multi-chain поддержка добавляют ещё 3–4 недели.







