Message Queue: RabbitMQ, Bull

Message Queue (очередь сообщений) — это асинхронный механизм передачи задач между сервисами или процессами через посредника (брокер), позволяющий развязать Producer (отправитель) и Consumer (обработчик) во времени и пространстве.

Зачем нужно

Тяжёлые операции (отправка email, генерация PDF, обработка изображений) не нужно выполнять в HTTP-ответе — они замедляют API. Message Queue позволяет принять задачу в очередь (быстро ответить клиенту) и обработать её асинхронно в воркере. Также даёт retry при сбоях, приоритизацию задач и горизонтальное масштабирование воркеров.

Где используется

  • Отправка email и SMS после регистрации/заказа
  • Генерация отчётов, PDF, экспорт данных
  • Обработка загруженных файлов (ресайз изображений, транскодинг)
  • Фоновые задачи по расписанию (cron jobs)
  • Межсервисная коммуникация в микросервисах

Основной контент

Bull — очереди на Redis

Bull — популярная библиотека для Node.js очередей, использует Redis как хранилище.

npm install bull
# Redis должен быть запущен
// queues/emailQueue.js
const Bull = require('bull');

const emailQueue = new Bull('email', {
  redis: { host: 'localhost', port: 6379 }
});

// Producer — добавляем задачу
async function queueWelcomeEmail(userId) {
  await emailQueue.add(
    { userId, type: 'welcome' },
    {
      attempts: 3,        // повтор при ошибке
      backoff: 5000,      // пауза между попытками (мс)
      removeOnComplete: true
    }
  );
}

// Consumer — обрабатываем задачу
emailQueue.process(async (job) => {
  const { userId, type } = job.data;
  const user = await UserService.getById(userId);
  await EmailService.send({ to: user.email, template: type });
  console.log(`Email sent to ${user.email}`);
});

// События
emailQueue.on('completed', (job) => console.log(`Job ${job.id} done`));
emailQueue.on('failed', (job, err) => console.error(`Job ${job.id} failed: ${err.message}`));

module.exports = { emailQueue, queueWelcomeEmail };

Bull — приоритеты и задержки

// Высокий приоритет (меньше число = выше приоритет)
await emailQueue.add({ userId: 1 }, { priority: 1 });

// Отложенное выполнение — через 10 минут
await emailQueue.add({ report: 'monthly' }, { delay: 10 * 60 * 1000 });

// Повторяющаяся задача (cron)
await emailQueue.add(
  { type: 'digest' },
  { repeat: { cron: '0 9 * * *' } } // каждый день в 9:00
);

BullMQ — современная версия

npm install bullmq
const { Queue, Worker } = require('bullmq');
const connection = { host: 'localhost', port: 6379 };

// Producer
const queue = new Queue('images', { connection });
await queue.add('resize', { filename: 'photo.jpg', width: 800 });

// Consumer
const worker = new Worker('images', async (job) => {
  await ImageService.resize(job.data.filename, job.data.width);
}, { connection, concurrency: 5 }); // 5 параллельных воркеров

worker.on('failed', (job, err) => console.error(err));

RabbitMQ через amqplib

npm install amqplib
const amqp = require('amqplib');

// Producer
async function publishTask(data) {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel;
  await channel.assertQueue('tasks', { durable: true });
  channel.sendToQueue('tasks', Buffer.from(JSON.stringify(data)), { persistent: true });
  await channel.close();
  await conn.close();
}

// Consumer
async function startWorker() {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel;
  await channel.assertQueue('tasks', { durable: true });
  channel.prefetch(1); // один за раз

  channel.consume('tasks', async (msg) => {
    const data = JSON.parse(msg.content.toString());
    await processTask(data);
    channel.ack(msg); // подтвердить обработку
  });
}

Частые ошибки

  • Не подтверждать задачи (ack) — в RabbitMQ: задача вернётся в очередь после разрыва соединения; всегда вызывать channel.ack(msg)
  • Не настроить retry — без повторных попыток сбой воркера теряет задачу
  • Блокирующий код в обработчике — CPU-интенсивные операции блокируют Event Loop воркера; выносить в Worker Threads или child_process
  • Не мониторить очередь — Bull UI (bull-board) или RabbitMQ Management Plugin позволяют видеть накопление задач

Связанные темы

Ресурсы