AI-система мониторинга усталости и поведения водителя
По данным ВОЗ, 20% тяжёлых ДТП на трассах связаны с засыпанием за рулём. Система DMS (Driver Monitoring System) — камера в салоне, направленная на водителя — отслеживает признаки усталости, отвлечения, использования телефона в реальном времени.
Что отслеживаем
Усталость проявляется через несколько измеримых параметров лица:
- PERCLOS (Percentage of Eye Closure): доля времени, когда глаза закрыты > 80% за последние 60 секунд. PERCLOS > 15% = предупреждение, > 25% = тревога
- Частота моргания: норма 12–20 раз/мин, усталость — < 8 или > 30
- Продолжительность моргания: норма 150–200ms, усталость — > 350ms
- Угол наклона головы: кивание вниз > 15° = засыпание
- Направление взгляда: отвлечение на > 3 секунды
import cv2
import numpy as np
import mediapipe as mp
from collections import deque
import time
class DriverMonitoringSystem:
def __init__(self, config: dict):
# MediaPipe Face Mesh: 478 landmarks, быстро, хорошо на embedded
self.face_mesh = mp.solutions.face_mesh.FaceMesh(
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
# Индексы ключевых точек (MediaPipe Face Mesh)
self.LEFT_EYE = [362, 385, 387, 263, 373, 380]
self.RIGHT_EYE = [33, 160, 158, 133, 153, 144]
self.LEFT_IRIS = [474, 475, 476, 477]
self.RIGHT_IRIS = [469, 470, 471, 472]
# Буферы для temporal анализа
window = config.get('window_sec', 60) * config.get('fps', 30)
self.ear_buffer = deque(maxlen=window) # Eye Aspect Ratio
self.blink_buffer = deque(maxlen=window) # 1 если моргание
self.head_pose_buffer = deque(maxlen=300) # 10 секунд
# Текущее состояние моргания
self.in_blink = False
self.blink_start = None
self.alert_callbacks = config.get('alert_callbacks', [])
def _eye_aspect_ratio(self, landmarks: np.ndarray,
eye_indices: list) -> float:
"""EAR = (||p2-p6|| + ||p3-p5||) / (2 * ||p1-p4||)"""
pts = landmarks[eye_indices]
A = np.linalg.norm(pts[1] - pts[5])
B = np.linalg.norm(pts[2] - pts[4])
C = np.linalg.norm(pts[0] - pts[3])
return (A + B) / (2.0 * C + 1e-6)
def _estimate_head_pose(self, landmarks: np.ndarray,
frame_size: tuple) -> dict:
"""Solvepnp для оценки pitch/yaw/roll головы"""
model_points = np.float32([
[0.0, 0.0, 0.0], # нос (тип)
[0.0, -330.0, -65.0], # подбородок
[-225.0, 170.0, -135.0], # левый угол глаза
[225.0, 170.0, -135.0], # правый угол глаза
[-150.0, -150.0, -125.0], # левый угол рта
[150.0, -150.0, -125.0], # правый угол рта
])
key_indices = [1, 152, 263, 33, 287, 57]
image_points = np.float32([landmarks[i] for i in key_indices])
h, w = frame_size
cam_matrix = np.float32([[w, 0, w/2],
[0, w, h/2],
[0, 0, 1]])
dist_coeffs = np.zeros((4, 1))
success, rvec, tvec = cv2.solvePnP(
model_points, image_points, cam_matrix, dist_coeffs
)
if not success:
return {'pitch': 0, 'yaw': 0, 'roll': 0}
rmat, _ = cv2.Rodrigues(rvec)
angles = cv2.RQDecomp3x3(rmat)[0]
return {'pitch': angles[0], 'yaw': angles[1], 'roll': angles[2]}
def process_frame(self, frame: np.ndarray) -> dict:
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(rgb)
if not results.multi_face_landmarks:
return {'driver_detected': False, 'alerts': []}
h, w = frame.shape[:2]
lm = results.multi_face_landmarks[0].landmark
landmarks = np.array([[l.x * w, l.y * h] for l in lm])
# EAR для обоих глаз
ear_left = self._eye_aspect_ratio(landmarks, self.LEFT_EYE)
ear_right = self._eye_aspect_ratio(landmarks, self.RIGHT_EYE)
ear = (ear_left + ear_right) / 2.0
self.ear_buffer.append(ear)
# Детекция моргания
ear_threshold = 0.22
if ear < ear_threshold:
if not self.in_blink:
self.in_blink = True
self.blink_start = time.time()
else:
if self.in_blink:
blink_duration = time.time() - self.blink_start
self.blink_buffer.append(blink_duration)
self.in_blink = False
# PERCLOS: доля кадров с EAR < threshold за последние 60 сек
perclos = sum(1 for e in self.ear_buffer
if e < ear_threshold) / max(len(self.ear_buffer), 1)
# Поза головы
head_pose = self._estimate_head_pose(landmarks, (h, w))
self.head_pose_buffer.append(head_pose)
alerts = self._generate_alerts(perclos, head_pose)
return {
'driver_detected': True,
'ear': ear,
'perclos': perclos,
'head_pose': head_pose,
'recent_blink_durations': list(self.blink_buffer)[-5:],
'alerts': alerts
}
def _generate_alerts(self, perclos: float,
head_pose: dict) -> list[str]:
alerts = []
if perclos > 0.25:
alerts.append('DROWSINESS_CRITICAL')
elif perclos > 0.15:
alerts.append('DROWSINESS_WARNING')
if head_pose['pitch'] < -20:
alerts.append('HEAD_NODDING')
if abs(head_pose['yaw']) > 30:
alerts.append('DISTRACTION_YAW')
return alerts
Обнаружение использования телефона
Отдельная модель для рук: YOLOv8n дообученный на Driver Phone Use Dataset:
class PhoneUseDetector:
def __init__(self, model_path: str):
self.model = YOLO(model_path)
self.detection_buffer = deque(maxlen=15) # 0.5 сек @ 30fps
def detect(self, frame: np.ndarray) -> bool:
dets = self.model(frame, conf=0.6,
classes=['phone', 'cell phone'])
self.detection_buffer.append(len(dets[0].boxes) > 0)
# Тревога если телефон обнаружен в 10+ из 15 последних кадров
return sum(self.detection_buffer) >= 10
Производительность на embedded
На Qualcomm SA8295P (ADAS SoC): MediaPipe FaceMesh — 8ms, YOLOv8n телефон — 12ms. Суммарно < 25ms, что обеспечивает реальное время при 30fps без пропусков.
На Raspberry Pi 4 (4GB RAM): MediaPipe + OpenCV — 35ms при 720p. Допустимо для fleet-мониторинга коммерческого транспорта.
Кейс: автобусный парк, 80 машин
Установили DSM (Driver Safety Monitor) в 80 автобусах городского маршрута. За 3 месяца:
- Зафиксировано 1240 событий DROWSINESS_WARNING, 87 — CRITICAL
- После внедрения системы и инструктажа водителей: снижение критических событий на 64%
- Зафиксировано 340 случаев использования телефона за рулём — передано в HR
| Масштаб | Срок |
|---|---|
| Прототип (EAR + PERCLOS) | 3–4 недели |
| Полная DMS (усталость + телефон + взгляд) | 6–10 недель |
| Fleet-решение с облачной аналитикой | 10–16 недель |







