Реализация инкрементального импорта товаров (только изменения)
Полная перезагрузка каталога каждый раз — расточительно. При 100 000 позиций за час изменяется 1–5% данных, а система обрабатывает все 100 000. Инкрементальный импорт решает это: передаются и обрабатываются только те товары, которые реально изменились.
Стратегии определения изменений
1. Временная метка updated_at
Поставщик поддерживает фильтр по дате изменения — самый распространённый подход:
GET /api/products?updated_after=2024-01-15T10:00:00Z&page=1&per_page=200
Система запоминает время последней успешной синхронизации и передаёт его при следующем запросе.
2. Курсор / change log
Поставщик ведёт лог изменений с возрастающим ID:
GET /api/changes?since_id=48291&limit=500
Более надёжно, чем timestamp: не пропускает изменения, произошедшие во время обработки.
3. Хэш-сравнение
Когда источник не поддерживает фильтрацию по изменениям — сравниваем хэш строки:
$hash = md5(serialize([
$row['price'], $row['qty'], $row['name'], $row['description']
]));
Строка обрабатывается только если хэш изменился.
4. Diff файлов
Поставщик публикует ежечасный diff-файл вместо полного прайса:
<changes>
<updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
<updated id="SKU-456"><qty>0</qty></updated>
<deleted id="SKU-789"/>
<created id="SKU-999"><!-- полные данные --></created>
</changes>
Реализация State Tracker
Состояние синхронизации хранится в БД:
CREATE TABLE import_sync_state (
source_id int PRIMARY KEY REFERENCES import_sources(id),
last_sync_at timestamptz,
last_cursor varchar(200), -- для cursor-based
last_change_id bigint, -- для changelog-based
items_synced bigint DEFAULT 0,
updated_at timestamptz DEFAULT now()
);
class SyncStateManager
{
public function getLastSyncAt(int $sourceId): ?\DateTimeInterface
{
return ImportSyncState::find($sourceId)?->last_sync_at;
}
public function markSyncStarted(int $sourceId): void
{
// Запоминаем время START синхронизации, не END
// Это критично: за время обработки могут появиться новые изменения
Cache::put("sync_start_{$sourceId}", now(), 3600);
}
public function markSyncCompleted(int $sourceId): void
{
ImportSyncState::updateOrCreate(
['source_id' => $sourceId],
['last_sync_at' => Cache::get("sync_start_{$sourceId}")]
);
}
}
Важно: фиксируем время начала синхронизации, а не конца. Если за время обработки появились новые изменения — они попадут в следующий цикл.
Pipeline инкрементального импорта
class IncrementalImportJob implements ShouldQueue
{
public function handle(
SyncStateManager $state,
SupplierApiClient $client,
IncrementalProductSync $sync,
): void {
$since = $state->getLastSyncAt($this->sourceId);
$state->markSyncStarted($this->sourceId);
$stats = ['created' => 0, 'updated' => 0, 'deleted' => 0, 'skipped' => 0];
foreach ($client->fetchUpdatedSince($since) as $item) {
$result = $sync->process($item, $this->sourceId);
$stats[$result]++;
}
$state->markSyncCompleted($this->sourceId);
$this->logResult($stats);
}
}
Определение типа изменения
class IncrementalProductSync
{
public function process(array $item, int $sourceId): string
{
// Удалённые позиции
if ($item['deleted'] ?? false) {
Product::where('sku', $item['sku'])
->where('source_id', $sourceId)
->update(['deleted_at' => now()]);
return 'deleted';
}
$product = Product::where('sku', $item['sku'])
->where('source_id', $sourceId)
->first();
if (!$product) {
// Новый товар
Product::create($this->buildProductData($item, $sourceId));
return 'created';
}
// Проверяем хэш — обновлять только если что-то изменилось
$newHash = $this->computeHash($item);
if ($product->content_hash === $newHash) {
return 'skipped';
}
$product->update($this->buildProductData($item, $sourceId) + [
'content_hash' => $newHash,
]);
return 'updated';
}
private function computeHash(array $item): string
{
return md5(json_encode([
$item['price'] ?? null,
$item['qty'] ?? null,
$item['name'] ?? null,
]));
}
}
Хэш-сравнение при отсутствии delta-API
Когда источник не поддерживает фильтрацию — скачиваем полный файл, но обрабатываем только изменения:
class HashBasedDeltaProcessor
{
public function process(iterable $allItems, int $sourceId): DeltaResult
{
// Загружаем все текущие хэши из БД (один запрос)
$storedHashes = Product::where('source_id', $sourceId)
->pluck('content_hash', 'sku')
->all();
$toCreate = $toUpdate = $unchanged = 0;
$createBatch = $updateBatch = [];
foreach ($allItems as $item) {
$newHash = $this->computeHash($item);
$sku = $item['sku'];
if (!isset($storedHashes[$sku])) {
$createBatch[] = $item;
$toCreate++;
} elseif ($storedHashes[$sku] !== $newHash) {
$updateBatch[] = $item;
$toUpdate++;
} else {
$unchanged++;
}
}
// Batch insert/update только изменённых
if ($createBatch) Product::upsert($this->prepareRows($createBatch, $sourceId), ['sku'], [...]);
if ($updateBatch) Product::upsert($this->prepareRows($updateBatch, $sourceId), ['sku'], [...]);
return new DeltaResult($toCreate, $toUpdate, $unchanged);
}
}
Обнаружение удалённых позиций
Если источник не присылает явных сигналов об удалении — используем «anti-join» подход:
public function detectDeleted(array $currentSkus, int $sourceId): int
{
// SKU, которые были в прошлом импорте, но отсутствуют в текущем
$deletedCount = Product::where('source_id', $sourceId)
->whereNotIn('sku', $currentSkus)
->whereNull('deleted_at')
->update(['deleted_at' => now()]);
return $deletedCount;
}
Для 100 000 SKU запрос WHERE sku NOT IN (...) неэффективен. Лучше временная таблица:
CREATE TEMP TABLE current_import_skus (sku varchar(100));
COPY current_import_skus FROM STDIN;
-- загружаем все SKU текущего импорта
UPDATE products
SET deleted_at = now()
WHERE source_id = $1
AND deleted_at IS NULL
AND sku NOT IN (SELECT sku FROM current_import_skus);
DROP TABLE current_import_skus;
Конкурентность: защита от двойного запуска
public function handle(): void
{
$lock = Cache::lock("import_sync_{$this->sourceId}", 3600);
if (!$lock->get()) {
Log::info("Import for source {$this->sourceId} already running, skipping");
return;
}
try {
$this->runSync();
} finally {
$lock->release();
}
}
Сроки реализации
- Timestamp-based инкремент, state manager, hash-сравнение — 2 дня
- Обнаружение удалённых позиций (temp table), lock против двойного запуска — +1 день
- Cursor-based синхронизация + diff-файлы + отчётность по delta — +1–2 дня







