masstransit - потребители не зарегистрированы и не активированы

Я пытаюсь зарегистрировать потребителей, но безуспешно пользуется общественным транспортом. Я зарегистрировал MT с помощью Autofac, используя модульный подход.

Во-первых, я написал простое сообщение:

public class SimpleMessage
{
    public string msg { get; set; } 
}

и мне удалось отправить их в очередь:

var endpointTest = await _busControl.GetSendEndpoint(new Uri("queue:queueTest"));
await endpointTest.Send(new SimpleMessage
{
    msg = "test"
});

Затем я создал потребителя:

public class SimpleMessageConsumer : IConsumer<SimpleMessage>
{
    private readonly ILogger _logger;
    public SimpleMessageConsumer(ILogger logger)
    {
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<SimpleMessage> context)
    {
        _logger.Info($"got msg from queue: {context.Message}");
    }
}

Но он не запустится, когда сообщение появилось в очереди. Моя конфигурация:

public class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterType<BusSettings>().As<IBusSettings>();

        builder.AddMassTransit(cfg =>
        {
            cfg.AddConsumer<SimpleMessageConsumer, SimpleMessageConsumerDefinition>();

            cfg.Builder.Register(context =>
            {
                var busSettings = context.Resolve<IBusSettings>();
                var logger = context.Resolve < ILogger >();
                var busControl = Bus.Factory.CreateUsingRabbitMq(bus =>
                {
                    bus.AutoDelete = busSettings.AutoDelete;
                    bus.Durable = busSettings.Durable;
                    bus.Exclusive = busSettings.Exclusive;
                    bus.ExchangeType = busSettings.Type;

                    //bus.UseNServiceBusJsonSerializer();

                    bus.Host(busSettings.HostAddress, busSettings.Port, busSettings.VirtualHost, null, h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                    bus.ReceiveEndpoint("queueTest", ec =>
                    {
                        ec.Consumer(() => new SimpleMessageConsumer(logger));
                    });
                });

                return busControl;

            }).SingleInstance().As<IBusControl>().As<IBus>();
        });
    }
}

в program.cs у меня есть:

services.AddMassTransitHostedService();

а также

containerBuilder.RegisterModule<BusModule>();

Я уже упоминал об этом - отправка сообщения в очередь работает, но потребитель не работает.

Вы можете мне помочь, что я сделал не так? как мне исправить конфигурацию? чтобы активировать потребителя?


person Bartosz Kowalczyk    schedule 02.02.2021    source источник
comment
Что значит не работает? Вы смотрели пользовательский интерфейс управления RMQ и видели обмен для своей конечной точки приема? Если он есть, есть ли у него потребители? Я лично понятия не имею, зачем использовать Autofac для нового приложения .NET, поэтому регистрация может быть запутана. Служба, размещенная на MassTransit, ожидает, что IBus и другие вещи будут зарегистрированы в контейнере Microsoft DI, иначе это не сработает.   -  person Alexey Zimarev    schedule 02.02.2021
comment
Это старое приложение, которое несколько дней назад было перенесено на ядро. (на самом деле, некоторые компоненты (микросервисы) все еще работают над .net framework). Я вижу обмен, но потребитель не запускается (он не ловит сообщение и не обрабатывает его), поэтому похоже, что потребитель не подключился к очереди.   -  person Bartosz Kowalczyk    schedule 02.02.2021


Ответы (1)


Я обновил вашу конфигурацию, чтобы она работала правильно, используя фактические методы конфигурации шины вместо смешивания двух решений:

public class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterType<BusSettings>().As<IBusSettings>();

        builder.AddMassTransit(cfg =>
        {
            cfg.AddConsumer<SimpleMessageConsumer, SimpleMessageConsumerDefinition>();

            cfg.UsingRabbitMq((context,cfg) =>
            {
                var busSettings = context.GetRequiredService<IBusSettings>();
                var logger = context.GetRequiredService<ILogger>();

                //bus.UseNServiceBusJsonSerializer();

                bus.Host(busSettings.HostAddress, busSettings.Port, busSettings.VirtualHost, null, h =>
                {
                    h.Username(busSettings.Username);
                    h.Password(busSettings.Password);
                });

                bus.ReceiveEndpoint("queueTest", ec =>
                {
                    // i'm guessing these apply to the receive endpoint, not the bus endpoint

                    ec.AutoDelete = busSettings.AutoDelete;
                    ec.Durable = busSettings.Durable;
                    ec.Exclusive = busSettings.Exclusive;
                    ec.ExchangeType = busSettings.Type;

                    ec.ConfigureConsumer<SimpleMessageConsumer>(context);
                });
            });
        });
    }
}
person Chris Patterson    schedule 02.02.2021
comment
это, вероятно, работает. (вероятно, потому что я получил исключение от autofac и пытаюсь выяснить это): --- ›Исключение было создано при вызове конструктора 'Void .ctor (Extensions.DependencyInjection.IServiceScopeFactory, Extensions.Options.IOptions1[Diagnostics. HealthChecks.HealthCheckServiceOptions], Extensions.Logging.ILogger1 [Diagnostics.HealthChecks . DefaultHealthCheckService]) »для типа« DefaultHealthCheckService ». --- ›System.ArgumentException: были зарегистрированы повторяющиеся проверки работоспособности с именем (именами): masstransit-bus (параметр 'registrations') - person Bartosz Kowalczyk; 03.02.2021
comment
Не обращайте на это внимания, пожалуйста. все работает. Слишком много часов на изучение конфигурации, и я не заметил своей очевидной ошибки - person Bartosz Kowalczyk; 03.02.2021