Разработка AI-системы детекции утечек в трубопроводах
Утечка нефти или газа в трубопроводе — экологическая катастрофа и финансовые потери. Задержка обнаружения на 1 час при расходе 500 м³/ч = 500 кубометров продукта в грунт. AI-система сокращает время обнаружения с часов до минут, используя акустику, давление и расходометрию.
Методы детекции утечек
Сравнение методов:
| Метод | Время обнаружения | Минимальная утечка | Локализация |
|---|---|---|---|
| Балансовый (расход) | 5-30 мин | 1-3% от потока | ±500 м |
| Перепад давления | 1-5 мин | 0.5% | ±100 м |
| Акустический | секунды | 0.1% | ±10 м |
| Негативная ударная волна | 30-60 с | 0.5% | ±50 м |
| Fiber Optic DAS | секунды | 0.01% | ±1 м |
Балансовый метод с ML
Детекция по дисбалансу входа-выхода:
import numpy as np
import pandas as pd
from scipy.stats import chi2
class PipelineBalanceMonitor:
def __init__(self, line_id: str, balance_threshold_pct: float = 1.0):
self.line_id = line_id
self.threshold = balance_threshold_pct / 100
self.ewma_balance = None
self.alpha = 0.1
def update(self, inlet_flow_m3h: float, outlet_flow_m3h: float,
line_pack_change: float = 0.0) -> dict:
"""
Баланс = Вход - Выход - Изменение линейного запаса (compressibility)
Для газа: line pack меняется при изменении давления.
"""
balance = inlet_flow_m3h - outlet_flow_m3h - line_pack_change
balance_pct = balance / (inlet_flow_m3h + 1e-9)
# EWMA фильтр
if self.ewma_balance is None:
self.ewma_balance = balance_pct
else:
self.ewma_balance = self.alpha * balance_pct + (1 - self.alpha) * self.ewma_balance
leak_detected = self.ewma_balance > self.threshold
return {
'line_id': self.line_id,
'instantaneous_balance_pct': round(balance_pct * 100, 3),
'ewma_balance_pct': round(self.ewma_balance * 100, 3),
'leak_detected': leak_detected,
'estimated_leak_rate_m3h': max(0, balance) if leak_detected else 0
}
Детекция отрицательной ударной волны (Negative Pressure Wave)
Физика и алгоритм:
def detect_negative_pressure_wave(pressure_sensors: dict,
pipeline_params: dict) -> dict:
"""
При разрыве трубы → резкое снижение давления в точке разрыва →
волна пониженного давления распространяется в обе стороны.
Скорость волны: a = sqrt(K/ρ) ≈ 1000-1300 м/с для жидкостей.
Зная время прихода волны к двум датчикам → местоположение разрыва.
"""
wave_speed = pipeline_params.get('pressure_wave_speed_ms', 1100) # м/с
# Определяем время прихода волны к каждому датчику
arrival_times = {}
for sensor_id, pressure_series in pressure_sensors.items():
# Ищем резкое падение: производная < -threshold
dP_dt = np.gradient(pressure_series.values, 1) # 1-секундные данные
sharp_drop_indices = np.where(dP_dt < -5)[0] # > 5 bar/s = ударная волна
if len(sharp_drop_indices) > 0:
arrival_times[sensor_id] = sharp_drop_indices[0]
if len(arrival_times) < 2:
return {'leak_detected': False}
# Локализация по двум датчикам A и B
sensor_ids = list(arrival_times.keys())[:2]
t_A = arrival_times[sensor_ids[0]]
t_B = arrival_times[sensor_ids[1]]
delta_t = abs(t_A - t_B) # секунды
# Расстояние от A до точки разрыва
distance_AB = pipeline_params['sensor_distances'].get(
(sensor_ids[0], sensor_ids[1]), 5000 # м между датчиками
)
dist_from_A = (distance_AB - wave_speed * delta_t) / 2
dist_from_A = max(0, min(dist_from_A, distance_AB))
return {
'leak_detected': True,
'leak_location_m_from_sensor_A': round(dist_from_A, 0),
'confidence': 'high',
'delta_t_seconds': delta_t
}
Акустический мониторинг
Обработка сигнала акустических датчиков:
from scipy import signal
import librosa
def analyze_acoustic_signal(audio_signal: np.ndarray,
sampling_rate: int = 44100) -> dict:
"""
Утечка производит характерный акустический сигнал:
- Широкополосный шум 100-10000 Гц
- Характеристика зависит от давления и размера отверстия
"""
# Фильтрация: полосовой фильтр 500-5000 Гц (диапазон утечек)
b, a = signal.butter(4, [500, 5000], btype='band', fs=sampling_rate)
filtered = signal.filtfilt(b, a, audio_signal)
# Признаки
rms = np.sqrt(np.mean(filtered**2))
kurtosis = np.mean((filtered - filtered.mean())**4) / (np.std(filtered)**4 + 1e-9)
# Спектральные признаки
freqs, psd = signal.welch(filtered, fs=sampling_rate, nperseg=1024)
band_energy = np.trapz(psd[(freqs >= 1000) & (freqs <= 3000)], freqs[(freqs >= 1000) & (freqs <= 3000)])
total_energy = np.trapz(psd, freqs)
leak_band_ratio = band_energy / (total_energy + 1e-9)
# Классификация: сравнение с шаблонами нормального шума трубопровода
leak_score = rms * 0.4 + leak_band_ratio * 0.4 + min(1, kurtosis / 10) * 0.2
return {
'rms_amplitude': float(rms),
'kurtosis': float(kurtosis),
'leak_band_ratio': float(leak_band_ratio),
'leak_score': float(leak_score),
'leak_detected': leak_score > 0.6
}
Fusion-детекция
Комбинирование методов:
def fuse_leak_detections(balance_result: dict,
npw_result: dict,
acoustic_result: dict) -> dict:
"""
Каждый метод имеет свои ложные срабатывания.
Голосование с весами по надёжности метода.
"""
votes = 0
total_weight = 0
location_estimates = []
if balance_result.get('leak_detected'):
votes += 0.4
location_estimates.append(balance_result.get('estimated_location'))
total_weight += 0.4
if npw_result.get('leak_detected'):
votes += 0.4
location_estimates.append(npw_result.get('leak_location_m_from_sensor_A'))
total_weight += 0.4
if acoustic_result.get('leak_detected'):
votes += 0.2
location_estimates.append(acoustic_result.get('acoustic_location_m'))
total_weight += 0.2
confidence = votes / total_weight
best_location = min(location_estimates, key=lambda x: 0 if x is None else 1) if location_estimates else None
return {
'leak_confirmed': confidence >= 0.4, # хотя бы один надёжный метод
'confidence': round(confidence, 2),
'estimated_location_m': best_location,
'alert_level': 'critical' if confidence > 0.8 else 'warning'
}
Интеграция с SCADA и диспетчером: OPC-UA коннектор к Emerson DeltaV, Honeywell Experion, ABB 800xA. При обнаружении утечки — автоматическое закрытие секционных задвижек через SCADA (Emergency Shutdown). SMS/push уведомление диспетчеру.
Сроки: Балансовый метод + алерты + SCADA коннектор — 3-4 недели. NPW локализация, акустика, fusion-детекция, автоматическое закрытие задвижек — 3-4 месяца.







