Реализация переиндексации данных Elasticsearch без даунтайма
Изменение маппинга поля — самая частая причина переиндексации. Нельзя изменить тип text на keyword на живом индексе. Нельзя добавить новый анализатор к существующему полю. Решение: создать новый индекс, перегнать данные через _reindex API, атомарно переключить алиас. Приложение в течение всего процесса продолжает работать — читает через алиас, который до переключения указывает на старый индекс.
Стратегия blue/green с алиасами
Алиас — абстракция над одним или несколькими индексами. Приложение работает с алиасом products, не зная физического имени индекса.
Начальное состояние:
# Проверить, на что указывает алиас
curl -u elastic:pw "localhost:9200/_alias/products"
# Ответ:
# { "products_v1": { "aliases": { "products": { "is_write_index": true } } } }
Шаг 1 — создать новый индекс с изменённым маппингом:
PUT /products_v2
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 0,
"refresh_interval": "-1",
"analysis": {
"analyzer": {
"product_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "russian_stemmer"]
}
}
}
},
"mappings": {
"properties": {
"id": { "type": "keyword" },
"title": {
"type": "text",
"analyzer": "product_analyzer",
"fields": {
"keyword": { "type": "keyword" }
}
},
"price": { "type": "scaled_float", "scaling_factor": 100 },
"new_field": { "type": "keyword" }
}
}
}
number_of_replicas: 0 и refresh_interval: -1 на время переиндексации — ускоряет загрузку данных.
_reindex API
Шаг 2 — запустить переиндексацию:
POST _reindex?wait_for_completion=false
{
"source": {
"index": "products_v1",
"size": 1000
},
"dest": {
"index": "products_v2",
"op_type": "create"
},
"conflicts": "proceed"
}
wait_for_completion=false — задача уходит в background, возвращает task_id. Для больших индексов (>1M документов) обязательно.
op_type: create — пропустить документ, если уже существует (важно для incremental reindex).
conflicts: proceed — при конфликтах версий продолжать, не прерываться.
Мониторинг прогресса:
# По task_id из ответа
curl -u elastic:pw "localhost:9200/_tasks/oTUltX4IQMOUUVeiohTt8A:12345?pretty"
# Все активные reindex задачи
curl -u elastic:pw "localhost:9200/_tasks?actions=*reindex&detailed=true&pretty"
Ответ задачи содержит status.created, status.total — можно считать процент.
Параллельная переиндексация через slices
Для больших индексов — parallel slices ускоряют в N раз:
POST _reindex?wait_for_completion=false
{
"source": {
"index": "products_v1",
"size": 500
},
"dest": {
"index": "products_v2"
},
"slices": "auto"
}
slices: auto — автоматически определяет число срезов (по числу шардов источника). Каждый slice обрабатывается параллельно как отдельная задача. Переиндексация 100M документов с 5 шардами при auto идёт в 5 потоков.
Проблема: новые документы во время переиндексации
Пока идёт reindex, приложение продолжает записывать в products_v1 (через алиас). Новые и обновлённые документы не попадут в products_v2.
Решение — incremental sync после основной переиндексации:
POST _reindex?wait_for_completion=false
{
"source": {
"index": "products_v1",
"query": {
"range": {
"updated_at": {
"gte": "2024-01-15T00:00:00",
"lte": "now"
}
}
}
},
"dest": {
"index": "products_v2",
"op_type": "index",
"version_type": "external"
}
}
version_type: external — использовать _version поле документа для разрешения конфликтов. Старые документы не перезапишут новые.
Для этого в маппинге должно быть поле updated_at с датой обновления. Без него incremental reindex сложен.
Атомарное переключение алиаса
После завершения переиндексации и incremental sync:
# 1. Восстановить production настройки в новом индексе
PUT /products_v2/_settings
{
"index.number_of_replicas": 1,
"index.refresh_interval": "1s"
}
# 2. Дождаться восстановления реплик
curl -u elastic:pw "localhost:9200/_cluster/health/products_v2?wait_for_status=green&timeout=30s"
# 3. Атомарно переключить алиас
POST _aliases
{
"actions": [
{
"add": {
"index": "products_v2",
"alias": "products",
"is_write_index": true
}
},
{
"remove": {
"index": "products_v1",
"alias": "products"
}
}
]
}
Операция атомарная — в момент переключения нет состояния, когда алиас не указывает ни на что. Запросы во время переключения не теряются.
Трансформация данных при переиндексации
Reindex поддерживает Painless скрипты для трансформации:
POST _reindex
{
"source": {
"index": "products_v1"
},
"dest": {
"index": "products_v2"
},
"script": {
"source": """
// Разделить строку 'full_name' на 'first_name' и 'last_name'
if (ctx._source.full_name != null) {
def parts = ctx._source.full_name.splitOnToken(' ');
ctx._source.first_name = parts[0];
ctx._source.last_name = parts.length > 1 ? parts[1] : '';
ctx._source.remove('full_name');
}
// Нормализовать цену из строки в число
if (ctx._source.price instanceof String) {
ctx._source.price = Float.parseFloat(ctx._source.price.replace(',', '.'));
}
""",
"lang": "painless"
}
}
Rollback план
Если после переключения обнаружены проблемы — откат за секунды:
POST _aliases
{
"actions": [
{
"add": {
"index": "products_v1",
"alias": "products",
"is_write_index": true
}
},
{
"remove": {
"index": "products_v2",
"alias": "products"
}
}
]
}
Не удалять products_v1 сразу — держать 24–48 часов для возможности отката. Затем удалить, чтобы освободить место.
Pipeline-переиндексация через ingest
Для обогащения данных при переиндексации — через ingest pipeline:
PUT _ingest/pipeline/enrich-products
{
"processors": [
{
"set": {
"field": "reindexed_at",
"value": "{{_ingest.timestamp}}"
}
},
{
"uppercase": {
"field": "sku",
"ignore_missing": true
}
}
]
}
POST _reindex
{
"source": { "index": "products_v1" },
"dest": {
"index": "products_v2",
"pipeline": "enrich-products"
}
}
Сроки
Переиндексация с простым изменением маппинга — 1 рабочий день (планирование, запуск, мониторинг, переключение). Сложный сценарий с трансформацией данных, инкрементальной синхронизацией и тестированием — 2–3 дня. Для индексов > 100 млн документов — дополнительно время на само выполнение reindex (6–24 часа в зависимости от железа).







