Граббинг данных из мемпула

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Граббинг данных из мемпула
Сложный
~3-5 дней
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1286
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Парсинг данных из мемпула

Мемпул — это очередь неподтверждённых транзакций, видимая любому узлу сети. Это исходное сырьё для MEV-ботов, систем мониторинга рисков, front-running detection, арбитражных стратегий и аналитики рынка. Разница между "смотреть на подтверждённые транзакции" и "смотреть на мемпул" — это разница между пост-фактум анализом и возможностью действовать до включения в блок.

Что такое мемпул с технической точки зрения

В Ethereum каждый full node ведёт собственный txpool — in-memory структуру неподтверждённых транзакций. eth_getTransactionByHash на pending транзакцию возвращает данные до включения в блок. txpool_content (Geth-специфичный endpoint) возвращает весь мемпул узла в один запрос.

Мемпул не глобальный — разные ноды видят разные подмножества транзакций в зависимости от P2P топологии. Транзакция распространяется через gossip протокол, но не мгновенно. Если транзакция отправлена через один RPC, другой RPC может её не видеть в течение нескольких секунд.

Для MEV-чувствительных приложений важно понимать: существует private mempool (Flashbots, MEV Blocker, блочные builder'ы) — транзакции идут напрямую к builder минуя публичный мемпул. Такие транзакции не видны никаким мемпул-мониторингом.

Методы подключения к мемпулу

eth_subscribe("pendingTransactions")

Самый простой метод — WebSocket подписка:

import { createPublicClient, webSocket } from 'viem';

const client = createPublicClient({
  transport: webSocket('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'),
});

// Только хеши транзакций
const unwatch = client.watchPendingTransactions({
  onTransactions: async (hashes) => {
    for (const hash of hashes) {
      // Для каждого хеша нужен отдельный запрос за деталями
      const tx = await client.getTransaction({ hash });
      await processPendingTx(tx);
    }
  },
});

Проблема: при высоком трафике (Ethereum mainnet — 50-200 pending транзакций в секунду) — не успеваем запрашивать детали каждой транзакции, накапливается очередь.

eth_subscribe("newPendingTransactions") с full tx body

Некоторые ноды поддерживают расширенный режим с полным телом транзакции:

import asyncio
import json
import websockets

async def subscribe_mempool_full():
    async with websockets.connect("wss://your-private-node:8546") as ws:
        # Стандартная Geth подписка с includeTransactions=true
        await ws.send(json.dumps({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_subscribe",
            "params": ["newPendingTransactions", True]  # True = include full tx
        }))
        
        ack = json.loads(await ws.recv())
        subscription_id = ack["result"]
        
        async for raw in ws:
            msg = json.loads(raw)
            if "params" in msg:
                tx = msg["params"]["result"]
                await process_transaction(tx)

Не все провайдеры поддерживают True флаг. Alchemy и Infura — да. QuickNode — да. Публичный Infura без ключа — нет.

txpool_content и txpool_inspect

Для снимка всего мемпула в один момент времени:

import httpx
import asyncio

async def snapshot_mempool(rpc_url: str) -> dict:
    """Полный снимок мемпула — только для собственной ноды!"""
    async with httpx.AsyncClient() as client:
        resp = await client.post(rpc_url, json={
            "jsonrpc": "2.0",
            "method": "txpool_content",
            "params": [],
            "id": 1
        })
    
    data = resp.json()["result"]
    
    pending_txs = []
    for sender, nonce_map in data["pending"].items():
        for nonce, tx in nonce_map.items():
            pending_txs.append({
                **tx,
                "status": "pending",
                "from": sender,
            })
    
    queued_txs = []
    for sender, nonce_map in data["queued"].items():
        for nonce, tx in nonce_map.items():
            queued_txs.append({**tx, "status": "queued", "from": sender})
    
    return {"pending": pending_txs, "queued": queued_txs}

txpool_content может возвращать десятки тысяч транзакций — это тяжёлый запрос, не делайте его чаще раза в секунду.

Декодирование calldata

Raw calldata — это ABI-encoded вызов функции. Первые 4 байта (function selector) = keccak256 первых 4 байт сигнатуры функции. Остальное — ABI-encoded аргументы.

from eth_abi import decode
from eth_utils import function_abi_to_4byte_selector
import json

# Загрузка ABI Uniswap V2 Router
with open('uniswap_v2_router_abi.json') as f:
    abi = json.load(f)

# Строим маппинг selector → function
selector_to_func = {}
for func in abi:
    if func['type'] == 'function':
        selector = function_abi_to_4byte_selector(func).hex()
        selector_to_func[selector] = func

def decode_calldata(calldata: str) -> dict | None:
    if len(calldata) < 10:
        return None
    
    selector = calldata[2:10]  # убираем '0x'
    func = selector_to_func.get(selector)
    if not func:
        return None
    
    input_types = [inp['type'] for inp in func['inputs']]
    decoded = decode(input_types, bytes.fromhex(calldata[10:]))
    
    return {
        'function': func['name'],
        'args': dict(zip([i['name'] for i in func['inputs']], decoded))
    }

# Пример: декодирование Uniswap V2 swapExactTokensForTokens
tx_data = "0x38ed1739000000000000000000000000000000000000000000000000..."
result = decode_calldata(tx_data)
# {'function': 'swapExactTokensForTokens', 'args': {'amountIn': 1000000, ...}}

Для неизвестных selectors — база данных 4byte.directory содержит 1M+ сигнатур:

async def lookup_selector(selector: str) -> str | None:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://www.4byte.directory/api/v1/signatures/?hex_signature=0x{selector}")
        results = resp.json().get("results", [])
        return results[0]["text_signature"] if results else None

Архитектура высокопроизводительного мемпул-монитора

При серьёзной нагрузке (full Ethereum mempool = 50-300 tx/sec) узкое место — это не сеть, а обработка. Архитектура:

[Multiple P2P Nodes] → [Raw Tx Stream]
                              ↓
                    [Decoder Worker Pool]  ← N процессов
                         /    |    \
              [Kafka Topic: decoded_txs]
               /         |         \
    [MEV Detector]  [Volume Monitor]  [Alert Engine]
                              ↓
                    [TimescaleDB / ClickHouse]

Kafka (или Redis Streams) как шина данных критична: декодирование и хранение должны быть асинхронны от получения. Если БД тормозит — не теряем входящие данные.

Собственная нода обязательна для серьёзного мемпул-мониторинга. Публичные RPC (Alchemy, Infura) дросселируют pending subscriptions и не дают txpool_content. Для Ethereum: Geth или Reth нода с 32GB RAM + NVMe SSD.

P2P подключение (devp2p)

Самый низкоуровневый и полный доступ — подключение к Ethereum P2P сети напрямую как non-mining peer:

# libp2p / devp2p библиотеки для Python: py-evm, pydevp2p
# Более практично использовать готовые решения:
# - Blocknative Mempool Explorer API
# - TxStreet
# - EigenPhi (для MEV анализа)
# Или Rust: reth с custom hooks для мемпул событий

На уровне P2P видны транзакции до того как они достигли RPC провайдеров. Для MEV-ботов первых 100мс — это конкурентное преимущество.

Специфика других сетей

Solana. Нет мемпула в традиционном смысле. Транзакции идут напрямую к лидеру-валидатору через QUIC. Публичного мемпул API нет. Мониторинг через getSignaturesForAddress с polling — это post-confirmation. Для pre-confirmation нужно подключаться как gRPC клиент к validator через Jito MEV infra или аналоги.

Bitcoin. getrawmempool RPC возвращает список txid всех pending транзакций. getmempoolentry — детали конкретной. ZMQ subscription (zmqpubrawtx) — события о новых транзакциях в реальном времени:

import zmq
import asyncio

context = zmq.asyncio.Context()
sock = context.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:28333")
sock.subscribe(b"rawtx")

async def monitor_bitcoin_mempool():
    while True:
        _, raw_tx = await sock.recv_multipart()
        tx = decode_bitcoin_tx(raw_tx)
        await process_bitcoin_pending_tx(tx)

TON. Шардированная архитектура усложняет мемпул-мониторинг — транзакции обрабатываются параллельно в разных шардах. TonCenter API предоставляет endpoint для pending транзакций конкретного аккаунта.

Детектирование MEV паттернов

Практическое применение мемпул-данных — детектирование sandwich атак и arbitrage:

def detect_sandwich_setup(pending_txs: list[dict]) -> list[dict]:
    """Ищем потенциальные sandwich атаки: крупный своп в мемпуле"""
    candidates = []
    
    for tx in pending_txs:
        decoded = decode_calldata(tx['input'])
        if not decoded:
            continue
        
        # Uniswap V2 / V3 свопы
        if decoded['function'] in ['swapExactTokensForTokens', 'exactInputSingle']:
            amount_in_usd = get_usd_value(decoded['args'])
            
            if amount_in_usd > SANDWICH_THRESHOLD_USD:  # например $50K+
                candidates.append({
                    'tx_hash': tx['hash'],
                    'from': tx['from'],
                    'function': decoded['function'],
                    'amount_usd': amount_in_usd,
                    'gas_price': int(tx.get('maxFeePerGas', tx.get('gasPrice', 0)), 16),
                })
    
    return candidates

Хранение и retention policy

Мемпул генерирует огромные объёмы данных. На Ethereum mainnet — несколько GB в сутки при полном логировании. Рекомендованная политика:

Данные Retention Хранилище
Confirmed tx метаданные Бессрочно PostgreSQL
Pending tx (до confirmation) 24 часа Redis + periodic flush
Full calldata pending 72 часа ClickHouse (колоночное)
Confirmed OHLCV агрегаты Бессрочно TimescaleDB
Отброшенные транзакции (dropped) 7 дней PostgreSQL

Транзакции, которые в итоге не попали в блок (dropped, replaced by higher gas) — отдельная категория. Их хранение полезно для анализа gas wars и замены транзакций.

Мониторинг системы

Ключевые метрики мемпул-монитора:

  • Mempool lag — задержка между получением tx и записью в БД. Алерт при > 500мс
  • Decoder throughput — транзакций/сек. Должен покрывать входящий поток
  • Dropped messages — потери в Kafka/Redis Streams очереди
  • Node connectivity — статус WebSocket соединения к каждой ноде
  • Pending tx count — аномальный рост (>200K pending) = congestion event