Парсинг данных токеномики проектов
Задача звучит просто, но дьявол в деталях: данные токеномики нет в одном месте. Vesting schedule одного проекта — в смарт-контракте, другого — в PDF whitepaper, третьего — только в Discord посте. Circulating supply часто расходится между CoinGecko, проектом и тем, что реально on-chain. Построить надёжный источник данных — это слой нормализации поверх множества heterogeneous источников.
Что входит в "токеномику" и где это лежит
| Метрика | Первичный источник | Вторичный источник |
|---|---|---|
| Total supply | ERC-20 totalSupply() on-chain |
CoinGecko API |
| Circulating supply | Расчёт (total - locked - burned) | CoinGecko (часто неточен) |
| Holder distribution | On-chain (Transfer events) | Etherscan API |
| Vesting schedule | Vesting контракт | Документация |
| Unlock events | Vesting контракт events | TokenUnlocks.app, Vestlab |
| Burn events | Transfer to 0x0...dead |
On-chain |
| Inflation/emission | Контракт (mint events) | Whitepaper |
On-chain данные: прямые вызовы
Для ERC-20 токенов базовые метрики через RPC:
from web3 import Web3
from decimal import Decimal
ERC20_ABI = [
{"name": "totalSupply", "type": "function", "inputs": [], "outputs": [{"type": "uint256"}]},
{"name": "decimals", "type": "function", "inputs": [], "outputs": [{"type": "uint8"}]},
{"name": "balanceOf", "inputs": [{"name": "account", "type": "address"}], "outputs": [{"type": "uint256"}], "type": "function"},
]
def get_token_supply_metrics(token_address: str, w3: Web3) -> dict:
contract = w3.eth.contract(address=Web3.to_checksum_address(token_address), abi=ERC20_ABI)
decimals = contract.functions.decimals().call()
total_supply = Decimal(contract.functions.totalSupply().call()) / Decimal(10 ** decimals)
# Burned tokens: баланс на dead addresses
dead_addresses = [
"0x000000000000000000000000000000000000dEaD",
"0x0000000000000000000000000000000000000000"
]
burned = sum(
Decimal(contract.functions.balanceOf(addr).call()) / Decimal(10 ** decimals)
for addr in dead_addresses
)
return {
"total_supply": float(total_supply),
"burned": float(burned),
"circulating_approx": float(total_supply - burned)
}
Индексирование Transfer событий для holder distribution
Полная история холдеров — через сканирование Transfer событий:
def get_all_holders(token_address: str, w3: Web3, from_block: int = 0) -> dict[str, Decimal]:
"""Возвращает словарь {address: balance} через replay Transfer событий"""
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
balances: dict[str, Decimal] = {}
decimals = get_decimals(token_address, w3)
# Чанкуем по 2000 блоков (ограничение большинства RPC)
current_block = w3.eth.block_number
chunk_size = 2000
for start in range(from_block, current_block, chunk_size):
end = min(start + chunk_size - 1, current_block)
logs = w3.eth.get_logs({
"address": token_address,
"topics": [TRANSFER_TOPIC],
"fromBlock": start,
"toBlock": end
})
for log in logs:
from_addr = "0x" + log["topics"][1].hex()[-40:]
to_addr = "0x" + log["topics"][2].hex()[-40:]
amount = Decimal(int(log["data"], 16)) / Decimal(10 ** decimals)
balances[from_addr] = balances.get(from_addr, Decimal(0)) - amount
balances[to_addr] = balances.get(to_addr, Decimal(0)) + amount
# Убираем нулевые балансы
return {addr: bal for addr, bal in balances.items() if bal > 0}
Для токенов с многолетней историей это займёт время и тысячи RPC запросов. Правильнее использовать The Graph subgraph если он существует для токена, или Etherscan API с кешированием.
Vesting контракты: парсинг расписания разблокировок
Большинство серьёзных проектов деплоят vesting контракты. Стандартные реализации — OpenZeppelin VestingWallet, Sablier, LlamaPay. Парсинг расписания:
# OpenZeppelin VestingWallet ABI (упрощённо)
VESTING_ABI = [
{"name": "beneficiary", "type": "function", "inputs": [], "outputs": [{"type": "address"}]},
{"name": "start", "type": "function", "inputs": [], "outputs": [{"type": "uint64"}]},
{"name": "duration", "type": "function", "inputs": [], "outputs": [{"type": "uint64"}]},
{"name": "vestedAmount", "inputs": [{"name": "token", "type": "address"}, {"name": "timestamp", "type": "uint64"}], "outputs": [{"type": "uint256"}], "type": "function"},
{"name": "released", "inputs": [{"name": "token", "type": "address"}], "outputs": [{"type": "uint256"}], "type": "function"},
]
def parse_vesting_contract(vesting_address: str, token_address: str, w3: Web3) -> dict:
contract = w3.eth.contract(address=Web3.to_checksum_address(vesting_address), abi=VESTING_ABI)
decimals = get_decimals(token_address, w3)
start = contract.functions.start().call()
duration = contract.functions.duration().call()
end = start + duration
released = Decimal(contract.functions.released(token_address).call()) / Decimal(10 ** decimals)
# Строим unlock schedule: сколько разблокируется в каждый месяц
schedule = []
step = 30 * 24 * 3600 # 30 дней
for ts in range(start, end + step, step):
vested = Decimal(contract.functions.vestedAmount(token_address, ts).call()) / Decimal(10 ** decimals)
schedule.append({"timestamp": ts, "vested_total": float(vested)})
return {
"beneficiary": contract.functions.beneficiary().call(),
"start": start,
"end": end,
"released": float(released),
"schedule": schedule
}
Для Sablier (stream-based vesting) и LlamaPay API иные — нужно читать stream параметры из их контрактов.
CoinGecko API для агрегированных данных
Для market cap, volume, price history — CoinGecko Pro API:
import httpx
from datetime import datetime
COINGECKO_BASE = "https://pro-api.coingecko.com/api/v3"
async def get_token_market_data(coingecko_id: str) -> dict:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{COINGECKO_BASE}/coins/{coingecko_id}",
headers={"x-cg-pro-api-key": CG_API_KEY},
params={"localization": "false", "tickers": "false", "community_data": "false"}
)
data = resp.json()
mdata = data["market_data"]
return {
"price_usd": mdata["current_price"]["usd"],
"market_cap_usd": mdata["market_cap"]["usd"],
"fully_diluted_valuation": mdata["fully_diluted_valuation"]["usd"],
"total_supply": mdata["total_supply"],
"circulating_supply": mdata["circulating_supply"],
"max_supply": mdata["max_supply"],
"volume_24h": mdata["total_volume"]["usd"],
"price_change_24h_pct": mdata["price_change_percentage_24h"],
}
Важно: CoinGecko circulating supply часто неточен — проекты сами репортируют его и не всегда корректно. Для критичных расчётов — верифицировать on-chain.
Нормализация и хранение
Данные из разных источников нужно приводить к единой модели:
CREATE TABLE token_snapshots (
id BIGSERIAL PRIMARY KEY,
token_address TEXT NOT NULL,
chain_id INTEGER NOT NULL,
snapshot_time TIMESTAMPTZ NOT NULL,
total_supply NUMERIC,
circulating_supply NUMERIC,
burned_supply NUMERIC,
locked_supply NUMERIC,
price_usd NUMERIC,
market_cap_usd NUMERIC,
holder_count INTEGER,
source TEXT NOT NULL -- 'on-chain', 'coingecko', 'computed'
);
CREATE TABLE unlock_events (
id BIGSERIAL PRIMARY KEY,
token_address TEXT NOT NULL,
vesting_contract TEXT,
beneficiary TEXT,
unlock_time TIMESTAMPTZ NOT NULL,
amount NUMERIC NOT NULL,
label TEXT -- 'team', 'investors', 'ecosystem'
);
Полная система мониторинга токеномики для 50–100 токенов с ежедневными снапшотами и алертами на крупные разблокировки: 3–5 недель разработки.







