Реализация миграции данных между типами БД (PostgreSQL ↔ MySQL ↔ MongoDB)
Переход между разными СУБД — один из наиболее сложных типов миграций. Различия в типах данных, поведении NULL, автоинкрементах, поддержке JSON и транзакционной семантике требуют тщательного маппинга и тестирования.
Распространённые сценарии
- MySQL → PostgreSQL (выход из vendor lock-in, лучшая поддержка JSON/JSONB, window functions)
- MongoDB → PostgreSQL (нормализация данных, ACID транзакции)
- PostgreSQL → MongoDB (шардирование, гибкая схема для определённых сущностей)
- MySQL → MySQL другой мажорной версии (через dump/restore с трансформацией)
MySQL → PostgreSQL
Инструмент: pgloader
# Установка
apt install pgloader
# Базовая миграция
pgloader mysql://user:pass@mysql-host/myapp \
postgresql://user:pass@pg-host/myapp
pgloader автоматически:
- Конвертирует типы данных
- Мигрирует индексы и первичные ключи
- Переносит внешние ключи
- Обрабатывает
AUTO_INCREMENT→SERIAL/BIGSERIAL
Кастомная конфигурация pgloader:
LOAD DATABASE
FROM mysql://user:pass@mysql-host/myapp
INTO postgresql://user:pass@pg-host/myapp
WITH include no drop,
create tables,
create indexes,
reset sequences
SET work_mem to '256MB',
maintenance_work_mem to '512MB'
CAST type datetime to timestamptz using midnight-in-utc,
type tinyint(1) to boolean using tinyint-to-boolean,
type enum to text,
column orders.status to text
ALTER SCHEMA 'myapp' RENAME TO 'public'
EXCLUDING TABLE NAMES MATCHING 'cache_*', 'sessions'
;
Проблемы при MySQL → PostgreSQL
| Проблема | MySQL | PostgreSQL | Решение |
|---|---|---|---|
| Case sensitivity | user = User |
user ≠ User |
Нормализовать регистр |
| ENUM | Native type | Нет нативного | Конвертировать в text + CHECK |
| Datetime | Нет timezone | timestamptz | Явно указать часовой пояс |
| GROUP BY | Гибкий | Строгий (ONLY_FULL_GROUP_BY) | Переписать запросы |
| Zero values | 0000-00-00 |
Не поддерживается | Конвертировать в NULL |
| Backtick quotes | Допустимы | Не допустимы | Заменить на " |
MongoDB → PostgreSQL
ETL-подход с использованием Python
from pymongo import MongoClient
import psycopg2
from psycopg2.extras import execute_batch
import json
mongo = MongoClient('mongodb://localhost:27017')
pg = psycopg2.connect('host=pg-host dbname=myapp user=app')
# Исходная коллекция MongoDB
source = mongo.myapp.users
cursor = pg.cursor()
batch = []
for doc in source.find():
batch.append((
str(doc['_id']),
doc.get('email'),
doc.get('name'),
json.dumps(doc.get('metadata', {})), # JSONB в PG
doc.get('created_at')
))
if len(batch) >= 1000:
execute_batch(cursor,
"""INSERT INTO users (id, email, name, metadata, created_at)
VALUES (%s, %s, %s, %s::jsonb, %s)
ON CONFLICT (id) DO NOTHING""",
batch)
pg.commit()
batch = []
# Последний батч
if batch:
execute_batch(cursor, query, batch)
pg.commit()
Нормализация вложенных документов
MongoDB-документ:
{
"_id": "user_123",
"email": "[email protected]",
"addresses": [
{ "type": "home", "city": "Moscow", "zip": "101000" },
{ "type": "work", "city": "St. Petersburg", "zip": "190000" }
]
}
PostgreSQL-схема:
CREATE TABLE users (
id TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE user_addresses (
id BIGSERIAL PRIMARY KEY,
user_id TEXT REFERENCES users(id),
type TEXT,
city TEXT,
zip TEXT
);
Трансформация при миграции:
for doc in source.find():
cursor.execute("INSERT INTO users (id, email) VALUES (%s, %s)",
(str(doc['_id']), doc['email']))
for addr in doc.get('addresses', []):
cursor.execute(
"INSERT INTO user_addresses (user_id, type, city, zip) VALUES (%s, %s, %s, %s)",
(str(doc['_id']), addr.get('type'), addr.get('city'), addr.get('zip'))
)
PostgreSQL → MySQL
Прямой путь через mysqldump с трансформацией не существует. Используется:
- Экспорт в CSV:
COPY table TO '/tmp/table.csv' CSV HEADER - Создание схемы вручную с учётом типов MySQL
- Импорт:
LOAD DATA INFILE '/tmp/table.csv' INTO TABLE table FIELDS TERMINATED BY ',' ENCLOSED BY '"'
Или через ETL-инструмент Airbyte / dbt / Apache Nifi.
Стратегия zero-downtime при смене типа БД
- Запустить новую БД параллельно со старой
- Написать dual-write слой в приложении: каждая запись идёт в обе БД
- Запустить фоновый процесс синхронизации исторических данных
- После выравнивания — перевести чтение на новую БД
- Через неделю после стабильной работы — отключить dual-write и старую БД
class DualWriteRepository:
def __init__(self, primary, secondary):
self.primary = primary
self.secondary = secondary
def create_user(self, data):
result = self.primary.create_user(data)
try:
self.secondary.create_user(data)
except Exception as e:
logger.error(f"Secondary write failed: {e}")
# Не прерывать запрос — пишем в очередь для retry
queue.put(('create_user', data))
return result
Верификация после миграции
-- Сравнение количества записей
SELECT 'users' as table_name, COUNT(*) FROM users
UNION ALL
SELECT 'orders', COUNT(*) FROM orders
UNION ALL
SELECT 'products', COUNT(*) FROM products;
-- Сравнение контрольных сумм (PostgreSQL)
SELECT md5(array_agg(md5(id::text || email))::text)
FROM (SELECT id, email FROM users ORDER BY id) t;
Срок выполнения
MySQL → PostgreSQL для базы до 50GB — 3–5 рабочих дней. MongoDB → PostgreSQL с нормализацией схемы — 1–2 недели.







