Реализация распределённого парсинга (несколько воркеров) для масштабирования
Один воркер упирается в скорость сети, ограничения целевого сайта и пропускную способность прокси. Распределённая система позволяет горизонтально масштабироваться: 5 воркеров дают не просто ×5 скорость — они работают с разными прокси, разными IP, параллельно обходят разные разделы сайта и не мешают друг другу при записи результатов.
Общая архитектура
Координатор (Scheduler)
│
▼
Очередь задач (Redis + BullMQ / Celery / Sidekiq)
┌────┴────┐
│ │
Worker-1 Worker-2 ... Worker-N
(proxy-1) (proxy-2) (proxy-N)
│ │
└────┬────┘
▼
Shared Storage (PostgreSQL + S3)
│
▼
Дедупликатор / Merger
Координатор не парсит сам — он только генерирует задачи и следит за прогрессом. Воркеры stateless: любой может взять любую задачу.
Структура задачи
{
"task_id": "uuid-v4",
"job_type": "scrape_product_list",
"url": "https://target.com/catalog?page=42",
"site_id": 7,
"depth": 2,
"retry_count": 0,
"max_retries": 3,
"proxy_pool": "residential",
"created_at": "2025-03-01T12:00:00Z"
}
Координатор: генерация задач
Для сайта с пагинацией координатор сначала определяет количество страниц, затем выдаёт задачу на каждую:
class CatalogScheduler:
def __init__(self, queue: Queue, site_config: dict):
self.queue = queue
self.config = site_config
async def schedule_full_crawl(self):
total_pages = await self.detect_total_pages(self.config['catalog_url'])
tasks = [
ScrapeTask(
url=self.config['catalog_url'] + f"?page={i}",
job_type="scrape_listing",
site_id=self.config['id'],
)
for i in range(1, total_pages + 1)
]
await self.queue.bulk_add(tasks, priority=5)
logger.info(f"Scheduled {len(tasks)} tasks for site {self.config['id']}")
Задачи на страницы листинга порождают вторичные задачи — на отдельные карточки товаров. Это двухуровневая очередь с разными приоритетами.
Управление прокси
Каждый воркер привязан к пулу прокси. Ротация — по round-robin с «карантином» забаненных IP:
class ProxyRotator:
def __init__(self, proxies: list[str]):
self.proxies = proxies
self.banned: dict[str, datetime] = {}
self.idx = 0
def get_proxy(self) -> str:
for _ in range(len(self.proxies)):
proxy = self.proxies[self.idx % len(self.proxies)]
self.idx += 1
ban_until = self.banned.get(proxy)
if ban_until and ban_until > datetime.utcnow():
continue
return proxy
raise NoProxyAvailable("All proxies are in cooldown")
def report_banned(self, proxy: str, cooldown_minutes: int = 30):
self.banned[proxy] = datetime.utcnow() + timedelta(minutes=cooldown_minutes)
Резидентные прокси ротируются реже, датацентровые — чаще (обнаруживаются быстрее).
Предотвращение дублирования работы
Перед добавлением URL в очередь координатор проверяет Bloom filter или Redis Set:
async def should_schedule(self, url: str) -> bool:
normalized = normalize_url(url) # убрать UTM, trailing slash, etc.
key = f"visited:{self.site_id}"
return not await self.redis.sismember(key, normalized)
async def mark_visited(self, url: str):
normalized = normalize_url(url)
await self.redis.sadd(f"visited:{self.site_id}", normalized)
await self.redis.expire(f"visited:{self.site_id}", 86400 * 7)
Bloom filter предпочтительнее для масштаба > 10M URL: памяти нужно в 50–100 раз меньше, чем у Set, при допустимой погрешности < 0.1%.
Согласованная запись результатов
Несколько воркеров пишут в одну БД одновременно. Конфликты обрабатываются через INSERT ... ON CONFLICT DO UPDATE:
INSERT INTO scraped_products (site_id, external_id, url, data, scraped_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (site_id, external_id)
DO UPDATE SET
data = EXCLUDED.data,
scraped_at = EXCLUDED.scraped_at,
updated_at = NOW()
WHERE scraped_products.scraped_at < EXCLUDED.scraped_at - INTERVAL '1 hour';
Условие WHERE scraped_at < EXCLUDED.scraped_at - '1 hour' не даёт двум воркерам, попавшим на один товар, бесконечно перезаписывать друг друга.
Масштабирование воркеров
Воркеры запускаются в Docker-контейнерах. Горизонтальное масштабирование через docker-compose scale или Kubernetes HPA:
# docker-compose.yml
services:
scraper-worker:
image: scraper:latest
environment:
- REDIS_URL=redis://redis:6379
- DB_URL=postgresql://...
- PROXY_LIST=/run/secrets/proxies
deploy:
replicas: 5
restart: unless-stopped
При добавлении новых воркеров очередь автоматически распределяет нагрузку — никакой ручной конфигурации.
Мониторинг прогресса
Координатор ведёт счётчики в Redis:
scrape_job:{job_id}:total → 2400 (всего задач)
scrape_job:{job_id}:done → 1837 (выполнено)
scrape_job:{job_id}:failed → 12 (ошибки)
scrape_job:{job_id}:started_at → timestamp
Расчётное время окончания = (total - done) / (done / elapsed_seconds).
Дашборд (BullMQ Board или собственный UI) показывает активные задачи, скорость обработки, распределение по воркерам и статистику ошибок.
Типичные конфигурации и производительность
| Воркеров | Прокси | Скорость | Подходит для |
|---|---|---|---|
| 3 | 10 датацентровых | ~500 стр/мин | Каталоги до 100к товаров |
| 10 | 50 резидентных | ~1500 стр/мин | Крупные маркетплейсы |
| 20+ | 100+ резидентных | ~5000 стр/мин | Ежедневный полный обход |
Сроки реализации
Базовая распределённая система с 2–3 воркерами, Redis-очередью и PostgreSQL-хранилищем — 8–10 рабочих дней. Динамическая ротация прокси, Bloom filter для дедупликации, автомасштабирование через Docker/K8s, дашборд мониторинга — ещё 5–7 дней.







