Настройка Rate Limiting и квотирования доступа к AI-сервису

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
Настройка Rate Limiting и квотирования доступа к AI-сервису
Средняя
от 1 рабочего дня до 3 рабочих дней
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    853
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

Настройка rate limiting и квот для AI-сервиса

Rate limiting для LLM сложнее обычного API: метрика не только запросы в минуту, но и токены (prompt + completion). Дорогой запрос с 4000 токенами должен ограничиваться строже, чем дешёвый на 100 токенов.

Многоуровневые квоты

from dataclasses import dataclass
from enum import Enum

class QuotaTier(str, Enum):
    FREE = "free"
    STANDARD = "standard"
    ENTERPRISE = "enterprise"

@dataclass
class QuotaConfig:
    requests_per_minute: int
    tokens_per_minute: int         # input + output tokens
    tokens_per_day: int
    max_tokens_per_request: int
    concurrent_requests: int

QUOTA_TIERS = {
    QuotaTier.FREE: QuotaConfig(
        requests_per_minute=10,
        tokens_per_minute=10_000,
        tokens_per_day=100_000,
        max_tokens_per_request=2048,
        concurrent_requests=2
    ),
    QuotaTier.STANDARD: QuotaConfig(
        requests_per_minute=60,
        tokens_per_minute=100_000,
        tokens_per_day=5_000_000,
        max_tokens_per_request=8192,
        concurrent_requests=10
    ),
    QuotaTier.ENTERPRISE: QuotaConfig(
        requests_per_minute=1000,
        tokens_per_minute=2_000_000,
        tokens_per_day=float('inf'),
        max_tokens_per_request=32768,
        concurrent_requests=100
    ),
}

Реализация через Redis

import redis.asyncio as aioredis
import time

class TokenBucketRateLimiter:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = aioredis.from_url(redis_url)

    async def check_and_consume(
        self,
        api_key: str,
        tier: QuotaTier,
        input_tokens: int,
        estimated_output_tokens: int
    ) -> tuple[bool, dict]:

        config = QUOTA_TIERS[tier]
        total_tokens = input_tokens + estimated_output_tokens
        now = time.time()
        minute_window = int(now // 60) * 60  # текущая минута
        day_window = int(now // 86400) * 86400  # текущий день

        pipe = self.redis.pipeline()

        # Ключи для rate limiting
        rpm_key = f"rl:{api_key}:rpm:{minute_window}"
        tpm_key = f"rl:{api_key}:tpm:{minute_window}"
        tpd_key = f"rl:{api_key}:tpd:{day_window}"
        concurrent_key = f"rl:{api_key}:concurrent"

        # Atomic check and increment
        pipe.incr(rpm_key)
        pipe.expire(rpm_key, 120)
        pipe.incrby(tpm_key, total_tokens)
        pipe.expire(tpm_key, 120)
        pipe.incrby(tpd_key, total_tokens)
        pipe.expire(tpd_key, 172800)
        pipe.incr(concurrent_key)
        pipe.expire(concurrent_key, 300)

        results = await pipe.execute()
        current_rpm, _, current_tpm, _, current_tpd, _, current_concurrent, _ = results

        # Проверяем лимиты
        errors = []
        if current_rpm > config.requests_per_minute:
            errors.append(f"Rate limit: {current_rpm}/{config.requests_per_minute} req/min")
        if current_tpm > config.tokens_per_minute:
            errors.append(f"Token rate limit: {current_tpm}/{config.tokens_per_minute} tokens/min")
        if current_tpd > config.tokens_per_day:
            errors.append(f"Daily token limit exceeded")
        if current_concurrent > config.concurrent_requests:
            errors.append(f"Too many concurrent requests: {current_concurrent}/{config.concurrent_requests}")
        if total_tokens > config.max_tokens_per_request:
            errors.append(f"Request too large: {total_tokens}/{config.max_tokens_per_request} tokens")

        if errors:
            # Откатываем инкременты
            pipe2 = self.redis.pipeline()
            pipe2.decr(rpm_key)
            pipe2.decrby(tpm_key, total_tokens)
            pipe2.decrby(tpd_key, total_tokens)
            pipe2.decr(concurrent_key)
            await pipe2.execute()

            return False, {
                "error": errors[0],
                "retry_after": 60 if "Rate limit" in errors[0] else 86400
            }

        return True, {"remaining_rpm": config.requests_per_minute - current_rpm}

    async def release_concurrent(self, api_key: str):
        """Вызывается по завершении запроса."""
        await self.redis.decr(f"rl:{api_key}:concurrent")

FastAPI middleware

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()
rate_limiter = TokenBucketRateLimiter()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    if not request.url.path.startswith("/v1/chat"):
        return await call_next(request)

    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not api_key:
        return JSONResponse({"error": "Missing API key"}, status_code=401)

    # Получаем tier из API key
    tier = await get_tier_for_key(api_key)
    if not tier:
        return JSONResponse({"error": "Invalid API key"}, status_code=401)

    # Оцениваем токены из тела запроса
    body = await request.json()
    input_tokens = estimate_tokens(body.get("messages", []))
    max_tokens = body.get("max_tokens", 512)

    allowed, info = await rate_limiter.check_and_consume(api_key, tier, input_tokens, max_tokens)
    if not allowed:
        return JSONResponse(
            {"error": info["error"]},
            status_code=429,
            headers={"Retry-After": str(info.get("retry_after", 60))}
        )

    try:
        response = await call_next(request)
        return response
    finally:
        await rate_limiter.release_concurrent(api_key)

Nginx уровень rate limiting

Грубый rate limiting на уровне Nginx (защита от DDoS, прежде чем запрос дойдёт до Python):

limit_req_zone $http_authorization zone=api_per_key:20m rate=100r/m;
limit_conn_zone $http_authorization zone=api_conn:10m;

location /v1/ {
    limit_req zone=api_per_key burst=20 nodelay;
    limit_conn api_conn 20;           # не более 20 concurrent на ключ

    limit_req_status 429;
    limit_conn_status 429;

    proxy_pass http://vllm_backend;
}

Дашборд использования для клиентов

API endpoint для мониторинга своих квот:

@app.get("/v1/usage")
async def get_usage(api_key: str = Depends(get_api_key)):
    return {
        "tier": await get_tier_for_key(api_key),
        "current_minute": await get_current_usage(api_key, "minute"),
        "current_day": await get_current_usage(api_key, "day"),
        "limits": QUOTA_TIERS[await get_tier_for_key(api_key)]
    }