Шина событий — это механизм, который позволяет различным компонентам взаимодействовать друг с другом, не зная друг о друге. Компонент может отправить событие на шину событий, не зная, кто его подхватит или сколько других его подхватит. Компоненты также могут прослушивать события на шине событий, не зная, кто отправил события. Таким образом, компоненты могут взаимодействовать, не завися друг от друга. Кроме того, очень легко заменить компонент. Пока новый компонент понимает отправляемые и получаемые События, другие компоненты никогда об этом не узнают.

В распределенной системе в основном есть два режима связи: асинхронная и синхронная связь. При синхронном общении, когда вы отправляете запрос, вам нужно будет ждать ответа, а при асинхронном, наоборот, вы не получаете ответ мгновенно. Одним из способов создания шины событий для распределенной системы является использование брокера сообщений для асинхронной связи. Брокер сообщений — это программное обеспечение, обеспечивающее надежное средство связи между различными приложениями. Примерами популярных брокеров сообщений являются RabbitMQ, Kafka, NAT Gateway, SQS, SNS, Google PubSub и т. д. Сегодня мы создадим простую шину событий, используя RabbitMQ, чтобы разделить различные службы и продемонстрировать, как они могут отправлять и получать сообщения.

RabbitMQ легко развертывается локально и в облаке. Он поддерживает несколько протоколов обмена сообщениями. RabbitMQ можно развернуть в распределенных и федеративных конфигурациях для удовлетворения масштабных требований к высокой доступности. Основным отличием RabbitMQ от других брокеров сообщений является его открытый исходный код и легкий вес. Развернуть простой контейнер RabbitMQ очень просто, и для его запуска потребуется менее 300 МБ памяти, или вы можете использовать https://www.cloudamqp.com/, чтобы понять основы RabbitMQ.

Как работает RabbitMQ

Издатель. Это приложение, которое отправляет сообщение брокеру сообщений RabbitMQ. Приложение может быть приложением Node, PHP, Ruby on rails или Go.

Обмен: издатель отправляет сообщение на обмен, который отвечает за маршрутизацию сообщения в очередь. Маршрутизация сообщений — это то, чем отличается RabbitMQ, поскольку он предоставляет вам широкий спектр возможностей для маршрутизации сообщений в разные очереди. RabbitMQ предоставляет различные типы обмена: прямой, разветвленный, тематический, заголовочный и безымянный обмен (по умолчанию).

Очередь: Очередь – это структура данных, похожая на очередь в реальном мире. очередь – это структура данных, в которой все, что приходит первым, удаляется первым. В RabbitMQ сообщения публикуются из приложения в биржу и направляются с помощью ключа маршрутизации в очередь. В распределенной системе каждая служба может получать сообщения из одной или нескольких очередей.

Потребитель: это приложение, которое подключается к очереди и извлекает все сообщения, отправленные в очередь, в режиме реального времени.

Разработка шины событий для простого блога.

На приведенной выше диаграмме шлюз API направляет синхронный запрос пользовательской службе, которая отвечает за создание нового пользователя и публикацию события user:created, в RabbitMQ, отвечающую за маршрутизацию сообщений в соответствующие очереди. Служба Post прослушивает post_queue, служба комментариев слушает comment_queue, а служба Notification прослушивает уведомление_queue. Это достигается с помощью шаблона обмена сообщениями pub-sub, поскольку RabbitMQ не поставляется с готовым шаблоном PubSub, как SNS, Google PubSub и Kafka, мы можем добиться этого, используя тип обмена разветвлением.

Моделирование шины событий для простого блога.

В приведенной выше таблице показано, как различные службы публикуют события в брокере RabbitMQ. Используя разветвленный обмен для привязки всех очередей, каждое опубликованное событие делегируется соответствующему сервису. Мы также можем использовать обмен как событие, в зависимости от выбранного вами соглашения об именовании. Предпочтительно, чтобы события можно было отправлять через заголовок сообщения, и вы можете определять различные типы обработчиков событий на основе имени события.

В RabbitMQ, чтобы прослушивать сообщения с разных бирж, вам нужно утвердить очередь и биржу существует, затем вы привязываете очередь ко всем биржам, которые хотите прослушать. Обмен в этом контексте можно рассматривать как тему обмена сообщениями, ориентированного на Pubsub.

Обработка условий гонки в шине событий.

Обработка сообщений в правильном порядке действительно важна при обработке данных. В нашей шине событий мы будем использовать шаблон FIFO (первым пришел — первым обслужен) для обработки всех входящих сообщений. сообщение A должно быть обработано до того, как будет обработано сообщение B, чтобы обеспечить согласованность в нашей системе. Например, у нас есть горизонтально масштабируемая бэкэнд-система, использующая Kubernetes, каждая служба имеет несколько запущенных модулей для достижения высокой доступности, и все модули, работающие для конкретной службы, подключены к брокеру сообщений RabbitMQ. По умолчанию RabbitMQ не будет отправлять сообщение всем запущенным модулям, он будет отправлять сообщение только одному модулю, не предусмотрев возможность обработки условий гонки. Существует 1% вероятность того, что событие post:updated может быть обработано до события post:created или post:deleted может быть обработано до post:updated. В блоге, где только один пользователь может выполнить обновить до той же записи, вероятность того, что это произойдет, очень мала, но когда вы создаете финтех-продукт, который несколько пользователей могут вносить или снимать с кошелька, вероятность того, что вы столкнетесь с этой проблемой, очень высока. В монолитной архитектуре лучшим подходом было бы реализовать блокировку строк, но поскольку база данных распределена, проблема немного сложнее. Управление версиями записей — хороший подход, решающий эту проблему. Каждый раз, когда запись создается и обновляется, столбец версии увеличивается на единицу.

Когда мы публикуем наши сообщения post:created и post:updated, мы добавляем версию к полезной нагрузке события. Нам нужно выполнить проверку в прослушивателе post:updated, чтобы убедиться, что новое событие является приращением события, которое мы обработали последним для этой записи. Получите столбец версии для этой строки, выполните приращение на 1 и, если он соответствует версии входящего события, обработайте событие и подтвердите сообщение, иначе проигнорируйте сообщение. Таким образом, это сообщение будет повторно отправлено потребителю, когда оно будет проигнорировано. Даже если у нас запущено 10 модулей Kubernetes или экземпляров EC2 для службы, сообщения всегда будут обрабатываться в соответствующем порядке. Когда получена версия события 1, нет необходимости выполнять эту проверку.

Запись событий.

Регистрация в шине событий является необходимостью, поскольку в большинстве брокеров сообщений после обработки события оно удаляется из брокера сообщений. Единственный способ определить, какая служба опубликовала определенное событие, — это ведение журнала. Мы можем регистрировать события, используя любое хранилище данных по нашему выбору. Ведение журнала очень важно для отслеживания и отладки потоков событий в нашей системе, есть также некоторые крайние случаи, которые нам необходимо учитывать при ведении журнала, чтобы избежать создания серьезной уязвимости в нашей системе. Нам потребуется создать фильтр журнала, чтобы удалить определенные конфиденциальные поля, прежде чем они будут сохранены в нашем хранилище данных, поскольку эти поля должны быть конфиденциальными между этими службами. Например, если вы публикуете событие с полем для пин-кода, пароля или одноразового пароля, мы можем написать логику для фильтрации всех этих полей, прежде чем они будут сохранены в нашем хранилище данных. Если вы хотите использовать журналы для повторной отправки ошибочных сообщений, вы можете зашифровать исходное тело и расшифровать тело, когда вам нужно повторно отправить сообщение. Вот некоторые из полей, которые необходимо зарегистрировать.

Очередь недоставленных писем.

Очереди недоставленных сообщений используются в системе, управляемой событиями, по разным причинам, наиболее распространенными из которых являются:

  1. Они служат очередью, содержащей отклоненные сообщения или сообщения, которые не могут быть обработаны в течение определенного времени. Длительность может основываться на том, сколько раз сообщение будет повторно отправлено в очередь после сбоя потребления сообщения или истечения срока жизни сообщения.
  2. Когда очередь заполнена, входящие сообщения перенаправляются в очередь недоставленных сообщений.

Использование очередей недоставленных сообщений помогает устранять новые пограничные случаи в вашей системе. Если сообщение не может быть обработано из-за определенных ошибок времени выполнения, вы можете легко обнаружить это, изучив тело сообщения, и исправить эту проблему, прежде чем пытаться отправить его повторно.

Посмотреть пример проекта на Github

Ссылки

http://www.rribbit.org/eventbus.html