AI-система управления спросом и распределением автомобилей каршеринга
Каршеринг — задача пространственно-временного баланса: автомобили скапливаются в спальных районах утром и в центре вечером. AI-система предсказывает спрос по зонам на 2-24 часа вперёд и оптимизирует распределение флота, снижая простои на 20-35%.
Прогнозирование спроса по зонам
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import LabelEncoder
class ZonalDemandForecaster:
"""Прогноз спроса на прокат по географическим зонам"""
def __init__(self, n_zones: int):
self.n_zones = n_zones
self.models = {} # Отдельная модель для каждой зоны
def build_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Временные и контекстуальные признаки"""
features = pd.DataFrame()
# Временные
features['hour'] = df['timestamp'].dt.hour
features['weekday'] = df['timestamp'].dt.weekday
features['is_weekend'] = (features['weekday'] >= 5).astype(int)
features['is_morning_rush'] = features['hour'].between(7, 10).astype(int)
features['is_evening_rush'] = features['hour'].between(17, 20).astype(int)
features['month'] = df['timestamp'].dt.month
# Погода (из внешнего API)
features['temperature'] = df.get('temperature_c', 15)
features['precipitation_mm'] = df.get('precipitation_mm', 0)
features['is_raining'] = (features['precipitation_mm'] > 2).astype(int)
# Лаговые признаки
features['demand_lag_1h'] = df.get('demand_1h_ago', 0)
features['demand_lag_24h'] = df.get('demand_24h_ago', 0)
features['demand_lag_week'] = df.get('demand_7d_ago', 0)
# Специальные события
features['is_holiday'] = df.get('is_holiday', 0)
features['event_nearby'] = df.get('event_capacity_nearby', 0)
return features.fillna(0)
def train(self, historical_data: pd.DataFrame):
"""Обучение модели для каждой зоны"""
for zone_id in range(self.n_zones):
zone_data = historical_data[historical_data['zone_id'] == zone_id]
if len(zone_data) < 500:
continue
X = self.build_features(zone_data)
y = zone_data['trips_started']
self.models[zone_id] = GradientBoostingRegressor(
n_estimators=200, learning_rate=0.05, max_depth=4, random_state=42
)
self.models[zone_id].fit(X, y)
def forecast(self, zone_id: int, future_features: pd.DataFrame) -> np.ndarray:
"""Прогноз на горизонт forecast_hours"""
if zone_id not in self.models:
return np.zeros(len(future_features))
X = self.build_features(future_features)
return self.models[zone_id].predict(X).clip(0)
class FleetRebalancer:
"""Оптимизация перераспределения флота"""
def compute_rebalancing_plan(self, current_distribution: dict,
demand_forecast: dict,
fleet_size: int) -> list[dict]:
"""
current_distribution: {zone_id: car_count}
demand_forecast: {zone_id: expected_trips_next_2h}
Returns: список перемещений (откуда → куда, сколько машин)
"""
# Целевое распределение пропорционально прогнозу спроса
total_demand = sum(demand_forecast.values()) + 1e-9
target_distribution = {
zone_id: int(fleet_size * demand / total_demand)
for zone_id, demand in demand_forecast.items()
}
# Корректировка: итог должен = fleet_size
diff = fleet_size - sum(target_distribution.values())
top_zones = sorted(demand_forecast, key=demand_forecast.get, reverse=True)
for i in range(abs(diff)):
zone = top_zones[i % len(top_zones)]
target_distribution[zone] += 1 if diff > 0 else -1
# Вычисляем перемещения
surpluses = {z: current_distribution.get(z, 0) - target_distribution.get(z, 0)
for z in set(current_distribution) | set(target_distribution)}
moves = []
surplus_zones = sorted([(z, s) for z, s in surpluses.items() if s > 0], key=lambda x: -x[1])
deficit_zones = sorted([(z, -s) for z, s in surpluses.items() if s < 0], key=lambda x: -x[1])
s_idx, d_idx = 0, 0
while s_idx < len(surplus_zones) and d_idx < len(deficit_zones):
s_zone, s_count = surplus_zones[s_idx]
d_zone, d_count = deficit_zones[d_idx]
move_count = min(s_count, d_count)
if move_count > 0:
moves.append({
'from_zone': s_zone,
'to_zone': d_zone,
'cars_to_move': move_count,
'priority': 'high' if d_count > 3 else 'normal'
})
surplus_zones[s_idx] = (s_zone, s_count - move_count)
deficit_zones[d_idx] = (d_zone, d_count - move_count)
if surplus_zones[s_idx][1] == 0:
s_idx += 1
if deficit_zones[d_idx][1] == 0:
d_idx += 1
return sorted(moves, key=lambda x: x['priority'] == 'high', reverse=True)
class DynamicPricingForCarsharing:
"""Ценообразование на основе спроса"""
def calculate_surge_multiplier(self, zone_id: int,
available_cars: int,
demand_forecast_1h: float) -> float:
"""Динамический тариф по соотношению спрос/предложение"""
supply_demand_ratio = available_cars / max(demand_forecast_1h, 0.1)
if supply_demand_ratio > 2.0:
multiplier = 0.85 # Скидка при избытке
elif supply_demand_ratio > 1.5:
multiplier = 1.0
elif supply_demand_ratio > 1.0:
multiplier = 1.15
elif supply_demand_ratio > 0.5:
multiplier = 1.3
else:
multiplier = 1.5 # Максимальная наценка при дефиците
return round(multiplier, 2)
def incentivize_user_rebalancing(self, pickup_zone: int,
dropoff_zone: int,
zone_surpluses: dict) -> float:
"""Скидка пользователю, который привезёт машину в дефицитную зону"""
pickup_surplus = zone_surpluses.get(pickup_zone, 0)
dropoff_deficit = -zone_surpluses.get(dropoff_zone, 0)
if dropoff_deficit > 3 and pickup_surplus > 2:
return 0.15 # 15% скидка на поездку
return 0.0
Прогнозирование спроса каршеринга: RMSE 1.5-2.5 поездок на зону-час при горизонте 2 часа (против среднего 3-4 без ML). Оптимизация распределения сокращает average wait time на 18-25% и увеличивает utilization rate флота с 40-45% до 55-65%.







