Реализация очереди импорта товаров (фоновая обработка)

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.

Разработка и обслуживание любых видов сайтов:

Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Реализация очереди импорта товаров (фоновая обработка)
Средняя
~3-5 рабочих дней
Часто задаваемые вопросы

Наши компетенции:

Этапы разработки

Последние работы

  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    874
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    851

Реализация очереди импорта товаров (фоновая обработка)

Импорт тысяч товаров в синхронном режиме — это таймаут, зависший процесс и сломанный пользовательский опыт. Правильное решение: принять файл, поставить задачу в очередь, вернуть ответ немедленно. Обработка идёт в фоне, прогресс отображается в интерфейсе.

Архитектура

HTTP Upload (файл/URL)
    ↓
Import Job (запись в очередь)
    ↓
Queue (Redis / SQS / RabbitMQ)
    ↓
Worker Process (отдельный процесс/контейнер)
    ↓
Chunk Processing (батчи по 500 товаров)
    ↓
Database (upsert)
    ↓
Progress Event (WebSocket / SSE → UI)

Реализация на Laravel

// Контроллер — принимает файл и ставит задачу
class ProductImportController extends Controller
{
    public function upload(Request $request): JsonResponse
    {
        $path = $request->file('file')->store('imports');

        $import = ImportJob::create([
            'file_path' => $path,
            'status'    => 'pending',
            'total'     => 0,
            'processed' => 0,
            'errors'    => 0,
        ]);

        ProcessProductImport::dispatch($import->id);

        return response()->json(['import_id' => $import->id]);
    }
}

// Job — обработка в фоне
class ProcessProductImport implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue;

    public int $timeout = 3600;   // 1 час
    public int $tries   = 3;

    public function handle(): void
    {
        $import = ImportJob::findOrFail($this->importId);
        $import->update(['status' => 'processing', 'started_at' => now()]);

        $reader = new CsvReader(storage_path('app/' . $import->file_path));
        $total  = $reader->count();
        $import->update(['total' => $total]);

        foreach ($reader->chunk(500) as $chunkIndex => $rows) {
            try {
                DB::transaction(function () use ($rows) {
                    foreach ($rows as $row) {
                        Product::updateOrCreate(
                            ['sku' => $row['sku']],
                            $this->mapRow($row)
                        );
                    }
                });

                $processed = ($chunkIndex + 1) * 500;
                $import->update(['processed' => min($processed, $total)]);

                // Событие прогресса
                event(new ImportProgressUpdated($import->id, min($processed, $total), $total));

            } catch (\Exception $e) {
                $import->increment('errors');
                Log::error("Import chunk failed", ['chunk' => $chunkIndex, 'error' => $e->getMessage()]);
            }
        }

        $import->update(['status' => 'completed', 'finished_at' => now()]);
    }
}

WebSocket / SSE для прогресса

// Laravel Broadcasting: событие прогресса
class ImportProgressUpdated implements ShouldBroadcast
{
    public function broadcastOn(): Channel
    {
        return new PrivateChannel("import.{$this->importId}");
    }

    public function broadcastWith(): array
    {
        return [
            'processed' => $this->processed,
            'total'     => $this->total,
            'percent'   => round($this->processed / $this->total * 100),
        ];
    }
}

На фронтенде — подписка через Laravel Echo или нативный EventSource (SSE).

Параллельная обработка

Для очень больших файлов (100 000+ товаров) — разбивка на независимые части:

// Dispatch Fan-out: один координатор → N воркеров
class DispatchImportChunks implements ShouldQueue
{
    public function handle(): void
    {
        $chunks = $this->splitFile($this->filePath, chunkSize: 1000);

        Bus::batch(
            array_map(fn($chunk) => new ProcessImportChunk($chunk), $chunks)
        )
        ->then(fn(Batch $batch) => $this->onComplete($batch))
        ->catch(fn(Batch $batch, Throwable $e) => $this->onError($batch, $e))
        ->dispatch();
    }
}

Bus::batch() в Laravel обрабатывает чанки параллельно через несколько воркеров и отслеживает общий прогресс.

Обработка ошибок

Строки с ошибками не прерывают импорт, а сохраняются в таблицу import_errors:

CREATE TABLE import_errors (
    id          BIGSERIAL PRIMARY KEY,
    import_id   BIGINT,
    row_number  INT,
    row_data    JSONB,
    error_msg   TEXT,
    created_at  TIMESTAMPTZ DEFAULT NOW()
);

После завершения пользователь скачивает CSV с ошибками для исправления.

Сроки

Система очереди импорта с прогрессом в реальном времени и логированием ошибок: 4–6 рабочих дней.