Как синхронно использовать необработанные байтовые сообщения из RabbitMQ с помощью EasyNetQ?

Есть ли способ синхронно использовать необработанные байтовые сообщения из RabbitMQ с помощью EasyNetQ?

Мне нужно гарантировать упорядоченную обработку и подтверждение сообщений, поступающих из системы, которая не публикуется в формате EasyNetQ. Я знаю, что потребитель работает в одном потоке, но интерфейс IAdvancedBus предлагает только один метод для использования необработанных сообщений:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

Тип возврата Task означает, что потребитель выполняет обратный вызов асинхронно и, следовательно, может обрабатывать сообщения не по порядку.

Если нет, есть идеи по изменению кода для поддержки этого? Я бы сделал способ интерфейса:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);

и реализовать его в RabbitAdvancedBus, но я не уверен, куда именно пойдет код.


person Ralphie    schedule 17.11.2015    source источник


Ответы (2)


Это интересный вопрос. Я сам не эксперт по EasyNetQ, и, возможно, кто-нибудь другой придет и даст вам лучший ответ. Однако я был знаком с базой кода EasyNetQ около года, и, на мой взгляд, сложно понять, что происходит при подключении потребителя (и, следовательно, при вызове потребителя).

Прежде всего я хотел бы отметить, что простое изменение сигнатуры метода не гарантирует, что сообщения будут обрабатываться по порядку. Взгляните, например, на эту реализацию предлагаемого вами интерфейса:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        return new Task(() => { });
    };
    Consume(queue, taskWrapper);
}

Он вызывает исходный метод Consume, и мы действительно не знаем, что происходит после этого, верно?

Если бы я был на вашем месте, я бы сделал одно из следующих действий:

  1. Используйте официальный клиент RabbitMq и используйте форму сообщений там (это не так сложно!)
  2. Может быть, взгляните на RawRabbit, тонкий слой над RabbitMq, в который я вносил свой вклад (с использованием стандартов vNext ). Он поддерживает только асинхронные подписи для получения сообщений, но не должно быть сложно написать синхронную реализацию _ 3_ (с использованием библиотеки синхронизации, например AsyncEx).
  3. Изменить моделирование бизнес-логики. Я не уверен, применимо ли это в вашем случае, но в целом, если критически важно, чтобы каждое сообщение обрабатывалось в правильном порядке, вы должны смоделировать его таким образом, чтобы метод потребления мог проверить, что это сообщение будет следующим. в линию. (кроме того, я не думаю, что EasyNetQ гарантирует последовательность сообщений, поэтому вы, вероятно, захотите проверять ее для каждой новой версии фреймворка).

Надеюсь это поможет!

person pardahlman    schedule 18.11.2015
comment
спасибо за внимание. Когда я сказал изменить сигнатуру метода, я имел в виду добавить метод в интерфейс, который бы явно указывал на то, что код выполняется синхронно (а затем реализует его в классе). Я получил ответ в группе Google, который работает (см. Мой ответ) - person Ralphie; 18.11.2015

Я получил ответ, который работает в группе EasyNetQ Google:

Для синхронного выполнения вы можете сделать это:

bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
    // do your synchronous work.....
    return Task.CompletedTask;
});

или добавьте расширение:

using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;

namespace ConsoleApplication4
{
    public static class RabbitAdvancedBusConsumeExtension
    {
       public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
    }

    public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
    }

    private static Task ExecuteSynchronously(Action action)
    {
        var tcs = new TaskCompletionSource<object>();
        try
        {
            action();
            tcs.SetResult(null);
        }
        catch (Exception e)
        {
            tcs.SetException(e);
        }
        return tcs.Task;
    }
}

class Program
{
    static void Main(string[] args)
    {
        var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));

        var queue = bus.Advanced.QueueDeclare();
        bus.Advanced.Consume(queue, (bytes, properties, info) =>
        {
            // .....
        });
    }
}
}

ОБНОВЛЕНИЕ. Эта функция была добавлена ​​в версии 0.52.0.410:

https://github.com/EasyNetQ/EasyNetQ/pull/505

person Ralphie    schedule 18.11.2015