Разработка AI для инвентаризации склада (камеры и дроны)
Ручная инвентаризация склада — 2–5 дней простоя, 1–3% ошибок при подсчёте. AI-системы на базе фиксированных камер или дронов сокращают время до нескольких часов с точностью подсчёта 98–99%. Дроны особенно эффективны для высоких стеллажей (10+ метров) недоступных ручным сканерам. Задачи: подсчёт паллет, контроль заполнения ячеек, идентификация артикулов по штрихкодам и визуальным признакам.
Система подсчёта паллет по камерам
import numpy as np
import cv2
from ultralytics import YOLO
import sqlite3
from datetime import datetime
class WarehouseInventorySystem:
def __init__(self, detector_path: str,
db_path: str = 'warehouse.db',
device: str = 'cuda'):
self.detector = YOLO(detector_path)
self.device = device
self.db = sqlite3.connect(db_path, check_same_thread=False)
self._init_db()
def _init_db(self):
self.db.execute('''
CREATE TABLE IF NOT EXISTS inventory_snapshots (
id INTEGER PRIMARY KEY,
camera_id TEXT,
zone_id TEXT,
timestamp TIMESTAMP,
pallet_count INTEGER,
occupancy_pct REAL,
detections TEXT
)
''')
self.db.commit()
def count_pallets(self, image: np.ndarray,
camera_id: str,
zone_id: str,
total_slots: int = None) -> dict:
"""
Подсчёт паллет в зоне хранения.
total_slots: известное общее количество ячеек для % заполнения.
"""
results = self.detector(image, conf=0.45)
detections = []
for box in results[0].boxes:
cls_name = self.detector.model.names[int(box.cls)]
detections.append({
'type': cls_name,
'bbox': box.xyxy[0].tolist(),
'conf': float(box.conf)
})
pallet_count = sum(
1 for d in detections
if d['type'] in ['pallet', 'loaded_pallet', 'empty_pallet']
)
loaded = sum(1 for d in detections if d['type'] == 'loaded_pallet')
empty = sum(1 for d in detections if d['type'] == 'empty_pallet')
occupancy = (pallet_count / total_slots * 100) if total_slots else None
result = {
'camera_id': camera_id,
'zone_id': zone_id,
'pallet_count': pallet_count,
'loaded_pallets': loaded,
'empty_pallets': empty,
'occupancy_pct': occupancy,
'timestamp': datetime.now().isoformat()
}
# Сохранение в БД
self.db.execute(
'INSERT INTO inventory_snapshots VALUES (NULL, ?, ?, ?, ?, ?, ?)',
(camera_id, zone_id, result['timestamp'],
pallet_count, occupancy, str(detections))
)
self.db.commit()
return result
def get_zone_history(self, zone_id: str,
hours: int = 24) -> list[dict]:
"""История изменений заполнения зоны"""
rows = self.db.execute('''
SELECT timestamp, pallet_count, occupancy_pct
FROM inventory_snapshots
WHERE zone_id = ?
AND timestamp > datetime('now', ?)
ORDER BY timestamp DESC
''', (zone_id, f'-{hours} hours')).fetchall()
return [
{'timestamp': r[0], 'pallet_count': r[1], 'occupancy': r[2]}
for r in rows
]
Система на базе дронов для высоких стеллажей
class DroneInventorySystem:
"""
Автономный обход стеллажей с камерой:
1. Планирование маршрута по известной карте склада
2. Съёмка каждой ячейки
3. OCR штрихкодов и QR-кодов
4. Сверка с WMS (Warehouse Management System)
"""
def __init__(self, barcode_reader, wms_client,
rack_detector: YOLO):
self.barcode_reader = barcode_reader
self.wms = wms_client
self.rack_detector = rack_detector
def inventory_rack(self, rack_images: list[dict]) -> dict:
"""
rack_images: [{'image': np.ndarray, 'rack_id': str, 'shelf': int, 'slot': int}]
Анализ всех фото конкретного стеллажа.
"""
rack_inventory = []
discrepancies = []
for item in rack_images:
image = item['image']
location = f"{item['rack_id']}-{item['shelf']}-{item['slot']}"
# Детекция занятости ячейки
results = self.rack_detector(image, conf=0.5)
is_occupied = len(results[0].boxes) > 0
# OCR штрихкода/QR-кода
barcodes = self._read_barcodes(image)
slot_data = {
'location': location,
'occupied': is_occupied,
'barcodes': barcodes,
'sku_ids': [self._barcode_to_sku(b) for b in barcodes]
}
rack_inventory.append(slot_data)
# Сверка с WMS
if barcodes:
for sku_id in slot_data['sku_ids']:
wms_expected = self.wms.get_expected_sku(location)
if sku_id != wms_expected:
discrepancies.append({
'location': location,
'found_sku': sku_id,
'expected_sku': wms_expected,
'discrepancy_type': 'wrong_sku' if wms_expected else 'unexpected_item'
})
return {
'total_slots_checked': len(rack_inventory),
'occupied_slots': sum(1 for s in rack_inventory if s['occupied']),
'empty_slots': sum(1 for s in rack_inventory if not s['occupied']),
'barcodes_read': sum(len(s['barcodes']) for s in rack_inventory),
'discrepancies': discrepancies,
'discrepancy_rate': len(discrepancies) / len(rack_inventory) if rack_inventory else 0,
'inventory': rack_inventory
}
def _read_barcodes(self, image: np.ndarray) -> list[str]:
"""Чтение штрихкодов через pyzbar + QR через cv2"""
from pyzbar import pyzbar
decoded = pyzbar.decode(image)
barcodes = [d.data.decode('utf-8') for d in decoded]
# Попытка QR через OpenCV
qr = cv2.QRCodeDetector()
data, _, _ = qr.detectAndDecode(image)
if data:
barcodes.append(data)
return barcodes
def _barcode_to_sku(self, barcode: str) -> str:
"""Маппинг штрихкода → SKU через WMS"""
return self.wms.lookup_barcode(barcode) or barcode
Генерация отчёта инвентаризации
def generate_inventory_report(inventory_data: list[dict],
wms_data: dict,
output_format: str = 'json') -> dict:
"""Сводный отчёт с расхождениями для бухгалтерии/логистики"""
total_items = sum(s.get('quantity', 1) for s in inventory_data)
occupied = sum(1 for s in inventory_data if s.get('occupied'))
all_discrepancies = []
for slot in inventory_data:
for disc in slot.get('discrepancies', []):
all_discrepancies.append({**disc, 'location': slot['location']})
return {
'report_date': datetime.now().isoformat(),
'summary': {
'total_slots_scanned': len(inventory_data),
'occupied': occupied,
'empty': len(inventory_data) - occupied,
'occupancy_rate_pct': occupied / len(inventory_data) * 100,
'total_items': total_items,
'discrepancy_count': len(all_discrepancies)
},
'discrepancies': all_discrepancies,
'accuracy_rate': 1 - len(all_discrepancies) / max(len(inventory_data), 1)
}
| Метод |
Точность подсчёта |
Скорость |
Покрытие |
| Фиксированные камеры |
95–98% |
Realtime |
Зоны обзора камер |
| Мобильный робот-AGV |
97–99% |
2–4 км/ч |
Весь этаж |
| Дрон-квадрокоптер |
96–99% |
5–8 км/ч |
3D, высокие стеллажи |
| RFID (без CV) |
99%+ |
Realtime |
Только RFID-метки |
| Задача |
Срок |
| Подсчёт паллет на базе существующих камер |
4–6 недель |
| Дроновая инвентаризация стеллажей + WMS интеграция |
12–20 недель |
| Полная autonomous inventory система (AGV + AI) |
24–40 недель |