Настройка Kafka Connect для интеграции с базами данных
Kafka Connect — это фреймворк для потоковой репликации данных между Kafka и внешними системами без написания кастомного кода. Два типа коннекторов: Source (данные идут в Kafka) и Sink (данные идут из Kafka в хранилище).
Практический кейс: CDC (Change Data Capture) из PostgreSQL через Debezium → Kafka → ElasticSearch для поискового индекса. При изменении строки в БД поиск обновляется за секунды без запросов к PostgreSQL.
Установка Kafka Connect
Kafka Connect входит в дистрибутив Kafka, но требует отдельного запуска в distributed-режиме:
# Конфигурация distributed-режима
# /opt/kafka/config/connect-distributed.properties
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
# Внутренние топики для хранения конфигурации коннекторов
group.id=kafka-connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
offset.flush.interval.ms=10000
# REST API для управления
rest.host.name=0.0.0.0
rest.port=8083
rest.advertised.host.name=connect-1.internal
rest.advertised.port=8083
# Плагины (скачанные коннекторы)
plugin.path=/opt/kafka/plugins
# Конвертеры — Avro с Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# Для простых случаев без Schema Registry:
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true
Systemd unit:
/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Debezium PostgreSQL Source Connector
Debezium перехватывает WAL (Write-Ahead Log) PostgreSQL и транслирует каждую INSERT/UPDATE/DELETE в Kafka-событие.
Предварительная настройка PostgreSQL:
-- postgresql.conf
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
-- Создаём пользователя для репликации
CREATE USER debezium WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE myapp TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
-- Создаём publication для нужных таблиц
CREATE PUBLICATION debezium_pub FOR TABLE products, orders, users, categories;
Конфигурация коннектора через REST API:
curl -X POST http://connect-1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secure_password",
"database.dbname": "myapp",
"database.server.name": "myapp-pg",
"topic.prefix": "myapp",
"table.include.list": "public.products,public.orders,public.users",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"snapshot.mode": "initial",
"snapshot.isolation.mode": "read_committed",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"heartbeat.interval.ms": "10000",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
После запуска топики создаются автоматически: myapp.public.products, myapp.public.orders.
JDBC Sink Connector — из Kafka в PostgreSQL
Обратный случай: события из Kafka пишем в PostgreSQL (аналитическая БД, Data Warehouse).
# Скачиваем JDBC Connector
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar \
-O /opt/kafka/plugins/kafka-connect-jdbc.jar
curl -X POST http://connect-1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "4",
"topics": "myapp.analytics.events",
"connection.url": "jdbc:postgresql://analytics-pg:5432/analytics",
"connection.user": "kafka_writer",
"connection.password": "secure_password",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"table.name.format": "analytics.${topic}",
"batch.size": "1000",
"db.timezone": "UTC",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropPrefix.exclude": "__deleted,__op,__ts_ms"
}
}'
Elasticsearch Sink Connector
Синхронизация продуктового каталога из Kafka в Elasticsearch:
curl -X POST http://connect-1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "4",
"topics": "myapp.public.products",
"connection.url": "https://es-node-1:9200,https://es-node-2:9200",
"connection.username": "elastic",
"connection.password": "elastic_pass",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"behavior.on.null.values": "delete",
"batch.size": "500",
"flush.timeout.ms": "10000",
"max.retries": "5",
"retry.backoff.ms": "100",
"linger.ms": "1000",
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id"
}
}'
Управление и мониторинг
# Список коннекторов
curl http://connect-1:8083/connectors | jq .
# Статус коннектора
curl http://connect-1:8083/connectors/postgres-source-connector/status | jq .
# Перезапуск упавшего task'а
curl -X POST http://connect-1:8083/connectors/postgres-source-connector/tasks/0/restart
# Пауза/возобновление
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/pause
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/resume
# Обновление конфигурации
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/config \
-H "Content-Type: application/json" \
-d '{"heartbeat.interval.ms": "5000", ...}'
Prometheus JMX-метрики через JMX Exporter:
KAFKA_OPTS="-javaagent:/opt/jmx-exporter.jar=9404:/opt/kafka/config/jmx-connect.yml" \
/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Типовые проблемы
WAL bloat — Debezium не может отстать от PostgreSQL: если слот репликации не сдвигается, WAL накапливается. Настраиваем max_slot_wal_keep_size в PostgreSQL и алерт на размер WAL.
Schema evolution — при добавлении нового колонки в PostgreSQL Debezium автоматически обновит схему в Schema Registry. Sink-коннектор должен быть готов к новым полям (auto.evolve=true или ручное управление).
Tombstone messages — при DELETE Debezium отправляет два сообщения: событие DELETE и tombstone (null value). Для compact-топиков tombstone используется для удаления записи из лога.
Таймлайн
День 1 — настройка PostgreSQL для логической репликации, установка Kafka Connect в distributed-режиме на 2–3 узла.
День 2 — установка Debezium, первоначальный snapshot (может занять часы для больших таблиц), настройка коннектора, верификация CDC-событий.
День 3 — настройка Sink-коннектора (ES или PostgreSQL), трансформации через SMT (Single Message Transform), тестирование полного пайплайна INSERT/UPDATE/DELETE.
День 4 — мониторинг, алерты на лаг и ошибки, документация схемы топиков, нагрузочное тестирование с пиковым потоком изменений.







