Настройка rate limiting и квот для AI-сервиса
Rate limiting для LLM сложнее обычного API: метрика не только запросы в минуту, но и токены (prompt + completion). Дорогой запрос с 4000 токенами должен ограничиваться строже, чем дешёвый на 100 токенов.
Многоуровневые квоты
from dataclasses import dataclass
from enum import Enum
class QuotaTier(str, Enum):
    """Billing tier identifiers; used as keys into ``QUOTA_TIERS``.

    Inherits from ``str`` so tier values compare equal to their plain
    string form and serialize transparently in JSON responses.
    """

    FREE = "free"
    STANDARD = "standard"
    ENTERPRISE = "enterprise"
@dataclass
class QuotaConfig:
    """Rate-limit settings for one billing tier.

    Token limits count prompt and completion tokens together
    (the limiter sums input and estimated output tokens).
    """

    # Maximum accepted requests within a one-minute window.
    requests_per_minute: int
    tokens_per_minute: int  # input + output tokens
    # Rolling daily token budget (input + output).
    tokens_per_day: int
    # Hard cap on the token size of a single request.
    max_tokens_per_request: int
    # Maximum number of simultaneously in-flight requests.
    concurrent_requests: int
# Per-tier quota table. Higher tiers get proportionally larger request,
# token, and concurrency budgets.
QUOTA_TIERS = {
    QuotaTier.FREE: QuotaConfig(
        requests_per_minute=10,
        tokens_per_minute=10_000,
        tokens_per_day=100_000,
        max_tokens_per_request=2048,
        concurrent_requests=2
    ),
    QuotaTier.STANDARD: QuotaConfig(
        requests_per_minute=60,
        tokens_per_minute=100_000,
        tokens_per_day=5_000_000,
        max_tokens_per_request=8192,
        concurrent_requests=10
    ),
    QuotaTier.ENTERPRISE: QuotaConfig(
        requests_per_minute=1000,
        tokens_per_minute=2_000_000,
        # NOTE(review): float('inf') violates the `tokens_per_day: int`
        # annotation. It works at runtime because `int > inf` comparisons are
        # valid, but a sentinel (None / sys.maxsize) would be cleaner — confirm.
        tokens_per_day=float('inf'),
        max_tokens_per_request=32768,
        concurrent_requests=100
    ),
}
Реализация через Redis
import redis.asyncio as aioredis
import time
class TokenBucketRateLimiter:
    """Redis-backed quota enforcement for LLM requests.

    Tracks four counters per API key: requests/minute, tokens/minute,
    tokens/day, and in-flight concurrency. Counters are incremented
    optimistically in one pipeline and rolled back if any limit is
    exceeded, so a rejected request consumes no quota.

    Fixes vs. the previous revision:
    - ``retry_after`` was derived by substring-matching ``"Rate limit"``
      against the error text; the tokens-per-minute message starts with
      "Token rate limit" (lowercase "r") and therefore fell through to
      86400s, telling clients to wait a day for a 60-second window. Each
      violation now carries its own retry hint explicitly.
    - The per-request size cap needs no counters at all; it is now checked
      before any Redis round-trip instead of increment-then-rollback.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        # Single client; redis.asyncio pools connections internally.
        self.redis = aioredis.from_url(redis_url)

    async def check_and_consume(
        self,
        api_key: str,
        tier: QuotaTier,
        input_tokens: int,
        estimated_output_tokens: int
    ) -> tuple[bool, dict]:
        """Reserve quota for one request.

        Returns ``(True, info)`` if the request may proceed, otherwise
        ``(False, {"error": ..., "retry_after": seconds})``. On success the
        caller must eventually invoke :meth:`release_concurrent`.
        """
        config = QUOTA_TIERS[tier]
        total_tokens = input_tokens + estimated_output_tokens

        # Oversized requests can never succeed on retry and need no counters —
        # reject before touching Redis.
        if total_tokens > config.max_tokens_per_request:
            return False, {
                "error": f"Request too large: {total_tokens}/{config.max_tokens_per_request} tokens",
                "retry_after": 0,
            }

        now = time.time()
        minute_window = int(now // 60) * 60    # start of the current minute
        day_window = int(now // 86400) * 86400  # start of the current day (UTC epoch days)

        # Rate-limiting keys, bucketed by time window.
        rpm_key = f"rl:{api_key}:rpm:{minute_window}"
        tpm_key = f"rl:{api_key}:tpm:{minute_window}"
        tpd_key = f"rl:{api_key}:tpd:{day_window}"
        concurrent_key = f"rl:{api_key}:concurrent"

        # Optimistic atomic increment of all counters in one round-trip.
        # TTLs outlive the window so late readers still see the final value.
        pipe = self.redis.pipeline()
        pipe.incr(rpm_key)
        pipe.expire(rpm_key, 120)
        pipe.incrby(tpm_key, total_tokens)
        pipe.expire(tpm_key, 120)
        pipe.incrby(tpd_key, total_tokens)
        pipe.expire(tpd_key, 172800)
        pipe.incr(concurrent_key)
        pipe.expire(concurrent_key, 300)
        results = await pipe.execute()
        current_rpm, _, current_tpm, _, current_tpd, _, current_concurrent, _ = results

        # Each violation carries (message, seconds-until-retry-makes-sense).
        violations: list[tuple[str, int]] = []
        if current_rpm > config.requests_per_minute:
            violations.append(
                (f"Rate limit: {current_rpm}/{config.requests_per_minute} req/min", 60)
            )
        if current_tpm > config.tokens_per_minute:
            violations.append(
                (f"Token rate limit: {current_tpm}/{config.tokens_per_minute} tokens/min", 60)
            )
        if current_tpd > config.tokens_per_day:
            violations.append(("Daily token limit exceeded", 86400))
        if current_concurrent > config.concurrent_requests:
            violations.append(
                (f"Too many concurrent requests: {current_concurrent}/{config.concurrent_requests}", 1)
            )

        if violations:
            # Roll back: a rejected request must not eat into the quota.
            pipe2 = self.redis.pipeline()
            pipe2.decr(rpm_key)
            pipe2.decrby(tpm_key, total_tokens)
            pipe2.decrby(tpd_key, total_tokens)
            pipe2.decr(concurrent_key)
            await pipe2.execute()
            error, retry_after = violations[0]
            return False, {"error": error, "retry_after": retry_after}

        return True, {"remaining_rpm": config.requests_per_minute - current_rpm}

    async def release_concurrent(self, api_key: str):
        """Release one concurrency slot; call when the request finishes."""
        await self.redis.decr(f"rl:{api_key}:concurrent")
FastAPI middleware
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
app = FastAPI()
# Single shared limiter for the whole process; its Redis client is reused
# across requests.
rate_limiter = TokenBucketRateLimiter()
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Enforce per-key quotas on /v1/chat endpoints.

    Non-chat paths pass straight through. For chat requests: extract the
    API key from the Authorization header, resolve its tier, estimate the
    token cost from the request body, and consult the rate limiter before
    forwarding. The concurrency slot is always released afterwards.

    Fixes vs. the previous revision:
    - A malformed JSON body made ``request.json()`` raise and surfaced as a
      500; it now returns a 400.
    - ``.replace("Bearer ", "")`` stripped the prefix anywhere in the
      header; ``removeprefix`` only strips it at the start.
    """
    if not request.url.path.startswith("/v1/chat"):
        return await call_next(request)

    auth_header = request.headers.get("Authorization", "")
    api_key = auth_header.removeprefix("Bearer ")
    if not api_key:
        return JSONResponse({"error": "Missing API key"}, status_code=401)

    # Resolve the billing tier for this key.
    tier = await get_tier_for_key(api_key)
    if not tier:
        return JSONResponse({"error": "Invalid API key"}, status_code=401)

    # NOTE(review): reading the body inside HTTP middleware relies on
    # Starlette caching it so the downstream handler can re-read it —
    # verify against the installed Starlette version.
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)

    # Estimate cost: actual input tokens plus the requested output budget.
    input_tokens = estimate_tokens(body.get("messages", []))
    max_tokens = body.get("max_tokens", 512)

    allowed, info = await rate_limiter.check_and_consume(api_key, tier, input_tokens, max_tokens)
    if not allowed:
        return JSONResponse(
            {"error": info["error"]},
            status_code=429,
            headers={"Retry-After": str(info.get("retry_after", 60))}
        )

    try:
        return await call_next(request)
    finally:
        # Release the concurrency slot even if the handler raised.
        await rate_limiter.release_concurrent(api_key)
Nginx уровень rate limiting
Грубый rate limiting на уровне Nginx (защита от DDoS, прежде чем запрос дойдёт до Python):
# Per-key zones keyed on the raw Authorization header value.
# NOTE(review): an empty Authorization header yields an empty key, which
# nginx does not rate-limit — make sure auth is enforced upstream as well.
limit_req_zone $http_authorization zone=api_per_key:20m rate=100r/m;
limit_conn_zone $http_authorization zone=api_conn:10m;
location /v1/ {
# Allow short bursts of up to 20 requests without queuing delay.
limit_req zone=api_per_key burst=20 nodelay;
limit_conn api_conn 20; # no more than 20 concurrent per key
# Report rejections as 429 Too Many Requests (nginx default is 503).
limit_req_status 429;
limit_conn_status 429;
proxy_pass http://vllm_backend;
}
Дашборд использования для клиентов
API endpoint для мониторинга своих квот:
@app.get("/v1/usage")
async def get_usage(api_key: str = Depends(get_api_key)):
    """Return the caller's tier, current usage, and configured limits.

    Lets clients monitor their own quota consumption. The tier is resolved
    once and reused (the previous revision awaited ``get_tier_for_key``
    twice per request).
    """
    tier = await get_tier_for_key(api_key)
    return {
        "tier": tier,
        "current_minute": await get_current_usage(api_key, "minute"),
        "current_day": await get_current_usage(api_key, "day"),
        "limits": QUOTA_TIERS[tier]
    }







