Разработка AI-системы автоматического учёта потребления ресурсов вода газ тепло электричество
Автоматический учёт ресурсов (AMI — Advanced Metering Infrastructure) — основа для тарификации, балансировки сети и детекции потерь. ML-слой над AMI решает задачи, недоступные простой телеметрии: выявление неисправных счётчиков, детекция хищений, прогноз потребления для планирования.
Архитектура AMI системы
Цепочка сбора данных:
Счётчик (Modbus/DLMS-COSEM/M-Bus)
↓ PLC (PowerLine Communication) / NB-IoT / LoRaWAN
↓ Концентратор данных (DCU)
↓ MDMS (Meter Data Management System)
↓ ML-аналитика + Биллинг
Протоколы по типу ресурса:
metering_protocols = {
'electricity': {
'protocol': 'DLMS/COSEM (IEC 62056)',
'communication': 'PLC G3, NB-IoT, GPRS',
'interval': '30 минут (профиль нагрузки)'
},
'water': {
'protocol': 'Modbus RTU / M-Bus',
'communication': 'LoRaWAN, NB-IoT',
'interval': '60 минут'
},
'heat': {
'protocol': 'M-Bus (EN 13757)',
'communication': 'LoRa, GPRS',
'interval': '60 минут (теплосчётчик Kamstrup, Danfoss)'
},
'gas': {
'protocol': 'Modbus / GSM',
'communication': 'GSM/GPRS, NB-IoT',
'interval': '60 минут'
}
}
Валидация показаний
Детекция подозрительных показаний:
import pandas as pd
import numpy as np
from scipy import stats
def validate_meter_readings(meter_id: str,
current_reading: float,
history: pd.DataFrame) -> dict:
"""
Три типа аномалий:
1. Нулевое потребление — счётчик неисправен или намеренно отключён
2. Аномально высокое — прорыв, хищение, неисправность
3. Отрицательный прирост — счётчик заменён или переполнен
"""
issues = []
# Отрицательный прирост
if len(history) > 0:
prev_reading = history['reading'].iloc[-1]
delta = current_reading - prev_reading
if delta < 0:
issues.append({
'type': 'negative_increment',
'delta': delta,
'severity': 'warning',
'action': 'check_meter_replacement'
})
elif delta == 0 and history['reading'].diff().tail(3).sum() == 0:
issues.append({
'type': 'zero_consumption_extended',
'zero_periods': 3,
'severity': 'major',
'action': 'check_meter_communication'
})
# Статистическая аномалия
if len(history) >= 30:
typical_deltas = history['reading'].diff().dropna()
z_score = stats.zscore([delta])[0]
if abs(z_score) > 4:
issues.append({
'type': 'statistical_outlier',
'z_score': round(z_score, 2),
'severity': 'major' if z_score > 4 else 'critical',
'action': 'field_verification'
})
return {
'meter_id': meter_id,
'current_reading': current_reading,
'issues': issues,
'valid': len(issues) == 0
}
Детекция хищений и несанкционированного потребления
Баланс сети — метод потерь:
def detect_losses_by_balance(supply_reading: float,
consumer_readings: pd.DataFrame,
technical_loss_pct: float = 0.03) -> dict:
"""
Баланс участка: подача - Σ(потребители) = потери
Нормативные потери зависят от типа сети и длины.
Превышение нормы = коммерческие потери (хищение или неисправный счётчик).
"""
total_consumed = consumer_readings['delta_kwh'].sum()
expected_technical = supply_reading * technical_loss_pct
actual_losses = supply_reading - total_consumed
commercial_losses = actual_losses - expected_technical
loss_rate = commercial_losses / (supply_reading + 1e-9)
return {
'supply_kwh': supply_reading,
'total_consumed_kwh': total_consumed,
'technical_losses_kwh': round(expected_technical, 2),
'commercial_losses_kwh': round(commercial_losses, 2),
'loss_rate_pct': round(loss_rate * 100, 2),
'anomaly': loss_rate > 0.08,
'action': 'audit_consumers_on_feeder' if loss_rate > 0.15 else None
}
Поиск подозрительных абонентов:
from sklearn.ensemble import IsolationForest
def detect_theft_suspects(consumer_features: pd.DataFrame) -> pd.DataFrame:
"""
Признаки хищения электроэнергии:
- Потребление ниже типичного для аналогичных объектов
- Потребление только ночью (обходит одноставочный тариф)
- Нет корреляции с погодой (если загородный дом)
- Резкое снижение после поверки счётчика
"""
features = [
'monthly_kwh', 'night_ratio', 'weather_correlation',
'year_over_year_change', 'peer_group_deviation'
]
model = IsolationForest(contamination=0.05, random_state=42)
consumer_features['theft_score'] = -model.fit_predict(consumer_features[features])
suspects = consumer_features[
consumer_features['theft_score'] == -1 # outlier
].sort_values('monthly_kwh')
suspects['priority'] = suspects['peer_group_deviation'].abs().rank(ascending=False)
return suspects[['consumer_id', 'address', 'monthly_kwh',
'peer_group_deviation', 'priority']]
Прогноз потребления для балансировки
Краткосрочный прогноз нагрузки:
from lightgbm import LGBMRegressor
def train_consumption_forecaster(meter_history: pd.DataFrame,
weather_data: pd.DataFrame) -> LGBMRegressor:
"""
Прогноз на 24 часа для конкретного счётчика / группы абонентов.
Используется для балансировки нагрузки и планирования закупок.
"""
features = [
'hour', 'day_of_week', 'month', 'is_weekend', 'is_holiday',
'temperature', 'temperature_forecast',
'consumption_lag_24h', 'consumption_lag_168h',
'consumption_rolling_mean_7d'
]
combined = meter_history.merge(weather_data, on='timestamp', how='left')
combined['consumption_lag_24h'] = combined['consumption'].shift(96) # 15-мин данные
combined['consumption_lag_168h'] = combined['consumption'].shift(672)
combined['consumption_rolling_mean_7d'] = combined['consumption'].rolling(672).mean()
train = combined.dropna(subset=features)
model = LGBMRegressor(n_estimators=200, learning_rate=0.05)
model.fit(train[features], train['consumption'])
return model
Интеграция с биллингом: MDMS платформы: Itron EE, Landis+Gyr Gridstream, OpenWay Riva. Экспорт в SAP IS-U, 1С: ЖКХ, Биллинг-Центр. API для личного кабинета потребителя — история потребления, аномалии, советы по экономии.
Сроки: AMI коннектор + валидация показаний + базовый баланс потерь — 3-4 недели. ML детекция хищений, прогноз нагрузки, SAP IS-U интеграция, личный кабинет — 2-3 месяца.







