Реализация инкрементального импорта товаров (только изменения)

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.

Разработка и обслуживание любых видов сайтов:

Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Реализация инкрементального импорта товаров (только изменения)
Средняя
~3-5 рабочих дней
Часто задаваемые вопросы

Наши компетенции:

Этапы разработки

Последние работы

  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    874
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    851

Реализация инкрементального импорта товаров (только изменения)

Полная перезагрузка каталога каждый раз — расточительно. При 100 000 позиций за час изменяется 1–5% данных, а система обрабатывает все 100 000. Инкрементальный импорт решает это: передаются и обрабатываются только те товары, которые реально изменились.

Стратегии определения изменений

1. Временная метка updated_at

Поставщик поддерживает фильтр по дате изменения — самый распространённый подход:

GET /api/products?updated_after=2024-01-15T10:00:00Z&page=1&per_page=200

Система запоминает время последней успешной синхронизации и передаёт его при следующем запросе.

2. Курсор / change log

Поставщик ведёт лог изменений с возрастающим ID:

GET /api/changes?since_id=48291&limit=500

Более надёжно, чем timestamp: не пропускает изменения, произошедшие во время обработки.

3. Хэш-сравнение

Когда источник не поддерживает фильтрацию по изменениям — сравниваем хэш строки:

$hash = md5(serialize([
    $row['price'], $row['qty'], $row['name'], $row['description']
]));

Строка обрабатывается только если хэш изменился.

4. Diff файлов

Поставщик публикует ежечасный diff-файл вместо полного прайса:

<changes>
  <updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
  <updated id="SKU-456"><qty>0</qty></updated>
  <deleted id="SKU-789"/>
  <created id="SKU-999"><!-- полные данные --></created>
</changes>

Реализация State Tracker

Состояние синхронизации хранится в БД:

CREATE TABLE import_sync_state (
    source_id       int PRIMARY KEY REFERENCES import_sources(id),
    last_sync_at    timestamptz,
    last_cursor     varchar(200),   -- для cursor-based
    last_change_id  bigint,         -- для changelog-based
    items_synced    bigint DEFAULT 0,
    updated_at      timestamptz DEFAULT now()
);
class SyncStateManager
{
    public function getLastSyncAt(int $sourceId): ?\DateTimeInterface
    {
        return ImportSyncState::find($sourceId)?->last_sync_at;
    }

    public function markSyncStarted(int $sourceId): void
    {
        // Запоминаем время START синхронизации, не END
        // Это критично: за время обработки могут появиться новые изменения
        Cache::put("sync_start_{$sourceId}", now(), 3600);
    }

    public function markSyncCompleted(int $sourceId): void
    {
        ImportSyncState::updateOrCreate(
            ['source_id' => $sourceId],
            ['last_sync_at' => Cache::get("sync_start_{$sourceId}")]
        );
    }
}

Важно: фиксируем время начала синхронизации, а не конца. Если за время обработки появились новые изменения — они попадут в следующий цикл.

Pipeline инкрементального импорта

class IncrementalImportJob implements ShouldQueue
{
    public function handle(
        SyncStateManager       $state,
        SupplierApiClient      $client,
        IncrementalProductSync $sync,
    ): void {
        $since = $state->getLastSyncAt($this->sourceId);
        $state->markSyncStarted($this->sourceId);

        $stats = ['created' => 0, 'updated' => 0, 'deleted' => 0, 'skipped' => 0];

        foreach ($client->fetchUpdatedSince($since) as $item) {
            $result = $sync->process($item, $this->sourceId);
            $stats[$result]++;
        }

        $state->markSyncCompleted($this->sourceId);
        $this->logResult($stats);
    }
}

Определение типа изменения

class IncrementalProductSync
{
    public function process(array $item, int $sourceId): string
    {
        // Удалённые позиции
        if ($item['deleted'] ?? false) {
            Product::where('sku', $item['sku'])
                ->where('source_id', $sourceId)
                ->update(['deleted_at' => now()]);
            return 'deleted';
        }

        $product = Product::where('sku', $item['sku'])
            ->where('source_id', $sourceId)
            ->first();

        if (!$product) {
            // Новый товар
            Product::create($this->buildProductData($item, $sourceId));
            return 'created';
        }

        // Проверяем хэш — обновлять только если что-то изменилось
        $newHash = $this->computeHash($item);
        if ($product->content_hash === $newHash) {
            return 'skipped';
        }

        $product->update($this->buildProductData($item, $sourceId) + [
            'content_hash' => $newHash,
        ]);
        return 'updated';
    }

    private function computeHash(array $item): string
    {
        return md5(json_encode([
            $item['price'] ?? null,
            $item['qty']   ?? null,
            $item['name']  ?? null,
        ]));
    }
}

Хэш-сравнение при отсутствии delta-API

Когда источник не поддерживает фильтрацию — скачиваем полный файл, но обрабатываем только изменения:

class HashBasedDeltaProcessor
{
    public function process(iterable $allItems, int $sourceId): DeltaResult
    {
        // Загружаем все текущие хэши из БД (один запрос)
        $storedHashes = Product::where('source_id', $sourceId)
            ->pluck('content_hash', 'sku')
            ->all();

        $toCreate = $toUpdate = $unchanged = 0;
        $createBatch = $updateBatch = [];

        foreach ($allItems as $item) {
            $newHash = $this->computeHash($item);
            $sku     = $item['sku'];

            if (!isset($storedHashes[$sku])) {
                $createBatch[] = $item;
                $toCreate++;
            } elseif ($storedHashes[$sku] !== $newHash) {
                $updateBatch[] = $item;
                $toUpdate++;
            } else {
                $unchanged++;
            }
        }

        // Batch insert/update только изменённых
        if ($createBatch) Product::upsert($this->prepareRows($createBatch, $sourceId), ['sku'], [...]);
        if ($updateBatch) Product::upsert($this->prepareRows($updateBatch, $sourceId), ['sku'], [...]);

        return new DeltaResult($toCreate, $toUpdate, $unchanged);
    }
}

Обнаружение удалённых позиций

Если источник не присылает явных сигналов об удалении — используем «anti-join» подход:

public function detectDeleted(array $currentSkus, int $sourceId): int
{
    // SKU, которые были в прошлом импорте, но отсутствуют в текущем
    $deletedCount = Product::where('source_id', $sourceId)
        ->whereNotIn('sku', $currentSkus)
        ->whereNull('deleted_at')
        ->update(['deleted_at' => now()]);

    return $deletedCount;
}

Для 100 000 SKU запрос WHERE sku NOT IN (...) неэффективен. Лучше временная таблица:

CREATE TEMP TABLE current_import_skus (sku varchar(100));
COPY current_import_skus FROM STDIN;
-- загружаем все SKU текущего импорта

UPDATE products
SET deleted_at = now()
WHERE source_id = $1
  AND deleted_at IS NULL
  AND sku NOT IN (SELECT sku FROM current_import_skus);

DROP TABLE current_import_skus;

Конкурентность: защита от двойного запуска

public function handle(): void
{
    $lock = Cache::lock("import_sync_{$this->sourceId}", 3600);

    if (!$lock->get()) {
        Log::info("Import for source {$this->sourceId} already running, skipping");
        return;
    }

    try {
        $this->runSync();
    } finally {
        $lock->release();
    }
}

Сроки реализации

  • Timestamp-based инкремент, state manager, hash-сравнение — 2 дня
  • Обнаружение удалённых позиций (temp table), lock против двойного запуска — +1 день
  • Cursor-based синхронизация + diff-файлы + отчётность по delta — +1–2 дня