Разработка AI для стабилизации видео
Дрожание камеры — неизбежный артефакт съёмки с рук, дронов, спортивных камер. Классическая стабилизация работает через оптический поток: оценить движение между кадрами, сглядить траекторию, скомпенсировать дрожание. AI-методы добавляют семантическое понимание: отличают движение оператора от движения объекта съёмки, лучше обрабатывают динамические сцены, могут восстанавливать «выкадрированные» пиксели через inpainting.
Классическая стабилизация через optical flow
import cv2
import numpy as np
from scipy.signal import medfilt
class VideoStabilizer:
def __init__(self, smoothing_window: int = 30,
crop_ratio: float = 0.1):
self.smoothing_window = smoothing_window
self.crop_ratio = crop_ratio # обрезка краёв после стабилизации
def stabilize(self, input_path: str, output_path: str) -> dict:
cap = cv2.VideoCapture(input_path)
fps = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Шаг 1: Вычислить траекторию камеры
transforms = self._estimate_transforms(cap)
cap.release()
# Шаг 2: Сглаживание траектории
smoothed = self._smooth_trajectory(transforms)
# Шаг 3: Применение стабилизирующих трансформаций
cap = cv2.VideoCapture(input_path)
out = cv2.VideoWriter(output_path,
cv2.VideoWriter_fourcc(*'mp4v'),
fps, (w, h))
for i, (orig, smooth) in enumerate(zip(transforms, smoothed)):
ret, frame = cap.read()
if not ret:
break
stabilized = self._apply_transform(frame, orig, smooth, w, h)
out.write(stabilized)
cap.release()
out.release()
return {'frames': len(transforms), 'smoothing_window': self.smoothing_window}
def _estimate_transforms(self, cap) -> list[np.ndarray]:
"""Оценка аффинных трансформаций между соседними кадрами"""
ret, prev = cap.read()
prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)
transforms = []
while True:
ret, curr = cap.read()
if not ret:
break
curr_gray = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY)
# Детекция и трекинг точек
prev_pts = cv2.goodFeaturesToTrack(
prev_gray, maxCorners=200, qualityLevel=0.01,
minDistance=30, blockSize=3
)
curr_pts, status, _ = cv2.calcOpticalFlowPyrLK(
prev_gray, curr_gray, prev_pts, None
)
# Фильтрация надёжных точек
valid_prev = prev_pts[status == 1]
valid_curr = curr_pts[status == 1]
# Оценка аффинной трансформации
m, _ = cv2.estimateAffinePartial2D(valid_prev, valid_curr)
if m is None:
m = np.eye(2, 3, dtype=np.float64)
transforms.append(m)
prev_gray = curr_gray
return transforms
def _smooth_trajectory(self, transforms: list) -> list:
"""Скользящее среднее для сглаживания траектории"""
trajectory = np.cumsum([m[:, 2] for m in transforms], axis=0)
smoothed = np.zeros_like(trajectory)
for i in range(len(trajectory)):
start = max(0, i - self.smoothing_window // 2)
end = min(len(trajectory), i + self.smoothing_window // 2)
smoothed[i] = trajectory[start:end].mean(axis=0)
# Дельта трансформаций для применения
delta = smoothed - trajectory
result = []
for i, m in enumerate(transforms):
m_smooth = m.copy()
m_smooth[:, 2] += delta[i]
result.append(m_smooth)
return result
def _apply_transform(self, frame: np.ndarray,
orig_m: np.ndarray,
smooth_m: np.ndarray,
w: int, h: int) -> np.ndarray:
stabilized = cv2.warpAffine(frame, smooth_m, (w, h))
# Кроп для скрытия чёрных краёв
crop = int(min(w, h) * self.crop_ratio)
stabilized = stabilized[crop:h-crop, crop:w-crop]
return cv2.resize(stabilized, (w, h))
DUT — Deep Unified Transformer для AI-стабилизации
class DeepVideoStabilizer:
"""
AI-подход: обучение стабилизировать видео на парах нестабильный/стабильный.
Преимущество перед классикой: лучше обрабатывает
rolling shutter, быстрое движение, размытие.
"""
def __init__(self, checkpoint_path: str, device: str = 'cuda'):
import sys
sys.path.append('/opt/DUT')
from model import DUTStabilizer
self.model = DUTStabilizer()
self.model.load_state_dict(torch.load(checkpoint_path))
self.model.eval().to(device)
self.device = device
@torch.no_grad()
def stabilize_clip(self, frames: list[np.ndarray],
window_size: int = 16) -> list[np.ndarray]:
"""
Обрабатывает видео окнами по window_size кадров.
Ключевая особенность DUT: использует future frames
для предсказания текущей стабилизации.
"""
results = []
for i in range(0, len(frames), window_size // 2):
window = frames[i:i+window_size]
if len(window) < window_size:
# Дополнить последним кадром
window = window + [window[-1]] * (window_size - len(window))
tensor = self._frames_to_tensor(window)
stabilized_tensor = self.model(tensor.to(self.device))
stabilized_frames = self._tensor_to_frames(stabilized_tensor)
results.extend(stabilized_frames[:window_size//2])
return results[:len(frames)]
Метрики качества стабилизации
def evaluate_stabilization(unstable_frames: list, stable_frames: list) -> dict:
"""
Метрики:
- Cropping Ratio: сколько пикселей сохранено (выше = лучше)
- Distortion Value: искажение геометрии (ниже = лучше)
- Stability Score: дисперсия движения между кадрами (ниже = лучше)
"""
# Stability: дисперсия optical flow в стабилизированном видео
flows = []
for i in range(1, len(stable_frames)):
prev_gray = cv2.cvtColor(stable_frames[i-1], cv2.COLOR_BGR2GRAY)
curr_gray = cv2.cvtColor(stable_frames[i], cv2.COLOR_BGR2GRAY)
flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None,
0.5, 3, 15, 3, 5, 1.2, 0)
flows.append(np.abs(flow).mean())
return {
'stability_score': float(np.std(flows)),
'mean_motion': float(np.mean(flows)),
'max_motion': float(np.max(flows))
}
| Метод |
Stability↓ |
Cropping↑ |
Скорость |
| OpenCV (vidstab) |
0.35 |
0.91 |
Realtime |
| DIFRINT |
0.18 |
0.89 |
5–10 FPS |
| DUT |
0.14 |
0.87 |
3–5 FPS |
| StabNet |
0.16 |
0.90 |
8 FPS |
| Задача |
Срок |
| Batch стабилизация через OpenCV |
1 неделя |
| AI стабилизация с DUT/DIFRINT |
4–6 недель |
| Realtime стабилизация для трансляций |
6–10 недель |