Настройка топиков и партиций Kafka
Количество партиций и фактор репликации — два параметра, которые нельзя легко изменить после создания топика. Уменьшить количество партиций невозможно без полного пересоздания топика. Поэтому правильная настройка при создании важна.
Как работают партиции
Партиция — единица параллелизма в Kafka. Один консьюмер в группе обрабатывает одну партицию. Если топик имеет 6 партиций, максимум 6 консьюмеров в группе могут читать параллельно. Лишние консьюмеры простаивают.
Запись в партицию строго упорядочена. Глобальный порядок по топику не гарантируется — только внутри партиции. Это важно для событий, которые должны обрабатываться последовательно (все действия одного пользователя).
Topic: user-events (6 партиций)
Partition 0: event1(user:101), event4(user:205), ...
Partition 1: event2(user:102), event5(user:101), ... ← user:101 разбит по партициям!
Partition 2: event3(user:103), ...
Чтобы события одного пользователя шли в одну партицию — используем ключ сообщения:
Запись с ключом user_id → hash(user_id) % num_partitions → всегда одна партиция
Расчёт количества партиций
Практическое правило: num_partitions = max(throughput_target / throughput_per_partition, num_consumers_target).
Типичная пропускная способность одной партиции: 10–50 MB/s для записи (зависит от железа и конфигурации брокера).
Пример: нужно обрабатывать 200 MB/s с пиком до 400 MB/s и держать возможность масштабировать до 20 консьюмеров → берём 24 партиции (кратно 6, 8, 12 для удобного масштабирования).
Слишком много партиций — тоже плохо: каждая партиция требует filehandle, память для буферов, нагружает контроллер при выборах лидера.
Создание топиков через kafka-topics.sh
# Базовый топик для событий пользователей
kafka-topics.sh --bootstrap-server kafka-1:9092 \
--create \
--topic user-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config retention.bytes=10737418240 \
--config compression.type=lz4 \
--config min.insync.replicas=2 \
--config message.max.bytes=1048576
# Компактный топик — для хранения последнего состояния по ключу
kafka-topics.sh --bootstrap-server kafka-1:9092 \
--create \
--topic user-profiles \
--partitions 24 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.1 \
--config segment.ms=3600000 \
--config delete.retention.ms=86400000
# Высокоприоритетная очередь с коротким retention
kafka-topics.sh --bootstrap-server kafka-1:9092 \
--create \
--topic order-processing-priority \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=3600000 \
--config max.message.bytes=102400
Управление через Admin API (Java/Kotlin)
Создание топиков программно — правильно для приложений, которые создают топики динамически:
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
try (AdminClient admin = AdminClient.create(props)) {
NewTopic userEvents = new NewTopic("user-events", 12, (short) 3);
userEvents.configs(Map.of(
"retention.ms", "604800000",
"compression.type", "lz4",
"min.insync.replicas", "2"
));
NewTopic deadLetter = new NewTopic("user-events-dlq", 3, (short) 3);
deadLetter.configs(Map.of(
"retention.ms", "2592000000", // 30 дней
"retention.bytes", "-1"
));
CreateTopicsResult result = admin.createTopics(List.of(userEvents, deadLetter));
result.all().get(30, TimeUnit.SECONDS);
}
Изменение конфигурации существующего топика
# Увеличиваем retention
kafka-configs.sh --bootstrap-server kafka-1:9092 \
--alter \
--entity-type topics \
--entity-name user-events \
--add-config retention.ms=1209600000
# Добавляем партиции (только увеличение!)
kafka-topics.sh --bootstrap-server kafka-1:9092 \
--alter \
--topic user-events \
--partitions 24
# Внимание: добавление партиций нарушает порядок для ключевых сообщений.
# Существующие ключи пойдут в те же партиции (hash % 12),
# новые ключи будут распределяться по 24 партициям.
# Просмотр конфигурации топика
kafka-configs.sh --bootstrap-server kafka-1:9092 \
--describe \
--entity-type topics \
--entity-name user-events
Управление лидерами партиций
Неравномерное распределение лидеров между брокерами приводит к горячим узлам:
# Проверяем распределение лидеров
kafka-topics.sh --bootstrap-server kafka-1:9092 \
--describe --topic user-events
# Предпочтительные реплики — перебалансировка лидеров
kafka-leader-election.sh --bootstrap-server kafka-1:9092 \
--election-type preferred \
--all-topic-partitions
# Или для конкретного топика через JSON
cat > election.json << 'EOF'
{
"partitions": [
{"topic": "user-events", "partition": 0},
{"topic": "user-events", "partition": 1}
]
}
EOF
kafka-leader-election.sh --bootstrap-server kafka-1:9092 \
--election-type preferred \
--path-to-json-file election.json
Мониторинг партиций
# Consumer lag — отставание группы
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
--describe --group my-consumer-group
# Вывод: TOPIC / PARTITION / CURRENT-OFFSET / LOG-END-OFFSET / LAG
# Суммарный lag > 10000 для критичных топиков — повод для алерта
# Offset reset (если нужно перечитать с начала)
kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
--group my-consumer-group \
--topic user-events \
--reset-offsets --to-earliest \
--execute
Типовые конфигурации по типу данных
| Топик | Партиции | Replication | Cleanup | Retention |
|---|---|---|---|---|
| Транзакции | 12–24 | 3 (min.isr=2) | delete | 7–30 дней |
| Аудит-лог | 6–12 | 3 (min.isr=2) | delete | 90–365 дней |
| Профили (CDC) | 24–48 | 3 | compact | без ограничений |
| Метрики | 12 | 2 | delete | 24–48 часов |
| Уведомления | 6 | 3 | delete | 1–3 дня |
Таймлайн
День 1 — анализ требований: throughput, количество консьюмеров, требования к упорядоченности, retention. Проектирование схемы топиков.
День 2 — создание топиков, настройка ACL (если включена аутентификация), тестирование producer/consumer с правильными ключами.
День 3 — настройка мониторинга consumer lag, алерты, документирование схемы для команды.







