AI-система для автономного морского судна
Автономное управление судном — задача на стыке CV, планирования и морского права. COLREG (международные правила предупреждения столкновений) требуют, чтобы система не просто обнаруживала суда, но и корректно интерпретировала их огни, выполняла манёвры уступки согласно правилам. Это уровень сложности значительно выше автомобильной автономии.
Восприятие: мультисенсорный fusion на море
Морская среда специфична: волнение, брызги на линзе, туман, слепящее солнце на воде, ночные огни.
import numpy as np
import cv2
from ultralytics import YOLO
class MarinePerceptionSystem:
def __init__(self, config: dict):
# Детектор судов и препятствий: YOLOv8l, дообученный на морских данных
self.vessel_detector = YOLO(config['vessel_model'])
# Радар-данные (ARPA/AIS)
self.radar_parser = RadarARPAParser(config['radar_port'])
self.ais_receiver = AISReceiver(config['ais_port'])
# LiDAR (Ouster OS1) для ближней зоны < 100м
self.lidar_processor = MarineLiDAR(config['lidar_config'])
self.camera_matrix = np.array(config['cam_intrinsics'])
def fuse_detections(self, frame: np.ndarray,
radar_tracks: list,
ais_contacts: list,
lidar_points: np.ndarray) -> list[dict]:
# Камера: обнаружение судов, буёв, плавника, людей за бортом
cam_dets = self.vessel_detector(frame, conf=0.4)
fused_contacts = []
for det in cam_dets[0].boxes:
cls = self.vessel_detector.model.names[int(det.cls)]
bbox = list(map(int, det.xyxy[0]))
bearing = self._bearing_from_bbox(bbox, frame.shape)
# Дистанция из LiDAR (если есть точки в секторе)
distance = self._lidar_distance_in_sector(lidar_points, bearing)
# Сопоставление с AIS (если есть MMSI в том направлении)
ais_match = self._match_ais(bearing, distance, ais_contacts)
contact = {
'class': cls,
'bearing_deg': bearing,
'distance_m': distance,
'confidence': float(det.conf),
'bbox': bbox,
'ais_data': ais_match,
'source': 'camera+lidar'
}
# Дополняем радарным треком
radar_match = self._match_radar(bearing, distance, radar_tracks)
if radar_match:
contact['cog'] = radar_match.get('cog') # курс
contact['sog'] = radar_match.get('sog') # скорость
contact['tcpa'] = radar_match.get('tcpa') # время до CPA
contact['cpa'] = radar_match.get('cpa') # минимальное расстояние
fused_contacts.append(contact)
return fused_contacts
def _bearing_from_bbox(self, bbox: list, frame_shape: tuple) -> float:
cx = (bbox[0] + bbox[2]) / 2
fov_h = 60 # градусов горизонтального обзора
return (cx / frame_shape[1] - 0.5) * fov_h # относительно курса
COLREG-совместимое планирование манёвров
class COLREGPlanner:
"""
Правила МППСС-72 (COLREG): определяем тип ситуации
и обязательный манёвр.
"""
def assess_situation(self, own_vessel: dict,
target: dict) -> dict:
bearing_to_target = target['bearing_deg']
tcpa = target.get('tcpa', float('inf'))
cpa = target.get('cpa', float('inf'))
situation = 'safe'
action = 'none'
# COLREG Rule 13: обгон
if -22.5 <= bearing_to_target <= 22.5 and tcpa < 12 * 60:
situation = 'overtaking'
# Мы обгоняем: уступаем дорогу
action = 'alter_course_starboard'
# COLREG Rule 14: курсы на встречу
elif abs(bearing_to_target) < 5:
situation = 'head_on'
action = 'alter_course_starboard'
# COLREG Rule 15: пересечение курсов
elif 0 < bearing_to_target < 112.5:
situation = 'crossing_give_way'
action = 'alter_course_starboard_or_reduce_speed'
elif -112.5 < bearing_to_target < 0:
situation = 'crossing_stand_on'
action = 'maintain_course_and_speed'
return {
'situation': situation,
'action': action,
'tcpa_minutes': tcpa / 60,
'cpa_meters': cpa,
'urgency': 'HIGH' if cpa < 500 and tcpa < 5 * 60 else
'MEDIUM' if cpa < 1000 else 'LOW'
}
Обнаружение человека за бортом (Man Overboard)
MOB-детекция — критичная функция. Человек в воде: небольшой объект (30×40 px на 50м), может быть скрыт волнами.
class MOBDetector:
def __init__(self):
self.detector = YOLO('yolov8m_mob.pt') # дообученный на морских людях
self.thermal_model = ThermalPersonDetector() # для ночи
def detect(self, frame: np.ndarray,
thermal_frame: np.ndarray = None) -> list:
# RGB детекция
rgb_dets = self.detector(frame, conf=0.35, classes=[0]) # person
dets = list(rgb_dets[0].boxes)
# Ночью или при плохой видимости — тепловизор
if thermal_frame is not None:
thermal_dets = self.thermal_model.detect(thermal_frame)
dets.extend(thermal_dets)
# Фильтр: человек в воде имеет bbox близко к горизонту
horizon_y = frame.shape[0] * 0.4 # примерно
mob_candidates = []
for det in dets:
bbox = list(map(int, det.xyxy[0]))
if bbox[1] > horizon_y: # ниже горизонта = в воде
mob_candidates.append({
'bbox': bbox,
'confidence': float(det.conf)
})
return mob_candidates
Характеристики системы
| Параметр | Значение |
|---|---|
| Дальность обнаружения судов (камера) | До 3 км (ясная погода) |
| Дальность LiDAR (Ouster OS1-64) | До 120м |
| Latency полного цикла perception | 150–250ms |
| Поддерживаемые типы объектов | Суда, буи, плавник, MOB |
| Интеграция | NMEA 2000, CAN bus, ARPA радар |
| Тип проекта | Срок |
|---|---|
| Система восприятия (perception only) | 3–5 месяцев |
| Perception + COLREG planning | 6–10 месяцев |
| Полная автономная система с сертификацией | 18–36 месяцев |







