Реализация очереди задач парсинга (Redis/RabbitMQ/BullMQ)
Парсинг в цикле — наивное решение, которое ломается при первой же ошибке сети. Очередь задач решает три проблемы сразу: изоляция сбоев, повторные попытки и горизонтальное масштабирование.
Выбор брокера
| Брокер | Плюсы | Когда использовать |
|---|---|---|
| BullMQ (Redis) | Простая настройка, UI из коробки, приоритеты | Node.js-стек, до ~100k задач/сутки |
| Celery (Redis/RabbitMQ) | Python-экосистема, цепочки задач (chains, chords) | Python-стек, сложные пайплайны |
| RabbitMQ | Надёжная доставка, routing keys, dead-letter | Высоконагруженные системы, multiple consumers |
| Sidekiq (Redis) | Ruby, минимум конфигурации | Rails-приложения |
Для большинства веб-проектов BullMQ или Celery — оптимальный выбор. RabbitMQ оправдан, когда нужны гарантии доставки на уровне AMQP и сложная маршрутизация между сервисами.
BullMQ: базовая настройка
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });
// Создание очереди
export const scrapeQueue = new Queue('scraping', {
connection,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 60_000 },
removeOnComplete: { count: 500 },
removeOnFail: { count: 200 },
},
});
// Добавление задачи
await scrapeQueue.add('scrape-url', {
url: 'https://example.com/catalog?page=5',
siteId: 42,
depth: 1,
}, { priority: 1 });
// Воркер
const worker = new Worker('scraping', async (job: Job) => {
const { url, siteId } = job.data;
const html = await fetchWithProxy(url);
const products = parseProducts(html);
await saveProducts(products, siteId);
return { count: products.length };
}, { connection, concurrency: 5 });
worker.on('failed', (job, err) => {
logger.error(`Job ${job?.id} failed: ${err.message}`);
});
Celery: пайплайн с цепочками задач
Celery позволяет строить цепочки: сначала спарсить список, потом для каждого элемента запустить детальный парсинг.
from celery import Celery, chain, chord
import redis
app = Celery('scraper', broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1')
app.conf.task_routes = {
'scraper.tasks.fetch_listing': {'queue': 'listings'},
'scraper.tasks.fetch_product': {'queue': 'products'},
}
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_listing(self, url: str, site_id: int) -> list[str]:
try:
html = fetch_page(url)
return extract_product_urls(html)
except (NetworkError, RateLimitError) as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
@app.task(bind=True, max_retries=3)
def fetch_product(self, url: str, site_id: int) -> dict:
try:
html = fetch_page(url)
return parse_product(html)
except Exception as exc:
raise self.retry(exc=exc)
@app.task
def save_products(products: list[dict], site_id: int):
bulk_upsert(products, site_id)
# Запуск пайплайна
def start_site_crawl(site_id: int, catalog_url: str):
urls = fetch_listing.delay(catalog_url, site_id).get()
chord(
fetch_product.s(url, site_id) for url in urls
)(save_products.s(site_id))
Dead Letter Queue
Задачи, исчерпавшие все попытки, попадают в DLQ. Это не просто мусорная корзина — это очередь для ручного анализа и переработки.
# RabbitMQ: настройка DLQ через аргументы очереди
channel.queue_declare(
queue='scraping.products',
durable=True,
arguments={
'x-dead-letter-exchange': 'scraping.dlx',
'x-dead-letter-routing-key': 'failed',
'x-message-ttl': 3600000, # 1 час
}
)
channel.exchange_declare(exchange='scraping.dlx', exchange_type='direct')
channel.queue_declare(queue='scraping.failed', durable=True)
channel.queue_bind(queue='scraping.failed', exchange='scraping.dlx', routing_key='failed')
Задачи из DLQ можно переотправить в основную очередь после устранения причины сбоя — через Admin UI или скрипт.
Мониторинг очереди
BullMQ Board (UI для BullMQ) или Flower (для Celery) дают визуальное представление о состоянии очередей. Ключевые метрики, которые стоит отслеживать:
- Глубина очереди (waiting jobs)
- Скорость обработки (jobs/sec)
- Процент ошибок по типам задач
- Время выполнения (p50, p95, p99)
Эти метрики экспортируются в Prometheus через /metrics эндпоинт и визуализируются в Grafana.
Сроки реализации
Очередь на BullMQ или Celery с повторными попытками и базовым мониторингом — 3–4 рабочих дня. Интеграция с RabbitMQ, DLQ, Prometheus-метрики, UI для управления задачами — ещё 2–3 дня.







