AI Агент логистики — цифровой сотрудник
AI Агент логистики автоматизирует операционные задачи цепочки поставок: планирование маршрутов, трекинг отправлений, взаимодействие с перевозчиками, обработка исключений (задержки, повреждения, недостачи), мониторинг KPI и генерация отчётов. Оператор-человек подключается только к нестандартным ситуациям, требующим переговоров.
Мониторинг и трекинг отправлений
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Optional
import operator
llm = ChatOpenAI(model="gpt-4o", temperature=0)
class ShipmentState(TypedDict):
shipment_id: str
shipment_data: dict
tracking_history: list[dict]
anomalies: Annotated[list, operator.add]
actions_taken: Annotated[list, operator.add]
escalation_required: bool
escalation_reason: Optional[str]
class ShipmentMonitor:
async def check_shipment(self, shipment_id: str) -> ShipmentState:
"""Полная проверка статуса отправления"""
# Получаем данные
shipment = await logistics_db.get_shipment(shipment_id)
tracking = await carrier_api.get_tracking(shipment["tracking_number"])
expected_eta = shipment["expected_delivery"]
current_eta = tracking.get("estimated_delivery")
anomalies = []
# Проверка задержки
if current_eta and current_eta > expected_eta:
delay_hours = (current_eta - expected_eta).total_seconds() / 3600
anomalies.append({
"type": "delivery_delay",
"severity": "high" if delay_hours > 24 else "medium",
"details": f"Задержка {delay_hours:.0f} часов, новая дата: {current_eta}",
})
# Отсутствие обновлений
last_update = tracking.get("last_event_time")
hours_since_update = (datetime.now() - last_update).total_seconds() / 3600 if last_update else 99
if hours_since_update > 48:
anomalies.append({
"type": "no_tracking_update",
"severity": "medium",
"details": f"Нет обновлений {hours_since_update:.0f} часов",
})
# LLM-анализ при аномалиях
escalation_required = False
escalation_reason = None
if anomalies:
assessment = await self.assess_anomalies(shipment, anomalies)
escalation_required = assessment["requires_escalation"]
escalation_reason = assessment.get("reason")
return ShipmentState(
shipment_id=shipment_id,
shipment_data=shipment,
tracking_history=tracking.get("events", []),
anomalies=anomalies,
actions_taken=[],
escalation_required=escalation_required,
escalation_reason=escalation_reason,
)
async def assess_anomalies(self, shipment: dict, anomalies: list) -> dict:
"""LLM оценивает необходимость эскалации"""
response = await llm.ainvoke(f"""Оцени ситуацию с отправлением.
Отправление: {json.dumps(shipment, ensure_ascii=False)}
Аномалии: {json.dumps(anomalies, ensure_ascii=False)}
Определи:
1. Требует ли ситуация немедленной эскалации менеджеру?
2. Какие автоматические действия можно предпринять?
3. Нужно ли уведомлять получателя?
Верни JSON: {{"requires_escalation": bool, "reason": "...", "auto_actions": [...], "notify_recipient": bool}}""")
return json.loads(response.content)
Маршрутизация и оптимизация
class RouteOptimizer:
async def optimize_delivery_routes(
self,
deliveries: list[dict], # [{id, address, time_window, weight}]
vehicles: list[dict], # [{id, capacity, location}]
date: str,
) -> dict:
"""Оптимизирует маршруты доставки (Vehicle Routing Problem)"""
# Для малых задач — через LLM с рассуждением
if len(deliveries) <= 20:
return await self.llm_route_optimizer(deliveries, vehicles)
# Для больших — алгоритмический подход + LLM для исключений
return await self.algorithmic_route_optimizer(deliveries, vehicles)
async def llm_route_optimizer(self, deliveries: list, vehicles: list) -> dict:
response = await llm.ainvoke(f"""Составь оптимальные маршруты доставки.
Доставки:
{json.dumps(deliveries, ensure_ascii=False, indent=2)}
Транспортные средства:
{json.dumps(vehicles, ensure_ascii=False, indent=2)}
Учти: временные окна, грузоподъёмность, минимизацию общего пробега.
Верни JSON: {{"routes": [{{"vehicle_id": "...", "stops": [delivery_ids_in_order]}}]}}""")
return json.loads(response.content)
Обработка исключений
class ExceptionHandler:
EXCEPTION_PLAYBOOKS = {
"delivery_delay": {
"auto_actions": ["notify_recipient", "update_crm", "rebook_if_urgent"],
"escalate_if": lambda hours: hours > 72,
},
"damaged_goods": {
"auto_actions": ["create_claim", "notify_sender", "photo_request"],
"escalate_always": True,
},
"customs_hold": {
"auto_actions": ["get_customs_details", "notify_broker"],
"escalate_if": lambda days: days > 3,
},
"address_not_found": {
"auto_actions": ["contact_recipient", "check_database"],
"escalate_if": lambda attempts: attempts > 2,
},
}
async def handle_exception(self, exception: dict) -> dict:
exception_type = exception["type"]
playbook = self.EXCEPTION_PLAYBOOKS.get(exception_type)
if not playbook:
return await self.generic_exception_handler(exception)
actions_taken = []
# Выполняем автоматические действия
for action in playbook.get("auto_actions", []):
result = await self.execute_action(action, exception)
actions_taken.append({"action": action, "result": result})
# Проверяем нужна ли эскалация
escalate = playbook.get("escalate_always", False)
if not escalate and "escalate_if" in playbook:
escalate_fn = playbook["escalate_if"]
# Вызываем с нужным параметром из данных исключения
escalate = escalate_fn(exception.get("delay_hours") or exception.get("hold_days") or exception.get("attempts", 0))
if escalate:
await self.escalate_to_manager(exception, actions_taken)
return {"actions_taken": actions_taken, "escalated": escalate}
Analytics и KPI мониторинг
class LogisticsAnalytics:
async def daily_kpi_report(self) -> str:
"""Ежедневный отчёт по KPI логистики"""
# Данные из БД
metrics = await asyncio.gather(
self.get_on_time_delivery_rate(),
self.get_damage_rate(),
self.get_carrier_performance(),
self.get_cost_per_shipment(),
self.get_exception_rate(),
)
report = await llm.ainvoke(f"""Создай KPI-отчёт по логистике за {datetime.now().strftime('%d.%m.%Y')}.
Метрики:
- On-time delivery: {metrics[0]['rate']:.1%} (цель: {metrics[0]['target']:.1%})
- Damage rate: {metrics[1]['rate']:.3%}
- Top carriers by performance: {metrics[2]}
- Cost per shipment: {metrics[3]['avg']:,.0f} руб
- Exception rate: {metrics[4]['rate']:.2%}
Формат: краткий summary (3 предложения), отклонения от нормы, рекомендации.""")
return report.content
Практический кейс: дистрибьютор FMCG, 500 отправлений/день
Ситуация: 4 логиста, 500 отправлений в день, операторы тратили 70% времени на трекинг и обработку исключений.
AI Агент взял на себя:
- Автоматический трекинг каждые 2 часа через API перевозчиков
- Уведомления получателям о статусе и задержках
- Автоматическое открытие претензий при повреждениях
- Перебронирование при задержках > 4 часов (для критических отправлений)
- Ежедневный KPI-отчёт и weekly carrier scorecard
Результаты:
- Ручная обработка исключений: 180 случаев/день → 45 (агент закрыл автоматически)
- On-time delivery rate: 82% → 89% (лучшая маршрутизация + проактивное управление исключениями)
- Претензии подтверждённые (правильно оформленные): 62% → 91%
- Время логистов на оперативную работу: -55%
Сроки
- Трекинг и мониторинг отправлений: 2–3 недели
- Обработчик исключений с playbooks: 2–3 недели
- Маршрутизация и оптимизация: 2–3 недели
- Analytics и KPI-отчёты: 1 неделя
- Интеграция с перевозчиками: 1–2 недели
- Итого: 8–12 недель







