Очереди сообщений в Node.js, часто реализуемые с использованием таких библиотек, как RabbitMQ, Kafka или AWS SQS (Simple Queue Service), представляют собой тип механизма асинхронной связи, который позволяет различным частям приложения или нескольким приложениям взаимодействовать друг с другом путем передачи сообщений. между ними. Очереди сообщений позволяют отделить потоки данных и задач и управлять ими, что позволяет создавать более масштабируемые и отказоустойчивые системы. Они обычно используются в распределенных и микросервисных архитектурах.

Вот объяснение очередей сообщений с убедительными примерами и вариантами их использования:

1. **Базовый пример**
Представьте, что у вас есть приложение Node.js, которое обрабатывает заказы пользователей для веб-сайта электронной коммерции. Вместо синхронной обработки каждого заказа, что может привести к задержкам при большом трафике, вы можете использовать очередь сообщений для асинхронной обработки заказов.

Пример использования:
— когда пользователь размещает заказ, ваше приложение Node.js добавляет сообщение заказа в очередь сообщений.
— отдельный рабочий процесс или микросервис прослушивает очередь, извлекает сообщения заказа, и обрабатывает каждый заказ самостоятельно. Это гарантирует, что обработка заказа не блокирует основной поток приложения.

2. **Фоновые задания**.
Очереди сообщений обычно используются для обработки фоновых заданий, таких как отправка электронных писем, создание отчетов или выполнение периодических задач.

Пример использования:
– ваше приложение Node.js позволяет пользователям запрашивать уведомления по электронной почте.
– вместо синхронной отправки электронных писем приложение добавляет запросы на уведомления по электронной почте в очередь сообщений.
– отдельный рабочий процесс читает из очереди, обрабатывает уведомления по электронной почте и отправляет их, не влияя на скорость отклика основного приложения.

3. **Балансировка нагрузки.**
Очереди сообщений могут распределять рабочие нагрузки между несколькими экземплярами службы, помогая добиться балансировки нагрузки и эффективного использования ресурсов.

Пример использования:
 – ваше приложение Node.js получает входящие запросы от клиентов.
 – вместо обработки каждого запроса на одном сервере вы используете очередь сообщений для распределения входящих запросов по разным экземплярам вашего приложения. .
— каждый экземпляр приложения обрабатывает запросы независимо, что повышает производительность и масштабируемость.

4. **Связь микрослужб**.
В архитектуре микрослужб различные службы должны взаимодействовать друг с другом. Очереди сообщений позволяют упростить взаимодействие и координацию между микрослужбами.

Пример использования:
 – Ваше приложение электронной коммерции состоит из отдельных микросервисов для аутентификации пользователей, каталога продуктов и обработки заказов.
 – Когда пользователь размещает заказ, микросервис обработки заказов добавляет сообщение с запросом заказа. в очередь.
— микрослужба управления запасами прослушивает очередь, обновляет доступность продуктов и уведомляет другие микрослужбы о заказе.

5. **Источники событий**.
Очереди сообщений используются в шаблонах источников событий для захвата и записи изменений в состоянии приложения.

Пример использования:
 – Вашему приложению Node.js необходимо отслеживать изменения в профилях пользователей, заказах и запасах.
 – Каждое изменение состояния записывается как сообщение о событии в очереди.
 – Другие службы или компоненты могут подписываться на эти события, чтобы поддерживать свои собственные данные или реагировать на изменения.

В целом очереди сообщений в Node.js обеспечивают мощный способ обработки асинхронной связи, разделения компонентов и создания масштабируемых и быстро реагирующих приложений. Они особенно ценны в распределенных системах, архитектурах микросервисов и сценариях, где задачи должны обрабатываться асинхронно и независимо.

Очереди недоставленных писем:

Очередь недоставленных сообщений (DLQ) функционирует как отдельное хранилище сообщений в системе обмена сообщениями, предназначенное для временного хранения сообщений, когда программное обеспечение обнаруживает ошибки, препятствующие их успешной обработке. Очереди сообщений служат каналами связи для обмена сообщениями между различными программными службами в распределенной системе, обеспечивая асинхронную связь без необходимости постоянной доступности получателя сообщения. Основная роль очереди недоставленных сообщений заключается в удержании проблемных сообщений, таких как сообщения, не имеющие действительного адресата, или сообщения, которые предполагаемый получатель не может обработать, предоставляя выделенную область для их хранения и анализа.

Каковы преимущества использования очереди недоставленных сообщений?

Далее давайте обсудим преимущества использования очереди недоставленных сообщений (DLQ).

Экономия затрат на связь. В обычных очередях сообщений сообщения обрабатываются до окончания срока их хранения. Эта настройка обеспечивает непрерывную обработку сообщений и предотвращает блокировку вашей очереди.

Однако при работе с большим объемом сообщений значительное количество сообщений об ошибках может привести к увеличению затрат на связь и нагрузке на систему связи. Вместо того, чтобы настойчиво пытаться обработать неудачные сообщения, более разумным подходом является перемещение их в очередь недоставленных сообщений после нескольких попыток.

Расширенное решение проблем. Передавая ошибочные сообщения в DLQ, разработчики могут сосредоточиться на выявлении основных причин ошибок. Они могут выяснить, почему предполагаемый получатель не смог обработать сообщения, внести необходимые исправления и повторить попытку отправки сообщений.

Например, представьте себе банковское программное обеспечение, которое ежедневно отправляет многочисленные заявки на кредитные карты в свою серверную систему для утверждения. Хоть серверная система и получает заявки, она не может обработать их все из-за неполной информации. Вместо того, чтобы бесконечно пытаться их обработать, программное обеспечение перенаправляет сообщения в DLQ до тех пор, пока ИТ-команда не решит проблему. Это позволяет системе эффективно обрабатывать и доставлять оставшиеся сообщения без каких-либо сбоев в производительности.

БЫК MQ:

BullMQ — это популярная библиотека Node.js, которая реализует надежную, постоянную и многофункциональную очередь заданий и сообщений на основе Redis. Он основан на основной библиотеке Bull и предоставляет дополнительные функции и улучшения.

BullMQ основан на 4 классах, которые вместе можно использовать для решения множества различных проблем. Это классы Queue, Worker, QueueEvents. >» и FlowProducer.

Первый класс, о котором вы должны знать, это класс Queue. Этот класс представляет очередь и может использоваться для добавления заданий в очередь, а также для некоторых других основных операций, таких как приостановка, очистка или получение данных из очереди.

Задания в BullMQ — это созданная пользователем структура данных, которую можно хранить в очереди. Задания обрабатываются работниками. Worker — это второй класс, о котором вам следует знать. Рабочие — это экземпляры, способные обрабатывать задания. У вас может быть много рабочих процессов, работающих либо в одном процессе Node.js, либо в отдельных процессах, а также на разных машинах. Все они будут потреблять задания из очереди и помечать задания как выполненные или не выполненные.

В этом примере мы создадим упрощенную версию системы доставки еды с использованием BullMQ. Система будет управлять входящими заказами на доставку еды и отправлять их доступным водителям для эффективной и своевременной доставки.

1. Установка и настройка:

Установите необходимые пакеты:

npm install bullmq ioredis

2. Создайте приложение Node.js со следующим кодом:

const { Queue, Worker, QueueScheduler } = require('bullmq');
const { RedisClient } = require('ioredis');

// Create a Redis connection
const redisClient = new RedisClient();

// Create a queue for incoming delivery orders
const orderQueue = new Queue('deliveryOrders', { connection: redisClient });

// Create a queue for dispatching orders to drivers
const dispatchQueue = new Queue('dispatchQueue', { connection: redisClient });

// Create a queue for dead-letter dispatches
const deadLetterQueue = new Queue('deadLetterQueue', { connection: redisClient });

// Create a worker to process incoming orders
const orderWorker = new Worker('deliveryOrders', async job => {
  console.log(`Received order: ${job.data.orderId}`);
  // Simulate order processing and dispatching
  const success = await dispatchOrder(job.data.orderId);
  if (!success) {
    console.log(`Failed to dispatch order: ${job.data.orderId}`);
    await deadLetterQueue.add('failedDispatch', job.data);
  }
}, { connection: redisClient });

// Create a worker to dispatch orders to drivers
const dispatchWorker = new Worker('dispatchQueue', async job => {
  console.log(`Dispatching order to driver: ${job.data.orderId}`);
  // Simulate driver assignment and delivery
  const success = await simulateDriverAssignment(job.data.orderId);
  if (!success) {
    console.log(`Failed to deliver order: ${job.data.orderId}`);
    await deadLetterQueue.add('failedDelivery', job.data);
  }
}, { connection: redisClient });

// Create a queue scheduler for the dispatch queue
const scheduler = new QueueScheduler('dispatchQueue', { connection: redisClient });

// Simulate incoming delivery orders
function placeOrder(orderId) {
  console.log(`New order received: ${orderId}`);
  orderQueue.add('processOrder', { orderId });
}

// Simulate driver assignment and delivery
async function simulateDriverAssignment(orderId) {
  // Simulate driver assignment and delivery time
  await new Promise(resolve => setTimeout(resolve, 3000));
  return Math.random() < 0.8; // Simulate 80% success rate
}

// Simulate order dispatching with potential failures
async function dispatchOrder(orderId) {
  // Simulate dispatching with a chance of failure
  await new Promise(resolve => setTimeout(resolve, 1000));
  return Math.random() < 0.9; // Simulate 90% success rate
}

// Simulate placing new delivery orders
placeOrder('order-1');
placeOrder('order-2');
placeOrder('order-3');
  1. Мы создаем новую очередь с именем deadLetterQueue для хранения неудавшихся отправок заказов.
  2. orderWorker и dispatchWorker теперь обрабатывают сбои при отправке заказов, проверяя успешность смоделированного назначения и доставки водителя. В случае сбоя данные неудачного задания добавляются в deadLetterQueue для дальнейшего анализа.
  3. Мы используем простую симуляцию вероятности успеха, чтобы определить, была ли отправка или доставка заказа успешной. Вы можете настроить эти вероятности по мере необходимости.
  4. Функция dispatchOrder имитирует отправку заказа с вероятностью отказа.
  5. В случае сбоя отправки или доставки заказа соответствующие данные о задании добавляются в deadLetterQueue для возможного устранения неполадок и ручного вмешательства.

Теперь, если происходит сбой отправки или доставки заказа, данные задания перемещаются в deadLetterQueue, что позволяет разработчикам исследовать и устранять проблемы, вызывающие сбои. Эта функция очереди недоставленных сообщений повышает надежность и отказоустойчивость системы доставки еды.\