Интеллектуальное rate limiting для API
Базовый rate limit по IP — это X запросов в минуту без разбора. Интеллектуальный rate limiting учитывает идентификатор пользователя, тип endpoint, историческое поведение и автоматически адаптирует лимиты. Правильно настроенный rate limiting не мешает легитимным пользователям, но надёжно блокирует скрейперов и DDoS.
Алгоритмы и их применение
Token Bucket — классика для API. Каждый пользователь имеет ведро токенов, пополняемое с фиксированной скоростью. Позволяет краткосрочные всплески.
Sliding Window — точнее Fixed Window. Считает запросы за последние N секунд относительно текущего момента, не допуская удвоения лимита на границе окна.
Adaptive Rate Limiting — лимиты меняются динамически на основе нагрузки сервера или оценки риска клиента.
Redis-реализация Sliding Window
import redis
import time
from functools import wraps
r = redis.Redis(host='localhost', decode_responses=True)
def sliding_window_rate_limit(key: str, limit: int, window: int) -> bool:
"""
key: уникальный идентификатор (user_id, ip, api_key)
limit: макс. запросов за window секунд
window: размер окна в секундах
Возвращает True если запрос разрешён
"""
now = time.time()
window_start = now - window
pipe = r.pipeline()
pipe.zremrangebyscore(key, 0, window_start) # удалить старые записи
pipe.zadd(key, {str(now): now}) # добавить текущий запрос
pipe.zcard(key) # подсчитать в окне
pipe.expire(key, window) # TTL для cleanup
results = pipe.execute()
count = results[2]
return count <= limit
def rate_limit(limit=100, window=60, key_func=None):
"""Декоратор для Flask/FastAPI"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if key_func:
key = f"rl:{key_func()}"
else:
key = f"rl:{request.remote_addr}"
if not sliding_window_rate_limit(key, limit, window):
# Вернуть Retry-After
return jsonify({'error': 'Too Many Requests'}), 429, {
'Retry-After': str(window),
'X-RateLimit-Limit': str(limit),
'X-RateLimit-Remaining': '0'
}
return f(*args, **kwargs)
return wrapper
return decorator
Многоуровневые лимиты по endpoint
# Разные лимиты для разных операций
RATE_LIMITS = {
'default': {'limit': 1000, 'window': 3600}, # 1000/час
'auth.login': {'limit': 10, 'window': 900}, # 10 попыток за 15 мин
'auth.register': {'limit': 5, 'window': 3600}, # 5/час
'api.search': {'limit': 100, 'window': 60}, # 100/мин
'api.export': {'limit': 10, 'window': 3600}, # 10 экспортов/час
'api.upload': {'limit': 50, 'window': 3600}, # 50 загрузок/час
'webhooks.send': {'limit': 500, 'window': 60}, # 500/мин
}
class MultiLevelRateLimiter:
def check(self, user_id: int, endpoint: str, ip: str) -> dict:
config = RATE_LIMITS.get(endpoint, RATE_LIMITS['default'])
# Уровень 1: по пользователю (аутентифицированные)
if user_id:
user_key = f"rl:user:{user_id}:{endpoint}"
if not sliding_window_rate_limit(user_key, config['limit'], config['window']):
return {'allowed': False, 'reason': 'user_limit'}
# Уровень 2: по IP (защита от создания множества аккаунтов)
ip_key = f"rl:ip:{ip}:{endpoint}"
ip_limit = config['limit'] * 3 # IP-лимит выше user-лимита
if not sliding_window_rate_limit(ip_key, ip_limit, config['window']):
return {'allowed': False, 'reason': 'ip_limit'}
# Уровень 3: глобальный (защита от DDoS)
global_key = f"rl:global:{endpoint}"
global_limit = config['limit'] * 100
if not sliding_window_rate_limit(global_key, global_limit, config['window']):
return {'allowed': False, 'reason': 'global_limit'}
return {'allowed': True}
Адаптивный rate limit по риску
class AdaptiveRateLimiter:
def get_risk_score(self, request) -> float:
"""Оценить риск запроса от 0.0 (низкий) до 1.0 (высокий)"""
score = 0.0
# Подозрительный User-Agent
ua = request.headers.get('User-Agent', '')
if not ua or 'python-requests' in ua.lower() or 'curl' in ua.lower():
score += 0.3
# Нет заголовков браузера
if not request.headers.get('Accept-Language'):
score += 0.2
# Недавняя история ошибок (много 404, 401)
error_count = r.get(f"errors:{request.remote_addr}") or 0
if int(error_count) > 10:
score += 0.3
# Запросы с Tor/VPN IP (проверка по списку)
if self.is_known_proxy(request.remote_addr):
score += 0.2
return min(score, 1.0)
def get_effective_limit(self, base_limit: int, risk_score: float) -> int:
"""Снизить лимит для подозрительных клиентов"""
multiplier = 1.0 - (risk_score * 0.8) # до 80% снижения
return max(int(base_limit * multiplier), 1)
Заголовки ответа
RFC 6585 и стандарты API требуют информативных заголовков:
def add_rate_limit_headers(response, key, limit, window):
now = time.time()
current_count = r.zcard(key) or 0
remaining = max(0, limit - current_count)
# Время сброса = начало следующего окна
reset_at = int(now) + window - (int(now) % window)
response.headers['X-RateLimit-Limit'] = str(limit)
response.headers['X-RateLimit-Remaining'] = str(remaining)
response.headers['X-RateLimit-Reset'] = str(reset_at)
response.headers['X-RateLimit-Policy'] = f"{limit};w={window}"
return response
Kong rate-limiting-advanced plugin
# Kong declarative config
plugins:
- name: rate-limiting-advanced
config:
limit:
- 100
- 1000
window_size:
- 60 # 100 req/мин
- 3600 # 1000 req/час
identifier: consumer # или ip, credential, header
strategy: redis
redis:
host: redis
port: 6379
namespace: kong_rl
sync_rate: 2 # синхронизировать с Redis каждые 2 сек
hide_client_headers: false
error_message: "API rate limit exceeded"
error_code: 429
Срок выполнения
Реализация многоуровневого rate limiting с Redis Sliding Window и адаптивными лимитами — 1–2 рабочих дня.







