Разработка AI-системы анализа данных IoT-датчиков на производстве
Производственный IoT — сотни и тысячи датчиков температуры, давления, вибрации, тока, скорости — генерирует непрерывный поток данных. ML-система превращает этот поток в производственную разведку: качество продукта в реальном времени, детекция аномалий, оптимизация параметров процесса.
Производственный IoT-стек
Уровни автоматизации (ISA-95):
- Уровень 1: Sensors/Actuators (датчики, исполнительные механизмы)
- Уровень 2: Control (ПЛК, SCADA)
- Уровень 3: MES (Manufacturing Execution System)
- Уровень 4: ERP
ML-аналитика работает на уровнях 2-3, используя данные с уровня 1.
Протоколы:
protocols = {
'OPC-UA': 'стандарт Industry 4.0, discovery + security + историк',
'Modbus RTU/TCP': 'legacy оборудование, простые регистры',
'PROFIBUS/PROFINET': 'Siemens PCS 7, S7 ПЛК',
'EtherNet/IP': 'Allen-Bradley, Rockwell',
'MQTT': 'lightweight для IoT gateway → cloud',
'AMQP': 'корпоративные интеграции'
}
Real-Time Data Pipeline
Потоковая обработка:
from kafka import KafkaConsumer, KafkaProducer
import json
class ManufacturingDataPipeline:
def __init__(self, kafka_bootstrap='kafka:9092'):
self.consumer = KafkaConsumer(
'sensor-raw',
bootstrap_servers=kafka_bootstrap,
value_deserializer=lambda m: json.loads(m.decode()),
group_id='analytics-group'
)
self.producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap,
value_serializer=lambda v: json.dumps(v).encode()
)
def process_stream(self):
for message in self.consumer:
sensor_data = message.value
# 1. Валидация и очистка
cleaned = self.validate_and_clean(sensor_data)
# 2. Feature extraction (на 1-минутных окнах)
if self.should_extract_features(cleaned):
features = self.extract_features(cleaned)
# 3. Inference
anomaly_score = anomaly_model.predict([features])[0]
quality_prediction = quality_model.predict([features])[0]
# 4. Публикация результатов
self.producer.send('analytics-output', {
'machine_id': cleaned['machine_id'],
'timestamp': cleaned['timestamp'],
'anomaly_score': float(anomaly_score),
'quality_prediction': float(quality_prediction),
'features': features
})
Мультисенсорный анализ производственной линии
Корреляция датчиков:
def analyze_sensor_correlations(sensor_matrix, window_minutes=30):
"""
Корреляционная матрица датчиков:
- Потеря корреляции между коррелированными датчиками = аномалия
- Внезапная корреляция между некоррелированными = нетипичный режим
"""
# Ожидаемые корреляции (из нормального режима работы)
baseline_corr = compute_baseline_correlation(normal_operation_data)
# Текущая корреляция
current_corr = sensor_matrix.corr()
# Отклонение корреляционной структуры
corr_deviation = np.abs(current_corr - baseline_corr).mean().mean()
return corr_deviation # высокое значение = нетипичный режим
Process Variable Interaction:
def detect_process_regime_change(current_state, baseline_pca):
"""
PCA на нормализованных переменных процесса
Выход за пределы "нормального операционного пространства" = аномалия
"""
# Проецируем текущее состояние в пространство PCA
current_pca = baseline_pca.transform([current_state])
# SPE (Squared Prediction Error): ошибка реконструкции
reconstructed = baseline_pca.inverse_transform(current_pca)
spe = np.sum((current_state - reconstructed[0])**2)
# T²: расстояние в PCA пространстве от центра
t2 = np.sum(current_pca**2 / baseline_pca.explained_variance_)
return {'spe': spe, 't2': t2,
'anomaly': spe > spe_ucl or t2 > t2_ucl}
Quality Prediction — Soft Sensor
Онлайн-прогноз качества:
def train_quality_soft_sensor(process_params, lab_results, n_lags=5):
"""
Входные: технологические параметры (онлайн, каждую минуту)
Выходные: качество продукта (лабораторный анализ, каждый час)
Temporal alignment: лаг между параметрами и качеством = время в процессе
"""
# Временные лаги для компенсации времени реакции процесса
lagged_features = create_lagged_features(process_params, n_lags)
# Объединение с лабораторными данными (alignment по времени)
aligned = align_timeseries(lagged_features, lab_results, tolerance='15min')
model = GradientBoostingRegressor(n_estimators=200)
model.fit(aligned[feature_cols], aligned['quality_measure'])
return model
# Использование: каждую минуту → мгновенный прогноз качества
# Не ждать лабораторного анализа через час
Адаптация к дрейфу процесса:
class AdaptiveQualityModel:
"""
Производственные процессы дрейфуют:
- Износ инструмента
- Изменение партии сырья
- Сезонные температурные изменения
Онлайн-обновление модели при получении новых лабораторных данных
"""
def __init__(self, base_model, update_rate=0.1):
self.model = base_model
self.update_rate = update_rate
self.recent_samples = deque(maxlen=200)
def predict(self, features):
return self.model.predict([features])[0]
def update(self, features, true_quality):
self.recent_samples.append((features, true_quality))
if len(self.recent_samples) % 10 == 0: # перобучение каждые 10 новых точек
X_recent = [s[0] for s in self.recent_samples]
y_recent = [s[1] for s in self.recent_samples]
# Incremental fit с высоким весом новых данных
self.model.fit(X_recent, y_recent)
Anomaly Prioritization
Многоуровневые оповещения:
def prioritize_manufacturing_alerts(anomalies, asset_criticality, production_impact):
"""
Не все аномалии одинаково важны
Приоритет = аномальность × критичность актива × текущая загрузка
"""
scored_alerts = []
for anomaly in anomalies:
priority_score = (
anomaly['severity'] *
asset_criticality[anomaly['machine_id']] *
production_impact.get(anomaly['machine_id'], 1.0)
)
scored_alerts.append({**anomaly, 'priority': priority_score})
return sorted(scored_alerts, key=lambda x: x['priority'], reverse=True)
Suppression коррелированных алертов: Одна поломка насоса → сотня алертов давления, температуры, расхода по всему трубопроводу. Root cause suppression: определить первичный алерт, сгруппировать остальные.
Интеграция с SCADA и MES
Historian Integration (OSIsoft PI):
from osisoft.pidevclub.piwebapi.api import DataApi
def read_pi_data(pi_server_url, tag_names, start_time, end_time):
"""
PI System: стандарт в нефтегазе, энергетике, хим. промышленности
PIWebAPI → REST доступ к историческим и real-time данным
"""
# Чтение данных тегов из PI historian
pass
# Также: OSIsoft AF (Asset Framework) для контекстной информации
MES Reporting: ML-система публикует агрегированные KPI в MES:
- OEE (Overall Equipment Effectiveness) по каждой линии
- Quality rate: доля продукции в норме vs. rejected
- Performance: фактическая vs. плановая производительность
Сроки: OPC-UA/Modbus коллектор, Kafka pipeline, мультисенсорная аномалия, дашборд — 4-5 недель. Soft sensor качества, adaptive model, process regime detection, MES интеграция, PI historian — 3-4 месяца.







