Интеграция ROS (Robot Operating System) с AI-моделями
ROS 2 — стандарт для разработки робототехнических систем. Интеграция AI-моделей с ROS позволяет роботу воспринимать среду (computer vision, LiDAR обработка), принимать решения (reinforcement learning policy) и выполнять действия в реальном времени с минимальной latency.
Архитектура ROS 2 + AI
В ROS 2 AI-модель обычно реализуется как Node, который подписывается на sensor topics (камера, LiDAR) и публикует результаты (обнаруженные объекты, команды управления):
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from vision_msgs.msg import Detection2DArray
from cv_bridge import CvBridge
import torch
import numpy as np
class ObjectDetectionNode(Node):
def __init__(self):
super().__init__('object_detection')
# Загрузка модели
self.model = torch.hub.load(
'ultralytics/yolov8', 'yolov8n',
pretrained=True
)
self.model.eval()
if torch.cuda.is_available():
self.model.cuda()
self.bridge = CvBridge()
# Подписка на камеру
self.subscription = self.create_subscription(
Image,
'/camera/color/image_raw',
self.image_callback,
10 # QoS depth
)
# Публикация результатов
self.detection_pub = self.create_publisher(
Detection2DArray,
'/detections',
10
)
self.get_logger().info('Object detection node started')
def image_callback(self, msg: Image):
# Конвертация ROS Image → numpy
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='rgb8')
# Инференс
with torch.no_grad():
results = self.model(cv_image)
# Конвертация в ROS Detection2DArray
detections = self._to_detection_array(results, msg.header)
self.detection_pub.publish(detections)
def main():
rclpy.init()
node = ObjectDetectionNode()
rclpy.spin(node)
rclpy.shutdown()
LiDAR + PointNet для 3D восприятия
from sensor_msgs.msg import PointCloud2
import sensor_msgs_py.point_cloud2 as pc2
class LidarPerceptionNode(Node):
def __init__(self):
super().__init__('lidar_perception')
self.pointnet_model = load_pointnet_model('pointnet_weights.pth')
self.sub = self.create_subscription(
PointCloud2, '/velodyne_points',
self.lidar_callback, 10
)
def lidar_callback(self, msg: PointCloud2):
# Конвертация PointCloud2 → numpy (N, 3)
points = np.array(list(pc2.read_points(msg, field_names=("x","y","z"))))
# Субсэмплирование до 1024 точек для PointNet
indices = np.random.choice(len(points), 1024, replace=False)
sampled = points[indices]
# Инференс
tensor = torch.FloatTensor(sampled).unsqueeze(0).cuda()
with torch.no_grad():
classes = self.pointnet_model(tensor)
# Публикация классифицированных объектов
Latency оптимизация для real-time
Для роботических систем latency критична: perception loop должен работать на 10-30 Hz:
- TensorRT оптимизация: конвертация PyTorch → ONNX → TensorRT даёт 2-5x ускорение на NVIDIA Jetson
- Параллельная обработка: запуск инференса в отдельном потоке, не блокируя ROS callback executor
- CUDA streams: параллельная обработка нескольких кадров
# TensorRT для NVIDIA Jetson Orin
import tensorrt as trt
# YOLOv8 → ONNX → TensorRT: ~8ms на Jetson Orin vs ~45ms PyTorch
Навигация с RL policy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
class RLNavigationNode(Node):
def __init__(self):
super().__init__('rl_navigation')
self.policy = load_stable_baselines3_model('navigation_policy.zip')
self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.odom_sub = self.create_subscription(Odometry, '/odom', self.step, 10)
def step(self, odom_msg):
state = self._odom_to_state(odom_msg)
action, _ = self.policy.predict(state, deterministic=True)
cmd = Twist()
cmd.linear.x = float(action[0])
cmd.angular.z = float(action[1])
self.cmd_vel_pub.publish(cmd)
Интеграция занимает 2-4 недели в зависимости от сложности perception pipeline и требований к real-time обработке.







