Реализация Fallback между LLM-провайдерами при недоступности

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
Реализация Fallback между LLM-провайдерами при недоступности
Средняя
~2-3 рабочих дня
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    853
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

Реализация Fallback между LLM-провайдерами при недоступности

LLM-провайдеры периодически испытывают перебои: rate limits, maintenance windows, региональные сбои. Продакшн-система должна автоматически переключаться на резервный провайдер. Graded fallback — это не просто "если сломано, попробуй другое", а умная стратегия с учётом типа ошибки, времени ожидания и деградации качества.

Базовый Fallback с tenacity

from openai import OpenAI, RateLimitError, APIError
from anthropic import Anthropic
from groq import Groq
import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging
from dataclasses import dataclass
from typing import Optional
import time

logger = logging.getLogger(__name__)

@dataclass
class ProviderConfig:
    name: str
    model: str
    priority: int  # Меньше = выше приоритет
    max_retries: int = 3

class LLMFallbackClient:

    PROVIDERS = [
        ProviderConfig("anthropic", "claude-sonnet-4-5", priority=1),
        ProviderConfig("openai", "gpt-4o", priority=2),
        ProviderConfig("groq", "llama-3.1-70b-versatile", priority=3),
    ]

    def __init__(self):
        self.clients = {
            "anthropic": Anthropic(),
            "openai": OpenAI(),
            "groq": Groq(),
        }
        self._circuit_breakers: dict[str, dict] = {}

    def _is_circuit_open(self, provider: str) -> bool:
        """Circuit breaker: блокируем провайдер при частых ошибках"""
        cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
        if cb["failures"] >= 5:
            # Переоткрываем через 60 секунд
            if time.time() - cb["last_failure"] > 60:
                self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}
                return False
            return True
        return False

    def _record_failure(self, provider: str):
        cb = self._circuit_breakers.get(provider, {"failures": 0, "last_failure": 0})
        cb["failures"] += 1
        cb["last_failure"] = time.time()
        self._circuit_breakers[provider] = cb

    def _record_success(self, provider: str):
        self._circuit_breakers[provider] = {"failures": 0, "last_failure": 0}

    def _call_provider(self, provider: str, model: str, messages: list[dict], **kwargs) -> str:
        """Вызов конкретного провайдера"""
        if provider == "anthropic":
            response = self.clients["anthropic"].messages.create(
                model=model,
                max_tokens=kwargs.get("max_tokens", 2048),
                messages=messages,
                system=kwargs.get("system", ""),
            )
            return response.content[0].text

        elif provider == "openai":
            all_messages = []
            if kwargs.get("system"):
                all_messages.append({"role": "system", "content": kwargs["system"]})
            all_messages.extend(messages)
            response = self.clients["openai"].chat.completions.create(
                model=model,
                messages=all_messages,
                max_tokens=kwargs.get("max_tokens", 2048),
                temperature=kwargs.get("temperature", 0.1),
            )
            return response.choices[0].message.content

        elif provider == "groq":
            all_messages = []
            if kwargs.get("system"):
                all_messages.append({"role": "system", "content": kwargs["system"]})
            all_messages.extend(messages)
            response = self.clients["groq"].chat.completions.create(
                model=model,
                messages=all_messages,
            )
            return response.choices[0].message.content

        raise ValueError(f"Unknown provider: {provider}")

    def complete(self, messages: list[dict], **kwargs) -> tuple[str, str]:
        """Выполняет запрос с автоматическим fallback.
        Возвращает (ответ, имя_провайдера)"""

        sorted_providers = sorted(self.PROVIDERS, key=lambda p: p.priority)

        last_error = None
        for config in sorted_providers:
            if self._is_circuit_open(config.name):
                logger.warning(f"Circuit open for {config.name}, skipping")
                continue

            for attempt in range(config.max_retries):
                try:
                    result = self._call_provider(config.name, config.model, messages, **kwargs)
                    self._record_success(config.name)

                    if config.priority > 1:
                        logger.warning(f"Used fallback provider: {config.name}")

                    return result, config.name

                except (RateLimitError, anthropic.RateLimitError) as e:
                    wait_time = min(2 ** attempt, 30)
                    logger.warning(f"{config.name} rate limited, waiting {wait_time}s")
                    time.sleep(wait_time)
                    last_error = e

                except (APIError, anthropic.APIError) as e:
                    self._record_failure(config.name)
                    logger.error(f"{config.name} API error: {e}")
                    last_error = e
                    break  # Переходим к следующему провайдеру

                except Exception as e:
                    self._record_failure(config.name)
                    logger.error(f"{config.name} unexpected error: {e}")
                    last_error = e
                    break

        raise RuntimeError(f"All providers failed. Last error: {last_error}")

Async fallback с параллельными попытками

import asyncio

class AsyncFallbackClient:
    """Для критически важных запросов — параллельный запрос к нескольким провайдерам"""

    async def complete_parallel(self, messages: list[dict], **kwargs) -> str:
        """Отправляет запрос параллельно в 2 провайдера, берёт первый ответ"""

        tasks = [
            self._async_anthropic(messages, **kwargs),
            self._async_openai(messages, **kwargs),
        ]

        # race — возвращаем первый успешный результат
        done, pending = await asyncio.wait(
            [asyncio.create_task(t) for t in tasks],
            return_when=asyncio.FIRST_COMPLETED
        )

        # Отменяем оставшиеся запросы
        for task in pending:
            task.cancel()

        return await next(iter(done))

Мониторинг здоровья провайдеров

class ProviderHealthMonitor:
    """Периодически проверяет доступность провайдеров"""

    async def check_all(self) -> dict[str, bool]:
        results = {}
        test_message = [{"role": "user", "content": "Say 'ok'"}]

        for provider_name in ["anthropic", "openai", "groq"]:
            try:
                # Быстрый тест с коротким таймаутом
                await asyncio.wait_for(
                    self._test_provider(provider_name, test_message),
                    timeout=5.0
                )
                results[provider_name] = True
            except Exception:
                results[provider_name] = False

        return results

Сроки

  • Базовый fallback с retry: 1–2 дня
  • Circuit breaker + мониторинг: 2–3 дня
  • Async параллельный fallback: 2–3 дня
  • Полная production-система с алертингом: 1 неделя