Разработка AI Workflow с ветвлением и условной логикой
AI Workflow с ветвлением — система, в которой поток выполнения определяется динамически на основе промежуточных результатов, входных данных или решений LLM. Это сложнее линейного пайплайна, но необходимо для реальных бизнес-процессов, где разные входные данные требуют разных путей обработки.
Типы условного ветвления
Детерминированное ветвление: условие определяется кодом на основе данных (if/else, switch).
LLM-based ветвление: условие определяется решением языковой модели (классификация, routing).
Hybrid: код обрабатывает структурированные условия, LLM — неструктурированные.
Реализация с LangGraph
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal, Optional
import json
class WorkflowState(TypedDict):
input_document: str
document_type: Optional[str]
extracted_data: Optional[dict]
validation_errors: list[str]
processing_path: str
output: Optional[dict]
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Нода 1: Классификация документа
def classify_document(state: WorkflowState) -> WorkflowState:
response = llm.invoke(f"""Определи тип документа.
Типы: invoice, contract, complaint, inquiry, other
Документ: {state['input_document'][:500]}
Ответь одним словом:""")
return {**state, "document_type": response.content.strip().lower()}
# Нода 2a: Обработка счёта
def process_invoice(state: WorkflowState) -> WorkflowState:
response = llm.invoke(f"""Извлеки данные счёта.
{state['input_document']}
Верни JSON: {{vendor, amount, date, due_date, items}}""")
return {**state, "extracted_data": json.loads(response.content), "processing_path": "invoice"}
# Нода 2b: Обработка договора
def process_contract(state: WorkflowState) -> WorkflowState:
response = llm.invoke(f"""Извлеки ключевые условия договора.
{state['input_document']}
Верни JSON: {{parties, subject, amount, duration, key_conditions}}""")
return {**state, "extracted_data": json.loads(response.content), "processing_path": "contract"}
# Нода 2c: Обработка жалобы
def process_complaint(state: WorkflowState) -> WorkflowState:
response = llm.invoke(f"""Классифицируй жалобу.
{state['input_document']}
Верни JSON: {{category, severity: low/medium/high/critical, requires_immediate_action: bool}}""")
return {**state, "extracted_data": json.loads(response.content), "processing_path": "complaint"}
# Нода 3: Валидация
def validate_data(state: WorkflowState) -> WorkflowState:
errors = []
data = state.get("extracted_data", {})
if state["document_type"] == "invoice":
if not data.get("amount"):
errors.append("Missing invoice amount")
if not data.get("vendor"):
errors.append("Missing vendor information")
return {**state, "validation_errors": errors}
# Нода 4a: Успешное завершение
def finalize_success(state: WorkflowState) -> WorkflowState:
return {**state, "output": {
"status": "processed",
"path": state["processing_path"],
"data": state["extracted_data"],
}}
# Нода 4b: Обработка ошибок
def handle_validation_errors(state: WorkflowState) -> WorkflowState:
return {**state, "output": {
"status": "validation_failed",
"errors": state["validation_errors"],
"requires_manual_review": True,
}}
# Routing функции
def route_by_document_type(state: WorkflowState) -> str:
mapping = {
"invoice": "process_invoice",
"contract": "process_contract",
"complaint": "process_complaint",
}
return mapping.get(state["document_type"], "process_unknown")
def route_after_validation(state: WorkflowState) -> str:
return "handle_errors" if state["validation_errors"] else "finalize"
# Построение графа
graph = StateGraph(WorkflowState)
graph.add_node("classify", classify_document)
graph.add_node("process_invoice", process_invoice)
graph.add_node("process_contract", process_contract)
graph.add_node("process_complaint", process_complaint)
graph.add_node("validate", validate_data)
graph.add_node("finalize", finalize_success)
graph.add_node("handle_errors", handle_validation_errors)
graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_by_document_type, {
"process_invoice": "process_invoice",
"process_contract": "process_contract",
"process_complaint": "process_complaint",
"process_unknown": "handle_errors",
})
graph.add_edge("process_invoice", "validate")
graph.add_edge("process_contract", "validate")
graph.add_edge("process_complaint", "validate")
graph.add_conditional_edges("validate", route_after_validation, {
"finalize": "finalize",
"handle_errors": "handle_errors",
})
graph.add_edge("finalize", END)
graph.add_edge("handle_errors", END)
workflow = graph.compile()
Цикличные ветвления: retry и correction loops
MAX_RETRIES = 3
def check_quality_and_retry(state: WorkflowState) -> str:
"""Решает: принять результат или отправить на переработку"""
if state.get("retry_count", 0) >= MAX_RETRIES:
return "accept" # Принимаем даже неидеальный результат
quality = assess_output_quality(state["output"])
if quality < 0.8:
return "retry"
return "accept"
def increment_retry(state: WorkflowState) -> WorkflowState:
return {**state, "retry_count": state.get("retry_count", 0) + 1}
# Добавляем цикл в граф
graph.add_conditional_edges("quality_check", check_quality_and_retry, {
"retry": "processing_node",
"accept": "finalize",
})
Практический кейс: workflow обработки входящей корреспонденции
Задача: автоматическая обработка 500+ входящих документов в день (email attachments, загрузки через портал).
Граф ветвления:
Входящий документ
→ Классификация (8 типов)
↓
Тип: счёт → Извлечь реквизиты → Сверить с договором → [OK: 1С] / [ошибка: бухгалтер]
Тип: договор → Извлечь условия → Юридический риск-скор → [low: архив] / [medium+: юрист]
Тип: жалоба → Severity оценка → [critical: немедленно] / [normal: очередь]
Тип: запрос → Тематика → Маршрут в нужный отдел
Метрики:
- Авто-обработка без ручного вмешательства: 71%
- Время routing: мгновенно (vs 45мин вручную)
- Точность классификации: 94%
- Ошибки маршрутизации: 2.1%
Визуализация графа
LangGraph предоставляет встроенную визуализацию:
from IPython.display import Image
Image(workflow.get_graph().draw_mermaid_png())
Сроки
- Проектирование workflow графа: 1 неделя
- Реализация нод и ветвлений: 2–3 недели
- Тестирование edge cases: 1–2 недели
- Итого: 4–6 недель







