Перемещение сообщений из очереди _error при использовании Темы служебной шины Azure

Я пытаюсь использовать темы служебной шины Azure, чтобы разрешить службам получать сообщения с помощью MassTransit. Сообщение отправляется в тему, и каждая служба, имеющая подписку на эту тему, получает копию сообщения. Каждому типу сообщения соответствует тема.

Проблема в том, как поступать с ошибками после обращения к источнику исключения. Как только сообщение выходит из строя и попадает в очередь _error, я хотел бы вернуть сообщение для обработки после исправления сообщения и / или службы. Я не могу переместить сообщение из очереди _error в тему, потому что каждая служба в этой теме снова получит сообщение.

Я попытался создать вторую очередь с помощью метода ReceiveEndpoint под названием _errorecovery, но это привело к тому, что очередь подписалась на тему, что означает, что очередь _errorrecovery получает каждое сообщение, опубликованное в этой теме.

Мне интересно, есть ли способ настроить очередь с помощью MassTransit, который будет обрабатывать только сообщения в этой очереди без добавления дополнительной подписки.

Вот мои текущие настройки для создания тем.

TEvent - это тип сообщения, а TConsumer - это связанная реализация IConsumer для этого типа сообщения.

  public void ConfigureType<TEvent, TConsumer>(IServiceBusBusFactoryConfigurator busConfig, Container container, MessageHandlingOptions options) where TConsumer : class, IConsumer
        {
            string subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
            var topicName = NameHelper.GetTopicName(@namespace, typeof(TEvent));

            busConfig.SubscriptionEndpoint(subName, topicName, configurator =>
            {
                configurator.ConfigureConsumer(container, typeof(TConsumer));
                if (!(options is null))
                {
                    ConfigureRetry(configurator, options);
                }
            });
        }

И построить очередь _errorrecovery. С каждым событием также связан IConsumer, специально предназначенный для обработки сбойных событий.

 var subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);

                        busConfig.ReceiveEndpoint(subName + "_errorrecovery", config =>
                        {
                            config.ConfigureConsumer(_simpleContainer, faultConsumers.Select(i => i.GenericType).ToArray());
                        });

Это создает очередь с именем subname_errorrecovery и тему, названную в честь события. У службы есть подписка в теме, но также есть подписка на _errorrecovery. Таким образом, каждый раз, когда сообщение отправляется в тему, получатели события и получатели ошибки получают сообщения.

Поэтому я ищу способ подключить службу к очереди восстановления, а также к нескольким темам, без подписки на каждую тему.

Также очень возможно, что я поступаю неправильно. Возможно, мне нужно просто добавить проверку на наличие повторяющихся сообщений и просто переместить сообщение в тему, позволяя службам, которые изначально успешно обработали сообщение, просто игнорировать его. Я все равно планирую это сделать, но надеюсь получить некоторые рекомендации по обработке ошибок с помощью MassTransit.

Любая помощь будет оценена по достоинству.


comment
Вам нужно использовать конечные точки подписки, или вы можете просто подключить своих потребителей к конечной точке приема и позволить MassTransit настроить подписку для пересылки на вашу конечную точку приема? Это подход по умолчанию для pub / sub с MassTransit.   -  person Chris Patterson    schedule 09.07.2020


Ответы (2)


Сервисная шина Azure позволяет настроить правила фильтрации подписок для каждой подписки, создаваемой для темы (см. https://docs.microsoft.com/en-us/azure/service-bus-messaging/topic-filters).

Таким образом, вы можете определить правила фильтрации для подписки, чтобы в очередь для обработки помещались только сообщения, соответствующие этим конкретным правилам.

Эти правила фильтрации можно настроить из кода, с помощью шаблонов ARM, портала Azure или интерфейса командной строки Azure.

Что вы можете сделать, так это убедиться, что определенное свойство сообщения добавлено к сообщению, если обработка не удалась, прежде чем оно будет перемещено в очередь _error. Я не очень хорошо знаю MassTransit, но думаю, что будет возможность это сделать. Или, возможно, сам MassTransit уже предоставляет такую ​​информацию об источнике, чтобы вы могли определить по заданному свойству сообщения, в каком источнике подписки (очереди) сообщение было обработано до того, как оно было перемещено в очередь _error.

Итак, давайте рассмотрим, когда ваше неудавшееся сообщение (возможно, оно не удалось из-за исключения из-за временной недоступности базы данных службы потребителей) перемещается в очередь _error с некоторым свойством сообщения, указывающим, где оно было обработано. Назовем это свойство ErrorOrigin, и вы дадите ему уникальное значение, идентифицирующее определенную подписку.

Итак, в вашем случае вы можете определить правило фильтрации для каждой подписки, чтобы только сообщения, у которых либо не было свойства с именем ErrorOrigin, либо, если свойство установлено, его значение должно соответствовать имя идентификатора, соответствующее этой подписке (например, subscriptionXforEventZ)

Если вы правильно настроили правила фильтрации подписки, потребители подписки теперь будут обрабатывать все сообщения, изначально опубликованные в теме, а также сообщения из очереди _error, которые также были опубликованы в этой теме. Но с помощью правила фильтрации служебная шина Azure будет помещать эти неудачные сообщения в соответствующую очередь подписки только в том случае, если свойство ErrorOrigin сообщения соответствует этой подписке.

Я успешно использую эту функцию на практике в архитектуре микросервисов. Это позволяет очень легко добавлять дополнительных подписчиков к теме, но дает этим подписчикам возможность получать только сообщения из интересующей темы.

Здесь вы можете увидеть пример фильтрации подписок в служебной шине Azure, чтобы лучше понять, как это работает: https://github.com/Azure/azure-service-bus/tree/master/samples/DotNet/Microsoft.ServiceBus.Messaging/TopicFilters

Надеюсь, эта идея поможет вам решить вашу конкретную проблему.

person afh    schedule 11.07.2020

Готовое решение этой проблемы - добавить правило подписки на тему. Таким образом, всякий раз, когда вы хотите повторно отправить сообщения, они войдут только в эти подписки на основе фильтра, примененного к подписке. Узнайте о правиле подписки на темы в статье, здесь.

person Nadeem Duke    schedule 11.07.2020