AI-система матчинга водителей и пассажиров в райдшеринге
Матчинг в Uber/Lyft — не просто «ближайший водитель». Алгоритм одновременно минимизирует время ожидания пассажира, максимизирует загрузку водителя и предотвращает «пустые» перегоны между поездками. При правильном матчинге ETA снижается на 30-40%.
Алгоритм оптимального матчинга
import numpy as np
from scipy.optimize import linear_sum_assignment
from dataclasses import dataclass
from typing import Optional
import heapq
@dataclass
class Driver:
id: str
lat: float
lon: float
current_passengers: int
max_passengers: int
rating: float
acceptance_rate: float
vehicle_type: str # economy, comfort, xl
@dataclass
class RideRequest:
id: str
pickup_lat: float
pickup_lon: float
dropoff_lat: float
dropoff_lon: float
passenger_count: int
vehicle_preference: str
max_wait_seconds: int
surge_accepted: bool
class RideshareMatchingEngine:
"""Матчинг водитель-пассажир с учётом множества критериев"""
EARTH_RADIUS_KM = 6371.0
def haversine_distance(self, lat1: float, lon1: float,
lat2: float, lon2: float) -> float:
"""Расстояние в км"""
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = (np.sin(dlat/2)**2 +
np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2)
return 2 * self.EARTH_RADIUS_KM * np.arcsin(np.sqrt(a))
def estimated_pickup_time(self, driver: Driver, request: RideRequest) -> float:
"""ETA в минутах (упрощённо через дистанцию, в production — OSRM/Google Maps)"""
dist_km = self.haversine_distance(
driver.lat, driver.lon,
request.pickup_lat, request.pickup_lon
)
# Средняя скорость с учётом городского трафика: 20-25 км/ч
return dist_km / 22 * 60
def compute_match_score(self, driver: Driver,
request: RideRequest) -> float:
"""
Составной скор для матчинга. Минимизируем ETA + максимизируем
utilization + учитываем предпочтения и качество водителя.
"""
eta_min = self.estimated_pickup_time(driver, request)
# Жёсткие ограничения
if driver.vehicle_type != request.vehicle_preference and request.vehicle_preference != 'any':
if not (request.vehicle_preference == 'economy' and driver.vehicle_type == 'comfort'):
return -1.0 # Недопустимое совпадение
if driver.current_passengers + request.passenger_count > driver.max_passengers:
return -1.0 # Нет мест
if eta_min > request.max_wait_seconds / 60:
return -1.0 # Слишком долго ждать
# Нормализация компонент (меньше ETA = выше скор)
eta_score = max(0, 1.0 - eta_min / 10) # 0 мин = 1.0, 10+ мин = 0
# Качество водителя
quality_score = (driver.rating - 4.0) / 1.0 * 0.5 + driver.acceptance_rate * 0.5
# Детур-коэффициент для пул-поездок (если водитель уже везёт пассажиров)
if driver.current_passengers > 0:
detour_factor = 0.7 # Пул-поездка менее привлекательна для пассажира
else:
detour_factor = 1.0
return eta_score * 0.55 + quality_score * 0.25 + detour_factor * 0.20
def batch_match(self, drivers: list[Driver],
requests: list[RideRequest]) -> dict:
"""
Оптимальный батч-матчинг через венгерский алгоритм.
Запускается каждые 30 секунд для накопившихся запросов.
"""
n_drivers = len(drivers)
n_requests = len(requests)
if n_drivers == 0 or n_requests == 0:
return {'matches': [], 'unmatched_requests': [r.id for r in requests]}
# Матрица стоимости (венгерский алгоритм минимизирует, поэтому инвертируем скор)
cost_matrix = np.full((n_drivers, n_requests), 1000.0)
for i, driver in enumerate(drivers):
for j, request in enumerate(requests):
score = self.compute_match_score(driver, request)
if score >= 0:
cost_matrix[i, j] = 1.0 - score # Инверсия для минимизации
# Венгерский алгоритм O(n³)
driver_indices, request_indices = linear_sum_assignment(cost_matrix)
matches = []
matched_request_ids = set()
for d_idx, r_idx in zip(driver_indices, request_indices):
if cost_matrix[d_idx, r_idx] < 900.0: # Не фиктивное назначение
matches.append({
'driver_id': drivers[d_idx].id,
'request_id': requests[r_idx].id,
'eta_min': round(self.estimated_pickup_time(drivers[d_idx], requests[r_idx]), 1),
'score': round(1.0 - cost_matrix[d_idx, r_idx], 3)
})
matched_request_ids.add(requests[r_idx].id)
unmatched = [r.id for r in requests if r.id not in matched_request_ids]
return {
'matches': matches,
'unmatched_requests': unmatched,
'match_rate': len(matches) / max(len(requests), 1)
}
class DriverPositioningAdvisor:
"""Рекомендации водителю куда переехать для следующего заказа"""
def suggest_repositioning(self, driver: Driver,
demand_heatmap: dict,
nearby_drivers: list[Driver],
radius_km: float = 3.0) -> dict:
"""
demand_heatmap: {(lat, lon): expected_requests_next_30min}
Ищем зону с высоким спросом и малой конкуренцией среди водителей.
"""
best_zone = None
best_score = -1.0
for (zone_lat, zone_lon), expected_demand in demand_heatmap.items():
dist_to_zone = self.haversine_distance(
driver.lat, driver.lon, zone_lat, zone_lon
)
if dist_to_zone > radius_km:
continue
# Сколько водителей уже в этой зоне
competing_drivers = sum(
1 for d in nearby_drivers
if self.haversine_distance(d.lat, d.lon, zone_lat, zone_lon) < 1.0
)
# Спрос на водителя = demand / (drivers + 1)
demand_per_driver = expected_demand / (competing_drivers + 1)
# Штраф за дистанцию перемещения
relocation_cost = dist_to_zone / radius_km * 0.3
score = demand_per_driver - relocation_cost
if score > best_score:
best_score = score
best_zone = (zone_lat, zone_lon, dist_to_zone, expected_demand)
if best_zone:
return {
'suggest': True,
'target_lat': best_zone[0],
'target_lon': best_zone[1],
'distance_km': round(best_zone[2], 1),
'expected_wait_min': round(best_zone[2] / 22 * 60, 0), # Время добраться
'expected_demand': best_zone[3]
}
return {'suggest': False, 'reason': 'Already in optimal zone'}
def haversine_distance(self, lat1, lon1, lat2, lon2) -> float:
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
return 2 * 6371.0 * np.arcsin(np.sqrt(a))
Батч-матчинг каждые 30 секунд (против жадного онлайн-матчинга) снижает average ETA на 15-20%. Рекомендации позиционирования для водителей повышают их earnings per hour на 10-15% и улучшают покрытие районов с высоким спросом.







