Публикация сообщения с использованием ключа обмена и маршрутизации с помощью MassTransit

Я смотрю на MassTransit уже пару недель, и мне интересно узнать о возможностях. Однако, похоже, я не могу правильно понять концепции.

Ожидаемое поведение. Я хотел опубликовать сообщение для «прямого» обмена с ключом маршрутизации, который привязан к двум разным очередям для выполнения других действий.

Когда я попытался использовать ту же логику с помощью MassTransit для лучшей масштабируемости. Я обнаружил, что MassTransit создает собственный обмен на основе имени очереди с типом разветвления.

Классический код для публикации сообщения по обмену и ключу маршрутизации

using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct");

                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange, routingKey, null, body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
        }

Есть ли способ настроить прямой или тематический обмен с помощью routingkey в MassTransit?


person Syed    schedule 10.05.2015    source источник


Ответы (3)


Этот сценарий не поддерживается MassTransit. MassTransit всегда будет создавать очередь разветвления. Если вы сами управляли своей топологией, вы можете использовать IEndpoint.Send для прямой отправки сообщения на созданный вами обмен. В этом случае вы отказываетесь от многого из того, что предлагает MT.

Я также не уверен, что в данном случае означает «лучшая масштабируемость». Обмены Fanout работают лучше, чем прямые обмены (в большинстве случаев), поскольку нет логики маршрутизации, которую нужно обрабатывать.

Возможно, если вы проясните свои проблемы с производительностью в списке рассылки MassTransit, мы сможем помочь вы там немного больше.

person Travis    schedule 10.05.2015
comment
Спасибо за ваши комментарии. Моему процессу потребуется несколько издателей для параллельной публикации сообщений, поэтому подумайте о прямом или тематическом обмене, но они не поддерживаются в MassTransit. Есть ли у вас какие-либо предложения, как лучше всего справиться с этим, или вы предпочитаете использовать несколько очередей для этого процесса. Кроме того, было бы полезно, если бы вы направили меня к архитектуре MassTransit. - person Syed; 10.05.2015
comment
Я не совсем уверен, что вы пытаетесь сделать. Для меня не имеет смысла публиковать сообщения параллельно. С MT 3.0 вы можете выполнять асинхронные публикации. Хотя с обменом сообщениями все более-менее параллельно. Если один поток публикует, то может и другой. То же самое и с потреблением. - person Travis; 11.05.2015
comment
По сути, забудьте о топологии в RMQ и просто определите контракты сообщений с типами (желательно интерфейсами) и позвольте MT связать все это вместе за вас. - person Travis; 11.05.2015

Следующий код выполняет ту же работу, но с одним дополнительным обменом разветвлением:

TestMessage (прямой обмен) -> TestMessage_Queue (разветвленный обмен) -> TestMessage_Queue (очередь)

var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });
    cfg.Send<TestMessage>(x => { x.UseRoutingKeyFormatter(context => "routingKey"); });
    cfg.Message<TestMessage>(x => x.SetEntityName("TestMessage"));
    cfg.Publish<TestMessage>(x => { x.ExchangeType = ExchangeType.Direct; });
    cfg.ReceiveEndpoint(host, "TestMessage_Queue", e =>
    {
        e.BindMessageExchanges = false;
        e.Consumer<UpdateCustomerConsumer>();
        e.Bind("TestMessage", x =>
        {
            x.ExchangeType = ExchangeType.Direct;
            x.RoutingKey = "routingKey";
        });
    });
});

bus.Start();
person Jan Muncinsky    schedule 09.12.2018
comment
какая это версия? версия 5.5.1 не имеет Отправить ‹T› или Опубликовать ‹T› - person Andre; 07.06.2019

MassTransit обрабатывает публикацию и подписку без использования тематических обменов, вместо этого создавая обмены для публикуемых типов сообщений и привязывая эти обмены к очередям потребителей.

Подход с использованием ключа маршрутизации менее эффективен с RabbitMQ, который предпочитает использовать структуру обмена для упрощения маршрутизации сообщений (без поддержки хэш-таблиц).

Вместо того, чтобы иметь дело с обменами и ключами маршрутизации, просто определите свои команды и / или типы событий, отправьте или опубликуйте эти сообщения и позвольте потребителям выполнять свою работу.

person Chris Patterson    schedule 19.05.2015
comment
Создание дополнительных типов, хотя и очень чистое, может быть утомительным. Заманчивый подход состоит в том, что ваша шина даже не знает, какие именно типы обмениваются, потому что вы оборачиваете все в общий тип, подобный оболочке (возможно, с потерей статической типизации, поскольку существует только один тип оболочки). Однако недостатком одного типа, похожего на оболочку, является то, что технически существует поток от одного обмена ко всем подписчикам, и, таким образом, все подписчики извлекают все сообщения, фильтрация должна выполняться внутри, в обработчиках. - person Wiktor Zychla; 20.02.2019
comment
Именно здесь становится удобнее использовать ключ маршрутизации. Имея явный ключ для каждого обернутого сообщения и ключи маршрутизации, сопоставленные с очередями, вы можете точно управлять подписками. К счастью, это, похоже, работает в MT, как упоминалось в другом ответе @ jan-mucinsky. Я не защищаю эту модель, я только говорю, что это технически возможно, и за этим есть обоснование. - person Wiktor Zychla; 20.02.2019