Защита API от скрейпинга и обнаружение ботов
Скрейпинг API — систематический сбор данных с частотой выше нормального человеческого взаимодействия. Без защиты конкурент может выгрузить весь каталог товаров за несколько часов, автоматически подбирать пароли или парсить контактную базу. Задача — отличить бота от человека без ущерба для легитимных пользователей.
Слои защиты
Клиент → WAF (IP репутация) → Rate Limiting → Bot Detection → API Logic
↓
Fingerprint + Behavioral Analysis + CAPTCHA
Каждый слой отсеивает часть трафика. Идеальная защита — сочетание нескольких методов, ни один из которых в одиночку не идеален.
Сигналы ботов и их вес
| Сигнал | Вес | Описание |
|---|---|---|
User-Agent отсутствует / curl / python-requests |
+40 | Типичные автоматические клиенты |
Нет Accept-Language / Accept-Encoding |
+20 | Браузер всегда шлёт эти заголовки |
| Запросы строго каждые N мс | +35 | Человек не может так точно |
| Одинаковый паттерн URL (последовательный обход) | +30 | ?page=1, ?page=2, ?page=3... |
| Нет Referer при навигации | +15 | Браузер обычно передаёт |
| Множество запросов с одного IP-диапазона | +25 | Распределённый бот |
| TLS fingerprint (JA3) нетипичный | +30 | Node.js/Python TLS отличается от браузера |
Детектор на основе поведенческих признаков
import time
import statistics
from collections import defaultdict, deque
class BotDetector:
def __init__(self, redis_client):
self.r = redis_client
self.window = 300 # 5-минутное окно анализа
def analyze_request(self, request) -> dict:
"""Возвращает score (0-100) и причины подозрения"""
score = 0
reasons = []
# 1. Заголовки браузера
headers = request.headers
ua = headers.get('User-Agent', '')
bot_uas = ['python-requests', 'curl', 'wget', 'Go-http-client',
'Java/', 'okhttp', 'axios', 'node-fetch']
for bot_ua in bot_uas:
if bot_ua.lower() in ua.lower():
score += 40
reasons.append(f'bot_useragent:{bot_ua}')
break
if not ua:
score += 40
reasons.append('no_useragent')
if not headers.get('Accept-Language'):
score += 20
reasons.append('no_accept_language')
if not headers.get('Accept-Encoding'):
score += 15
reasons.append('no_accept_encoding')
# 2. Анализ частоты запросов
ip = request.remote_addr
timing_score = self._analyze_timing(ip)
if timing_score > 0:
score += timing_score
reasons.append(f'suspicious_timing:{timing_score}')
# 3. Паттерн URL (последовательный обход)
path = request.path
pattern_score = self._analyze_url_pattern(ip, path)
if pattern_score > 0:
score += pattern_score
reasons.append(f'url_pattern:{pattern_score}')
# 4. JA3 TLS fingerprint (через nginx переменную)
ja3 = headers.get('X-JA3-Fingerprint')
if ja3 and self._is_suspicious_ja3(ja3):
score += 30
reasons.append(f'suspicious_ja3:{ja3[:16]}')
return {
'score': min(score, 100),
'is_bot': score >= 60,
'reasons': reasons,
'action': self._get_action(score)
}
def _analyze_timing(self, ip: str) -> int:
"""Анализ интервалов между запросами"""
key = f"timing:{ip}"
now = time.time()
# Сохранить временную метку
self.r.lpush(key, now)
self.r.ltrim(key, 0, 49) # последние 50 запросов
self.r.expire(key, self.window)
timestamps = [float(t) for t in self.r.lrange(key, 0, -1)]
if len(timestamps) < 5:
return 0
# Вычислить интервалы между запросами
timestamps.sort()
intervals = [timestamps[i+1] - timestamps[i]
for i in range(len(timestamps)-1)]
if not intervals:
return 0
avg = statistics.mean(intervals)
stdev = statistics.stdev(intervals) if len(intervals) > 1 else 0
# Коэффициент вариации < 0.1 означает машинную точность
cv = stdev / avg if avg > 0 else 0
if cv < 0.05 and avg < 2.0: # очень регулярные, быстрые запросы
return 35
if cv < 0.15 and avg < 1.0: # регулярные, очень быстрые
return 25
return 0
def _analyze_url_pattern(self, ip: str, path: str) -> int:
"""Обнаружение последовательного обхода"""
key = f"paths:{ip}"
self.r.lpush(key, path)
self.r.ltrim(key, 0, 19)
self.r.expire(key, self.window)
paths = self.r.lrange(key, 0, -1)
if len(paths) < 5:
return 0
# Паттерн /items/1, /items/2, /items/3...
import re
numeric_pattern = re.compile(r'/(\d+)$')
numbers = [int(m.group(1)) for p in paths
if (m := numeric_pattern.search(p.decode()))]
if len(numbers) >= 5:
# Проверить монотонность
diffs = [numbers[i] - numbers[i+1] for i in range(len(numbers)-1)]
if all(d == diffs[0] for d in diffs) and abs(diffs[0]) in [1, -1]:
return 30
return 0
def _is_suspicious_ja3(self, ja3: str) -> bool:
"""Список известных JA3 для автоматических клиентов"""
# Реальный список ведётся отдельно и обновляется
SUSPICIOUS_JA3 = {
'e7d705a3286e19ea42f587b344ee6865', # Python requests
'b386946a5a44d1ddcc843bc75336dfce', # Scrapy
'6734f37431670b3ab4292b8f60f29984', # Go default
}
return ja3.lower() in SUSPICIOUS_JA3
def _get_action(self, score: int) -> str:
if score < 30: return 'allow'
if score < 60: return 'challenge' # CAPTCHA или задержка
if score < 80: return 'throttle' # жёсткий rate limit
return 'block'
Middleware интеграция
from flask import request, jsonify, g
import time
bot_detector = BotDetector(redis_client)
@app.before_request
def bot_detection_middleware():
# Не проверять статические файлы
if request.path.startswith('/static/'):
return
result = bot_detector.analyze_request(request)
g.bot_score = result['score']
if result['action'] == 'block':
# Логировать и блокировать
log_bot_attempt(request, result)
return jsonify({'error': 'Access denied'}), 403
if result['action'] == 'throttle':
# Принудительная задержка (тарпит)
time.sleep(2)
# + применить жёсткий rate limit
if result['action'] == 'challenge':
# Для браузерных клиентов — редирект на CAPTCHA
if 'application/json' not in request.headers.get('Accept', ''):
return redirect(f'/challenge?return={request.url}')
JA3 сбор через Nginx
# nginx.conf
load_module modules/ngx_http_ssl_module.so;
server {
ssl_preread on;
# Передать JA3 fingerprint в заголовке
set $ja3_fingerprint $ssl_ja3; # требует модуль nginx-ja3
proxy_set_header X-JA3-Fingerprint $ja3_fingerprint;
proxy_pass http://backend;
}
Honeypot-поля и ловушки
# Скрытые API-endpoints для ботов
@app.route('/api/items/all') # Не существует для пользователей
def honeypot_endpoint():
ip = request.remote_addr
# Бот нашёл этот endpoint через сканирование — блокировать
bot_detector.blacklist_ip(ip, duration=86400, reason='honeypot')
# Вернуть правдоподобный, но пустой ответ
return jsonify({'items': [], 'total': 0})
# HTML honeypot: скрытая ссылка в HTML
# <a href="/hidden-page" style="display:none" aria-hidden="true">hidden</a>
# Браузер не перейдёт, бот — перейдёт
CAPTCHA-интеграция при подозрении
@app.route('/api/search')
@require_score_below(60) # блокировать score >= 60
def search():
q = request.args.get('q', '')
results = search_service.query(q)
return jsonify(results)
def require_score_below(max_score):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if g.get('bot_score', 0) >= max_score:
# Потребовать Cloudflare Turnstile токен
token = request.headers.get('CF-Turnstile-Token')
if not verify_turnstile(token):
return jsonify({
'error': 'CAPTCHA required',
'captcha_site_key': TURNSTILE_SITE_KEY
}), 429
return f(*args, **kwargs)
return wrapper
return decorator
Мониторинг и метрики
# Prometheus метрики
bot_requests_total = Counter(
'bot_requests_total',
'Bot detection results',
['action', 'reason']
)
@app.after_request
def track_bot_metrics(response):
if hasattr(g, 'bot_result'):
for reason in g.bot_result['reasons']:
bot_requests_total.labels(
action=g.bot_result['action'],
reason=reason.split(':')[0]
).inc()
return response
Срок выполнения
Реализация многоуровневой bot-detection системы с поведенческим анализом, JA3 fingerprinting и интеграцией CAPTCHA — 3–5 рабочих дней.







