AI-система мониторинга выбросов и экологической безопасности на химическом производстве
Химическое производство — один из крупнейших источников промышленных выбросов. Непрерывный мониторинг (CEMS — Continuous Emission Monitoring System) с AI-слоем решает три задачи: соответствие нормативам (НДВ/ПДВ), ранняя детекция аварийных выбросов, оптимизация технологического режима для снижения эмиссии.
Архитектура CEMS с AI
Источники данных:
cems_architecture = {
'analyzers': {
'NOx': 'хемилюминесцентный, диапазон 0-1000 ppm',
'SO2': 'ультрафиолетовый флуоресцентный, 0-2000 ppm',
'CO': 'недисперсионный инфракрасный (NDIR)',
'CO2': 'NDIR, 0-20% объёмных',
'HCl': 'NDIR для хлорорганических производств',
'PM2.5/PM10': 'оптический рассеиватель + бета-абсорбция',
'VOC': 'ПИД (Photoionization Detector)'
},
'flow_meter': {
'type': 'ультразвуковой или термический',
'use': 'расчёт массового потока выбросов (г/с, т/год)'
},
'scada_process': {
'parameters': 'температура, давление, расход сырья, режимы реактора',
'use': 'корреляция выбросов с технологическими параметрами'
}
}
Мониторинг соответствия нормативам
Расчёт нормативных показателей:
import pandas as pd
import numpy as np
def check_regulatory_compliance(emissions_data: pd.DataFrame,
permits: dict,
regulation: str = 'RU_ND') -> dict:
"""
РФ: нормативы ПДВ (Предельно Допустимые Выбросы) по каждому источнику.
ЕС: EU IED (Industrial Emissions Directive) — BAT Associated Emission Levels.
"""
violations = []
for pollutant, permit_value in permits.items():
if pollutant not in emissions_data.columns:
continue
# Мгновенное превышение
instantaneous = emissions_data[pollutant].iloc[-1]
if instantaneous > permit_value:
violations.append({
'pollutant': pollutant,
'type': 'instantaneous_exceedance',
'current_value': round(instantaneous, 3),
'permit': permit_value,
'exceedance_factor': round(instantaneous / permit_value, 2)
})
# Среднесуточное (НОРМАТИВНОЕ ТРЕБОВАНИЕ: не превышать СДВ более 3 дней в году)
daily_avg = emissions_data[pollutant].resample('D').mean()
if len(daily_avg) > 0:
days_exceeded = (daily_avg > permit_value).sum()
if days_exceeded > 0:
violations.append({
'pollutant': pollutant,
'type': 'daily_average_exceeded',
'days_exceeded': int(days_exceeded),
'avg_exceedance': round(daily_avg[daily_avg > permit_value].mean(), 3)
})
# Годовой суммарный выброс
if regulation == 'RU_ND' and 'annual_limit_tonnes' in permits:
annual_actual = emissions_data[pollutant].sum() * 3600 * 1e-6 # г/с → т/год (упрощение)
annual_limit = permits['annual_limit_tonnes'].get(pollutant, float('inf'))
if annual_actual > annual_limit * 0.9:
violations.append({
'pollutant': pollutant,
'type': 'annual_limit_approaching',
'current_tonnes': round(annual_actual, 2),
'annual_limit': annual_limit,
'utilization_pct': round(annual_actual / annual_limit * 100, 1)
})
return {
'compliance': len(violations) == 0,
'violations': violations,
'regulatory_status': 'compliant' if not violations else 'violation'
}
Детекция аварийных выбросов
Аномальный выброс — скорость нарастания:
class EmissionSpikeDetector:
def __init__(self, pollutants: list, ewma_alpha: float = 0.1):
self.baselines = {p: {'mean': None, 'std': None} for p in pollutants}
self.alpha = ewma_alpha
self.history = {p: [] for p in pollutants}
def update_and_detect(self, timestamp, readings: dict) -> dict:
alerts = []
for pollutant, value in readings.items():
if pollutant not in self.baselines:
continue
self.history[pollutant].append(value)
if len(self.history[pollutant]) < 30:
# Накапливаем baseline
if len(self.history[pollutant]) == 30:
self.baselines[pollutant]['mean'] = np.mean(self.history[pollutant])
self.baselines[pollutant]['std'] = np.std(self.history[pollutant])
continue
mean = self.baselines[pollutant]['mean']
std = self.baselines[pollutant]['std']
# Z-score
z = (value - mean) / (std + 1e-9)
# EWMA обновление (медленно, чтобы не подстроиться под аварию)
if abs(z) < 2: # обновляем baseline только в нормальном режиме
self.baselines[pollutant]['mean'] = (
self.alpha * value + (1 - self.alpha) * mean
)
# Скорость нарастания (ROC)
if len(self.history[pollutant]) >= 5:
rate_of_change = (value - self.history[pollutant][-5]) / 4 # за 4 интервала
if abs(z) > 4 or rate_of_change > std * 3:
alerts.append({
'pollutant': pollutant,
'value': value,
'z_score': round(z, 1),
'rate_of_change': round(rate_of_change, 3),
'severity': 'emergency' if abs(z) > 6 else 'alert',
'action': 'emergency_shutdown_check' if abs(z) > 6 else 'investigate_source'
})
return {'timestamp': str(timestamp), 'alerts': alerts, 'healthy': len(alerts) == 0}
Предиктивная модель выбросов
Soft sensor для необслуживаемых источников:
from sklearn.ensemble import GradientBoostingRegressor
def train_emission_prediction_model(process_data: pd.DataFrame,
emission_data: pd.DataFrame,
pollutant: str) -> GradientBoostingRegressor:
"""
Предсказываем выброс по технологическим параметрам.
Использование: 1) мониторинг при отказе анализатора, 2) оптимизация режима.
"""
process_features = [
'reactor_temperature', 'feed_flow_rate', 'pressure',
'oxygen_content', 'fuel_type_encoded',
'load_pct', 'catalyst_activity'
]
combined = process_data.merge(emission_data[['timestamp', pollutant]],
on='timestamp', how='inner')
combined = combined.dropna(subset=process_features + [pollutant])
model = GradientBoostingRegressor(
n_estimators=200,
max_depth=5,
learning_rate=0.05
)
model.fit(combined[process_features], combined[pollutant])
return model # использовать: model.predict([[T, F, P, O2, fuel, load, cat]])
Интеграция: Roshydromet ГИС (публикация данных), Росприроднадзор электронная отчётность, SAP EHS (Environment Health Safety) для инцидентов. Данные CEMS по 17-П приказу Росприроднадзора должны храниться 5 лет.
Сроки: CEMS подключение + compliance проверки + spike detector + дашборд — 3-4 недели. Предиктивная модель выбросов, оптимизация режима для снижения эмиссии, SAP EHS интеграция, регуляторная отчётность — 2-3 месяца.







