Реализация пакетной обработки файлов (Batch Processing) на сервере
Пакетная обработка — это когда задач много, они однотипны, и их нужно выполнить эффективно, не падая под нагрузкой. Импорт 50 000 строк из CSV, конвертация архива из 3 000 изображений, ночная перегенерация sitemap — всё это batch-задачи с разными требованиями к времени выполнения, памяти и обработке ошибок.
Ключевые проблемы batch-обработки
Память. Загружать весь CSV в массив — путь к OOM. Правильный паттерн — потоковое чтение чанками.
Частичные ошибки. Если из 10 000 строк 50 невалидны — останавливать весь процесс неправильно. Нужна логика: пропускаем плохие строки, пишем в лог, продолжаем.
Воспроизводимость. Если процесс упал на 7 000-й строке — нужна возможность продолжить с места остановки, а не начинать заново.
Параллелизм. Последовательная обработка 50 000 записей по 100 мс каждая = почти 1,5 часа. Разбивка на параллельные Job'ы сокращает это кратно.
Архитектура: chunked + parallel jobs
Паттерн «Batch → Chunks → Jobs»:
Загрузка файла
↓
BatchImportJob (мастер-задача)
↓ разбивает на чанки
[ChunkJob 1] [ChunkJob 2] ... [ChunkJob N] ← параллельно
↓
BatchCompletedJob (агрегация результатов)
Laravel предоставляет Bus::batch() для этого паттерна.
Реализация на примере импорта CSV
namespace App\Services;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\LazyCollection;
class CsvImportService
{
private const CHUNK_SIZE = 500;
public function startImport(string $filePath, int $importId): string
{
$jobs = [];
// LazyCollection — потоковое чтение без загрузки в память
LazyCollection::make(function () use ($filePath) {
$handle = fopen($filePath, 'r');
$header = fgetcsv($handle); // первая строка — заголовки
while (($row = fgetcsv($handle)) !== false) {
yield array_combine($header, $row);
}
fclose($handle);
})
->chunk(self::CHUNK_SIZE)
->each(function ($chunk, $index) use (&$jobs, $importId) {
$jobs[] = new ProcessCsvChunkJob(
importId: $importId,
chunkIndex: $index,
rows: $chunk->values()->toArray()
);
});
$batch = Bus::batch($jobs)
->name("csv-import-{$importId}")
->allowFailures() // продолжаем при ошибке отдельных Job'ов
->then(function (Batch $batch) use ($importId) {
Import::find($importId)?->update(['status' => 'completed']);
ImportCompletedEvent::dispatch($importId);
})
->catch(function (Batch $batch, \Throwable $e) use ($importId) {
Import::find($importId)?->update([
'status' => 'partially_failed',
'error_message' => $e->getMessage(),
]);
})
->finally(function (Batch $batch) use ($importId) {
$import = Import::find($importId);
$import?->update([
'total_jobs' => $batch->totalJobs,
'failed_jobs' => $batch->failedJobs,
'finished_at' => now(),
]);
})
->onQueue('batch-processing')
->dispatch();
Import::find($importId)?->update(['batch_id' => $batch->id]);
return $batch->id;
}
}
Job обработки чанка
// app/Jobs/ProcessCsvChunkJob.php
class ProcessCsvChunkJob implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries = 3;
public int $timeout = 120;
public int $backoff = 10;
public function __construct(
private int $importId,
private int $chunkIndex,
private array $rows
) {}
public function handle(): void
{
// Если весь batch был отменён — прекращаем
if ($this->batch()?->cancelled()) {
return;
}
$successCount = 0;
$errors = [];
foreach ($this->rows as $lineNum => $row) {
try {
$this->processRow($row);
$successCount++;
} catch (\Throwable $e) {
$errors[] = [
'chunk' => $this->chunkIndex,
'line' => $lineNum,
'data' => array_slice($row, 0, 3), // первые 3 поля для диагностики
'error' => $e->getMessage(),
];
}
}
// Сохраняем статистику чанка
ImportChunkResult::create([
'import_id' => $this->importId,
'chunk_index' => $this->chunkIndex,
'processed' => count($this->rows),
'succeeded' => $successCount,
'failed' => count($errors),
'errors' => $errors,
]);
// Атомарно обновляем счётчики импорта
Import::where('id', $this->importId)->increment('processed_rows', count($this->rows));
Import::where('id', $this->importId)->increment('success_rows', $successCount);
}
private function processRow(array $row): void
{
// Здесь валидация и сохранение строки
// Пример:
$validated = validator($row, [
'email' => 'required|email',
'name' => 'required|string|max:255',
])->validate();
User::updateOrCreate(
['email' => $validated['email']],
['name' => $validated['name']]
);
}
}
Возобновление прерванного batch
Если сервер упал в середине обработки — Laravel Batch хранит состояние в таблице job_batches. Завершённые чанки повторно не запускаются. Незавершённые — возобновляются автоматически при рестарте воркера.
Принудительный рестарт незавершённого batch:
$batch = Bus::findBatch($batchId);
if ($batch && !$batch->finished()) {
// Пересоздаём незавершённые jobs
$pendingChunks = ImportChunkResult::where('import_id', $importId)
->pluck('chunk_index');
// Логика определения не обработанных чанков и их повторного диспатча
}
Прогресс в реальном времени
// app/Http/Controllers/ImportController.php
public function progress(int $importId): JsonResponse
{
$import = Import::findOrFail($importId);
$batch = $import->batch_id ? Bus::findBatch($import->batch_id) : null;
return response()->json([
'status' => $import->status,
'processed_rows' => $import->processed_rows,
'success_rows' => $import->success_rows,
'total_rows' => $import->total_rows,
'percentage' => $import->total_rows > 0
? round($import->processed_rows / $import->total_rows * 100, 1)
: 0,
'batch' => $batch ? [
'total_jobs' => $batch->totalJobs,
'pending_jobs' => $batch->pendingJobs,
'failed_jobs' => $batch->failedJobs,
'progress' => $batch->progress(),
] : null,
]);
}
Ограничение нагрузки
Для batch-очереди нужен отдельный пул воркеров с ограниченным параллелизмом, чтобы не забить всю БД или CPU:
[program:batch-worker]
command=php artisan queue:work --queue=batch-processing --max-jobs=50 --sleep=3 --timeout=120
numprocs=4
autostart=true
autorestart=true
numprocs=4 — четыре воркера, каждый обрабатывает чанки последовательно. --max-jobs=50 — после 50 задач воркер перезапускается, освобождая память.
Обработка файлов в нескольких форматах
Тот же паттерн работает для изображений, JSON, XLSX. Для XLSX используем PhpSpreadsheet в потоковом режиме:
use PhpOffice\PhpSpreadsheet\Reader\Xlsx;
$reader = new Xlsx();
$reader->setReadDataOnly(true);
$spreadsheet = $reader->load($filePath);
$worksheet = $spreadsheet->getActiveSheet();
$highestRow = $worksheet->getHighestDataRow();
// Читаем чанками по 500 строк
for ($startRow = 2; $startRow <= $highestRow; $startRow += 500) {
$endRow = min($startRow + 499, $highestRow);
$rows = $worksheet->rangeToArray("A{$startRow}:Z{$endRow}");
// диспатч Job для чанка
}
Сроки
Базовый импорт CSV с чанками и прогрессом — 1 рабочий день. Добавление возобновления, детального лога ошибок, endpoint прогресса — ещё 6–8 часов. Поддержка XLSX и JSON форматов — плюс 4–6 часов.







