Сообщение BrokeredMessage удаляется после доступа из другого потока

Это может быть дубликат этого вопроса, но это сбивает с толку разговоры о пакетной базе данных обновляется и до сих пор не получил надлежащего ответа.

В простом примере с использованием очередей служебной шины Azure я не могу получить доступ к BrokeredMessage после того, как оно было помещено в очередь; он всегда удаляется, если я читаю очередь из другого потока.

Образец кода:

class Program {
    private static string _serviceBusConnectionString = "XXX";

    private static BlockingCollection<BrokeredMessage> _incomingMessages = new BlockingCollection<BrokeredMessage>();
    private static CancellationTokenSource _cancelToken = new CancellationTokenSource();

    private static QueueClient _client;

    static void Main(string[] args) {

        // Set up a few listeners on different threads
        Task.Run(async () => {
            while (!_cancelToken.IsCancellationRequested) {
                var msg = _incomingMessages.Take(_cancelToken.Token);
                if (msg != null) {
                    try {
                        await msg.CompleteAsync();
                        Console.WriteLine($"Completed Message Id: {msg.MessageId}");
                    } catch (ObjectDisposedException) {
                        Console.WriteLine("Message was disposed!?");
                    }
                }
            }
        });


        // Now set up our service bus reader
        _client = GetQueueClient("test");

        _client.OnMessageAsync(async (message) => {
            await Task.Run(() => _incomingMessages.Add(message));
        },
        new OnMessageOptions() {
            AutoComplete = false
        });

        // Now start sending
        Task.Run(async () => {
            int sent = 0;
            while (!_cancelToken.IsCancellationRequested) {
                var msg = new BrokeredMessage();
                await _client.SendAsync(msg);
                Console.WriteLine($"Sent {++sent}");
                await Task.Delay(1000);
            }
        });

        Console.ReadKey();
        _cancelToken.Cancel();

    }

    private static QueueClient GetQueueClient(string queueName) {

        var namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceBusConnectionString);
        if (!namespaceManager.QueueExists(queueName)) {
            var settings = new QueueDescription(queueName);
            settings.MaxDeliveryCount = 10;
            settings.LockDuration = TimeSpan.FromSeconds(5);
            settings.EnableExpress = true;
            settings.EnablePartitioning = true;
            namespaceManager.CreateQueue(settings);
        }

        var factory = MessagingFactory.CreateFromConnectionString(_serviceBusConnectionString);
        factory.RetryPolicy = new RetryExponential(minBackoff: TimeSpan.FromSeconds(0.1), maxBackoff: TimeSpan.FromSeconds(30), maxRetryCount: 100);
        var queueClient = factory.CreateQueueClient(queueName);

        return queueClient;
    }
}

Я пробовал поиграться с настройками, но не могу заставить это работать. Любые идеи?


person Jono Rogers    schedule 22.11.2016    source источник


Ответы (1)


Отвечая на свой вопрос с ответом от Serkant Karaca @ Microsoft здесь:

Очень простое правило, и я не уверен, что это задокументировано. Полученное сообщение необходимо обработать в течение времени жизни функции обратного вызова. В вашем случае сообщения будут удаляться после завершения асинхронного обратного вызова, поэтому ваши полные попытки терпят неудачу с ObjectDisposedException в другом потоке.

Я действительно не понимаю, как постановка сообщений в очередь для дальнейшей обработки влияет на пропускную способность. Это наверняка добавит нагрузке клиенту. Попробуйте обработать сообщение в асинхронном обратном вызове, это должно быть достаточно производительным.

Баггер.

person Jono Rogers    schedule 23.11.2016