Реализация пакетной обработки файлов (Batch Processing) на сервере

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Реализация пакетной обработки файлов (Batch Processing) на сервере
Сложная
~3-5 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Реализация пакетной обработки файлов (Batch Processing) на сервере

Пакетная обработка — это когда задач много, они однотипны, и их нужно выполнить эффективно, не падая под нагрузкой. Импорт 50 000 строк из CSV, конвертация архива из 3 000 изображений, ночная перегенерация sitemap — всё это batch-задачи с разными требованиями к времени выполнения, памяти и обработке ошибок.

Ключевые проблемы batch-обработки

Память. Загружать весь CSV в массив — путь к OOM. Правильный паттерн — потоковое чтение чанками.

Частичные ошибки. Если из 10 000 строк 50 невалидны — останавливать весь процесс неправильно. Нужна логика: пропускаем плохие строки, пишем в лог, продолжаем.

Воспроизводимость. Если процесс упал на 7 000-й строке — нужна возможность продолжить с места остановки, а не начинать заново.

Параллелизм. Последовательная обработка 50 000 записей по 100 мс каждая = почти 1,5 часа. Разбивка на параллельные Job'ы сокращает это кратно.

Архитектура: chunked + parallel jobs

Паттерн «Batch → Chunks → Jobs»:

Загрузка файла
     ↓
BatchImportJob (мастер-задача)
     ↓ разбивает на чанки
[ChunkJob 1] [ChunkJob 2] ... [ChunkJob N]  ← параллельно
     ↓
BatchCompletedJob (агрегация результатов)

Laravel предоставляет Bus::batch() для этого паттерна.

Реализация на примере импорта CSV

namespace App\Services;

use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\LazyCollection;

class CsvImportService
{
    private const CHUNK_SIZE = 500;

    public function startImport(string $filePath, int $importId): string
    {
        $jobs = [];

        // LazyCollection — потоковое чтение без загрузки в память
        LazyCollection::make(function () use ($filePath) {
            $handle = fopen($filePath, 'r');
            $header = fgetcsv($handle); // первая строка — заголовки

            while (($row = fgetcsv($handle)) !== false) {
                yield array_combine($header, $row);
            }

            fclose($handle);
        })
        ->chunk(self::CHUNK_SIZE)
        ->each(function ($chunk, $index) use (&$jobs, $importId) {
            $jobs[] = new ProcessCsvChunkJob(
                importId: $importId,
                chunkIndex: $index,
                rows: $chunk->values()->toArray()
            );
        });

        $batch = Bus::batch($jobs)
            ->name("csv-import-{$importId}")
            ->allowFailures() // продолжаем при ошибке отдельных Job'ов
            ->then(function (Batch $batch) use ($importId) {
                Import::find($importId)?->update(['status' => 'completed']);
                ImportCompletedEvent::dispatch($importId);
            })
            ->catch(function (Batch $batch, \Throwable $e) use ($importId) {
                Import::find($importId)?->update([
                    'status'        => 'partially_failed',
                    'error_message' => $e->getMessage(),
                ]);
            })
            ->finally(function (Batch $batch) use ($importId) {
                $import = Import::find($importId);
                $import?->update([
                    'total_jobs'    => $batch->totalJobs,
                    'failed_jobs'   => $batch->failedJobs,
                    'finished_at'   => now(),
                ]);
            })
            ->onQueue('batch-processing')
            ->dispatch();

        Import::find($importId)?->update(['batch_id' => $batch->id]);

        return $batch->id;
    }
}

Job обработки чанка

// app/Jobs/ProcessCsvChunkJob.php
class ProcessCsvChunkJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries   = 3;
    public int $timeout = 120;
    public int $backoff = 10;

    public function __construct(
        private int   $importId,
        private int   $chunkIndex,
        private array $rows
    ) {}

    public function handle(): void
    {
        // Если весь batch был отменён — прекращаем
        if ($this->batch()?->cancelled()) {
            return;
        }

        $successCount = 0;
        $errors       = [];

        foreach ($this->rows as $lineNum => $row) {
            try {
                $this->processRow($row);
                $successCount++;
            } catch (\Throwable $e) {
                $errors[] = [
                    'chunk'  => $this->chunkIndex,
                    'line'   => $lineNum,
                    'data'   => array_slice($row, 0, 3), // первые 3 поля для диагностики
                    'error'  => $e->getMessage(),
                ];
            }
        }

        // Сохраняем статистику чанка
        ImportChunkResult::create([
            'import_id'     => $this->importId,
            'chunk_index'   => $this->chunkIndex,
            'processed'     => count($this->rows),
            'succeeded'     => $successCount,
            'failed'        => count($errors),
            'errors'        => $errors,
        ]);

        // Атомарно обновляем счётчики импорта
        Import::where('id', $this->importId)->increment('processed_rows', count($this->rows));
        Import::where('id', $this->importId)->increment('success_rows', $successCount);
    }

    private function processRow(array $row): void
    {
        // Здесь валидация и сохранение строки
        // Пример:
        $validated = validator($row, [
            'email' => 'required|email',
            'name'  => 'required|string|max:255',
        ])->validate();

        User::updateOrCreate(
            ['email' => $validated['email']],
            ['name'  => $validated['name']]
        );
    }
}

Возобновление прерванного batch

Если сервер упал в середине обработки — Laravel Batch хранит состояние в таблице job_batches. Завершённые чанки повторно не запускаются. Незавершённые — возобновляются автоматически при рестарте воркера.

Принудительный рестарт незавершённого batch:

$batch = Bus::findBatch($batchId);
if ($batch && !$batch->finished()) {
    // Пересоздаём незавершённые jobs
    $pendingChunks = ImportChunkResult::where('import_id', $importId)
        ->pluck('chunk_index');

    // Логика определения не обработанных чанков и их повторного диспатча
}

Прогресс в реальном времени

// app/Http/Controllers/ImportController.php
public function progress(int $importId): JsonResponse
{
    $import = Import::findOrFail($importId);
    $batch  = $import->batch_id ? Bus::findBatch($import->batch_id) : null;

    return response()->json([
        'status'          => $import->status,
        'processed_rows'  => $import->processed_rows,
        'success_rows'    => $import->success_rows,
        'total_rows'      => $import->total_rows,
        'percentage'      => $import->total_rows > 0
            ? round($import->processed_rows / $import->total_rows * 100, 1)
            : 0,
        'batch' => $batch ? [
            'total_jobs'      => $batch->totalJobs,
            'pending_jobs'    => $batch->pendingJobs,
            'failed_jobs'     => $batch->failedJobs,
            'progress'        => $batch->progress(),
        ] : null,
    ]);
}

Ограничение нагрузки

Для batch-очереди нужен отдельный пул воркеров с ограниченным параллелизмом, чтобы не забить всю БД или CPU:

[program:batch-worker]
command=php artisan queue:work --queue=batch-processing --max-jobs=50 --sleep=3 --timeout=120
numprocs=4
autostart=true
autorestart=true

numprocs=4 — четыре воркера, каждый обрабатывает чанки последовательно. --max-jobs=50 — после 50 задач воркер перезапускается, освобождая память.

Обработка файлов в нескольких форматах

Тот же паттерн работает для изображений, JSON, XLSX. Для XLSX используем PhpSpreadsheet в потоковом режиме:

use PhpOffice\PhpSpreadsheet\Reader\Xlsx;

$reader = new Xlsx();
$reader->setReadDataOnly(true);
$spreadsheet = $reader->load($filePath);

$worksheet = $spreadsheet->getActiveSheet();
$highestRow = $worksheet->getHighestDataRow();

// Читаем чанками по 500 строк
for ($startRow = 2; $startRow <= $highestRow; $startRow += 500) {
    $endRow = min($startRow + 499, $highestRow);
    $rows   = $worksheet->rangeToArray("A{$startRow}:Z{$endRow}");
    // диспатч Job для чанка
}

Сроки

Базовый импорт CSV с чанками и прогрессом — 1 рабочий день. Добавление возобновления, детального лога ошибок, endpoint прогресса — ещё 6–8 часов. Поддержка XLSX и JSON форматов — плюс 4–6 часов.