Message Queue: RabbitMQ, Bull
Message Queue (очередь сообщений) — это асинхронный механизм передачи задач между сервисами или процессами через посредника (брокер), позволяющий развязать Producer (отправитель) и Consumer (обработчик) во времени и пространстве.
Зачем нужно
Тяжёлые операции (отправка email, генерация PDF, обработка изображений) не нужно выполнять в HTTP-ответе — они замедляют API. Message Queue позволяет принять задачу в очередь (быстро ответить клиенту) и обработать её асинхронно в воркере. Также даёт retry при сбоях, приоритизацию задач и горизонтальное масштабирование воркеров.
Где используется
- Отправка email и SMS после регистрации/заказа
- Генерация отчётов, PDF, экспорт данных
- Обработка загруженных файлов (ресайз изображений, транскодинг)
- Фоновые задачи по расписанию (cron jobs)
- Межсервисная коммуникация в микросервисах
Основной контент
Bull — очереди на Redis
Bull — популярная библиотека для Node.js очередей, использует Redis как хранилище.
npm install bull
# Redis должен быть запущен
// queues/emailQueue.js
const Bull = require('bull');
const emailQueue = new Bull('email', {
redis: { host: 'localhost', port: 6379 }
});
// Producer — добавляем задачу
async function queueWelcomeEmail(userId) {
await emailQueue.add(
{ userId, type: 'welcome' },
{
attempts: 3, // повтор при ошибке
backoff: 5000, // пауза между попытками (мс)
removeOnComplete: true
}
);
}
// Consumer — обрабатываем задачу
emailQueue.process(async (job) => {
const { userId, type } = job.data;
const user = await UserService.getById(userId);
await EmailService.send({ to: user.email, template: type });
console.log(`Email sent to ${user.email}`);
});
// События
emailQueue.on('completed', (job) => console.log(`Job ${job.id} done`));
emailQueue.on('failed', (job, err) => console.error(`Job ${job.id} failed: ${err.message}`));
module.exports = { emailQueue, queueWelcomeEmail };
Bull — приоритеты и задержки
// Высокий приоритет (меньше число = выше приоритет)
await emailQueue.add({ userId: 1 }, { priority: 1 });
// Отложенное выполнение — через 10 минут
await emailQueue.add({ report: 'monthly' }, { delay: 10 * 60 * 1000 });
// Повторяющаяся задача (cron)
await emailQueue.add(
{ type: 'digest' },
{ repeat: { cron: '0 9 * * *' } } // каждый день в 9:00
);
BullMQ — современная версия
npm install bullmq
const { Queue, Worker } = require('bullmq');
const connection = { host: 'localhost', port: 6379 };
// Producer
const queue = new Queue('images', { connection });
await queue.add('resize', { filename: 'photo.jpg', width: 800 });
// Consumer
const worker = new Worker('images', async (job) => {
await ImageService.resize(job.data.filename, job.data.width);
}, { connection, concurrency: 5 }); // 5 параллельных воркеров
worker.on('failed', (job, err) => console.error(err));
RabbitMQ через amqplib
npm install amqplib
const amqp = require('amqplib');
// Producer
async function publishTask(data) {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel;
await channel.assertQueue('tasks', { durable: true });
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(data)), { persistent: true });
await channel.close();
await conn.close();
}
// Consumer
async function startWorker() {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel;
await channel.assertQueue('tasks', { durable: true });
channel.prefetch(1); // один за раз
channel.consume('tasks', async (msg) => {
const data = JSON.parse(msg.content.toString());
await processTask(data);
channel.ack(msg); // подтвердить обработку
});
}
Частые ошибки
- Не подтверждать задачи (ack) — в RabbitMQ: задача вернётся в очередь после разрыва соединения; всегда вызывать
channel.ack(msg) - Не настроить retry — без повторных попыток сбой воркера теряет задачу
- Блокирующий код в обработчике — CPU-интенсивные операции блокируют Event Loop воркера; выносить в Worker Threads или child_process
- Не мониторить очередь — Bull UI (bull-board) или RabbitMQ Management Plugin позволяют видеть накопление задач