Разработка системы резервирования и отказоустойчивости AI-агентов

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Разработка системы резервирования и отказоустойчивости AI-агентов
Сложный
от 1 недели до 3 месяцев
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1285
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1121
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    858

Разработка системы резервирования и отказоустойчивости AI-агентов

Отказоустойчивость AI-агентов — обеспечение непрерывности сервиса при сбоях: недоступности LLM API, ошибках инструментов, исключениях в коде агента. Критично для production систем с SLA.

Слои отказоустойчивости

Слой LLM: fallback на альтернативные модели при недоступности основной. OpenAI → Anthropic → локальная модель.

Слой инструментов: retry с exponential backoff, circuit breaker, альтернативные инструменты.

Слой агента: checkpoint'ы прогресса, восстановление после падения, human escalation при критических ошибках.

Слой оркестрации: многоагентная резервация, health checks, автоматическое переназначение задач.

Fallback LLM Provider

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic
import openai

class ResilientLLMClient:
    def __init__(self):
        self.providers = [
            {"name": "openai", "client": openai.AsyncOpenAI(), "model": "gpt-4o"},
            {"name": "anthropic", "client": anthropic.AsyncAnthropic(), "model": "claude-3-5-sonnet-20241022"},
            {"name": "local_vllm", "client": openai.AsyncOpenAI(base_url="http://gpu1:8000/v1"), "model": "llama-3-8b"},
        ]
        self.circuit_breakers = {p["name"]: CircuitBreaker() for p in self.providers}

    async def generate(self, messages: list, **kwargs) -> str:
        last_error = None

        for provider in self.providers:
            cb = self.circuit_breakers[provider["name"]]
            if cb.is_open():
                continue  # провайдер отключён по circuit breaker

            try:
                result = await self._call_provider(provider, messages, **kwargs)
                cb.record_success()
                return result
            except Exception as e:
                cb.record_failure()
                last_error = e
                logger.warning(f"Provider {provider['name']} failed: {e}")

        raise RuntimeError(f"All LLM providers failed. Last error: {last_error}")

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((openai.RateLimitError, openai.APIConnectionError))
    )
    async def _call_provider(self, provider: dict, messages: list, **kwargs) -> str:
        response = await provider["client"].chat.completions.create(
            model=provider["model"],
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content

Circuit Breaker

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"       # нормальная работа
    OPEN = "open"           # провайдер отключён
    HALF_OPEN = "half_open" # тестируем восстановление

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    def is_open(self) -> bool:
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = CircuitState.HALF_OPEN
                return False
            return True
        return False

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error(f"Circuit breaker OPEN after {self.failure_count} failures")

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            logger.info("Circuit breaker CLOSED - provider recovered")

Checkpoint и восстановление задачи

Для длительных агентских задач (> 1 мин) сохраняем прогресс:

@dataclass
class AgentCheckpoint:
    task_id: str
    step_index: int
    completed_steps: list[StepResult]
    state: dict             # произвольное состояние агента
    saved_at: datetime

class CheckpointManager:
    def __init__(self, storage: Redis):
        self.storage = storage

    async def save(self, checkpoint: AgentCheckpoint):
        key = f"checkpoint:{checkpoint.task_id}"
        await self.storage.setex(
            key,
            3600,  # 1 час TTL
            pickle.dumps(checkpoint)
        )

    async def load(self, task_id: str) -> AgentCheckpoint | None:
        key = f"checkpoint:{task_id}"
        data = await self.storage.get(key)
        return pickle.loads(data) if data else None

    async def resume_or_start(self, task: AgentTask) -> AgentCheckpoint:
        existing = await self.load(task.id)
        if existing:
            logger.info(f"Resuming task {task.id} from step {existing.step_index}")
            return existing
        return AgentCheckpoint(task_id=task.id, step_index=0, completed_steps=[], state={}, saved_at=datetime.utcnow())

Human escalation при неразрешимых ошибках

class EscalationPolicy:
    MAX_RETRIES = 3
    MAX_FALLBACK_ATTEMPTS = 2

    async def handle_failure(
        self,
        task: AgentTask,
        error: Exception,
        retry_count: int
    ) -> EscalationDecision:

        if retry_count < self.MAX_RETRIES and self._is_retriable(error):
            return EscalationDecision(action="retry", delay_seconds=2 ** retry_count)

        if isinstance(error, ToolUnavailableError):
            return EscalationDecision(action="use_fallback_tool", tool=self._get_fallback_tool(error.tool))

        # Эскалация к человеку
        await self.notify_human(task, error, retry_count)
        return EscalationDecision(
            action="escalate",
            message=f"Task {task.id} requires human intervention after {retry_count} retries: {error}"
        )

Сроки внедрения

Неделя 1–2: Fallback LLM providers, retry логика, базовые circuit breakers

Неделя 3–4: Checkpoint система, human escalation workflow

Месяц 2: Multi-agent redundancy, chaos testing, SLA dashboard