Настройка Kafka Connect для интеграции с базами данных

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка Kafka Connect для интеграции с базами данных
Сложная
~3-5 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Настройка Kafka Connect для интеграции с базами данных

Kafka Connect — это фреймворк для потоковой репликации данных между Kafka и внешними системами без написания кастомного кода. Два типа коннекторов: Source (данные идут в Kafka) и Sink (данные идут из Kafka в хранилище).

Практический кейс: CDC (Change Data Capture) из PostgreSQL через Debezium → Kafka → ElasticSearch для поискового индекса. При изменении строки в БД поиск обновляется за секунды без запросов к PostgreSQL.

Установка Kafka Connect

Kafka Connect входит в дистрибутив Kafka, но требует отдельного запуска в distributed-режиме:

# Конфигурация distributed-режима
# /opt/kafka/config/connect-distributed.properties

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

# Внутренние топики для хранения конфигурации коннекторов
group.id=kafka-connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
offset.flush.interval.ms=10000

# REST API для управления
rest.host.name=0.0.0.0
rest.port=8083
rest.advertised.host.name=connect-1.internal
rest.advertised.port=8083

# Плагины (скачанные коннекторы)
plugin.path=/opt/kafka/plugins

# Конвертеры — Avro с Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# Для простых случаев без Schema Registry:
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true

Systemd unit:

/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties

Debezium PostgreSQL Source Connector

Debezium перехватывает WAL (Write-Ahead Log) PostgreSQL и транслирует каждую INSERT/UPDATE/DELETE в Kafka-событие.

Предварительная настройка PostgreSQL:

-- postgresql.conf
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;

-- Создаём пользователя для репликации
CREATE USER debezium WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE myapp TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

-- Создаём publication для нужных таблиц
CREATE PUBLICATION debezium_pub FOR TABLE products, orders, users, categories;

Конфигурация коннектора через REST API:

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres.internal",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secure_password",
        "database.dbname": "myapp",
        "database.server.name": "myapp-pg",
        "topic.prefix": "myapp",
        "table.include.list": "public.products,public.orders,public.users",
        "plugin.name": "pgoutput",
        "publication.name": "debezium_pub",
        "slot.name": "debezium_slot",
        "snapshot.mode": "initial",
        "snapshot.isolation.mode": "read_committed",
        "decimal.handling.mode": "double",
        "time.precision.mode": "connect",
        "tombstones.on.delete": "true",
        "heartbeat.interval.ms": "10000",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}'

После запуска топики создаются автоматически: myapp.public.products, myapp.public.orders.

JDBC Sink Connector — из Kafka в PostgreSQL

Обратный случай: события из Kafka пишем в PostgreSQL (аналитическая БД, Data Warehouse).

# Скачиваем JDBC Connector
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar \
    -O /opt/kafka/plugins/kafka-connect-jdbc.jar

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "4",
        "topics": "myapp.analytics.events",
        "connection.url": "jdbc:postgresql://analytics-pg:5432/analytics",
        "connection.user": "kafka_writer",
        "connection.password": "secure_password",
        "auto.create": "false",
        "auto.evolve": "false",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id",
        "table.name.format": "analytics.${topic}",
        "batch.size": "1000",
        "db.timezone": "UTC",
        "transforms": "dropPrefix",
        "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropPrefix.exclude": "__deleted,__op,__ts_ms"
    }
}'

Elasticsearch Sink Connector

Синхронизация продуктового каталога из Kafka в Elasticsearch:

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "4",
        "topics": "myapp.public.products",
        "connection.url": "https://es-node-1:9200,https://es-node-2:9200",
        "connection.username": "elastic",
        "connection.password": "elastic_pass",
        "type.name": "_doc",
        "key.ignore": "false",
        "schema.ignore": "true",
        "behavior.on.null.values": "delete",
        "batch.size": "500",
        "flush.timeout.ms": "10000",
        "max.retries": "5",
        "retry.backoff.ms": "100",
        "linger.ms": "1000",
        "transforms": "extractKey",
        "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field": "id"
    }
}'

Управление и мониторинг

# Список коннекторов
curl http://connect-1:8083/connectors | jq .

# Статус коннектора
curl http://connect-1:8083/connectors/postgres-source-connector/status | jq .

# Перезапуск упавшего task'а
curl -X POST http://connect-1:8083/connectors/postgres-source-connector/tasks/0/restart

# Пауза/возобновление
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/pause
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/resume

# Обновление конфигурации
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/config \
  -H "Content-Type: application/json" \
  -d '{"heartbeat.interval.ms": "5000", ...}'

Prometheus JMX-метрики через JMX Exporter:

KAFKA_OPTS="-javaagent:/opt/jmx-exporter.jar=9404:/opt/kafka/config/jmx-connect.yml" \
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties

Типовые проблемы

WAL bloat — Debezium не может отстать от PostgreSQL: если слот репликации не сдвигается, WAL накапливается. Настраиваем max_slot_wal_keep_size в PostgreSQL и алерт на размер WAL.

Schema evolution — при добавлении нового колонки в PostgreSQL Debezium автоматически обновит схему в Schema Registry. Sink-коннектор должен быть готов к новым полям (auto.evolve=true или ручное управление).

Tombstone messages — при DELETE Debezium отправляет два сообщения: событие DELETE и tombstone (null value). Для compact-топиков tombstone используется для удаления записи из лога.

Таймлайн

День 1 — настройка PostgreSQL для логической репликации, установка Kafka Connect в distributed-режиме на 2–3 узла.

День 2 — установка Debezium, первоначальный snapshot (может занять часы для больших таблиц), настройка коннектора, верификация CDC-событий.

День 3 — настройка Sink-коннектора (ES или PostgreSQL), трансформации через SMT (Single Message Transform), тестирование полного пайплайна INSERT/UPDATE/DELETE.

День 4 — мониторинг, алерты на лаг и ошибки, документация схемы топиков, нагрузочное тестирование с пиковым потоком изменений.