Как избежать многократного получения сообщений из очереди ServcieBus при использовании пакета SDK для веб-заданий

У меня есть WebJob со следующим обработчиком ServiceBus, использующим SDK WebJobs:

[Singleton("{MessageId}")]
public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
    using (var scope = Program.Container.BeginLifetimeScope())
    {
        var handler = scope.Resolve<MessageHandlers>();
        logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));

        // To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
        var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);

        foreach (var outputMessage in outputMessages)
        {
            queue.Add(outputMessage);
        }
    }
}

Если предварительные условия для обработчика не выполнены, outputMessages содержит BrokeredMessage с теми же MessageId, Label и полезной нагрузкой, что и тот, который мы обрабатываем в настоящее время, но содержит ScheduledEnqueueTimeUtc в будущем.

Идея состоит в том, что мы быстро завершаем обработку текущего сообщения и ждем повторной попытки, планируя новое сообщение в будущем.

Иногда, особенно когда сообщений в очереди больше, чем блокировок просмотра SDK, я вижу дублирование сообщений в очереди ServiceBus. У них одинаковые MessageId, Label и полезная нагрузка, но разные SequenceNumber, EnqueuedTimeUtc и ScheduledEnqueueTimeUtc. Все они имеют количество доставок 1.

Глядя на мой код обработчика, единственный способ, которым это может произойти, - это если я получил одно и то же сообщение несколько раз, понял, что мне нужно подождать, и создать новое сообщение для обработки в будущем. Обработчик завершается успешно, поэтому исходное сообщение завершается.

Начальные сообщения уникальны. Также я помещаю SingletonAttribute в обработчик сообщений, чтобы сообщения для одного и того же MessageId не могли использоваться разными обработчиками.

Почему несколько обработчиков запускаются с одним и тем же сообщением и как этого избежать?

Я использую версию Microsoft.Azure.WebJobs v2.1.0.

Продолжительность моих обработчиков составляет максимум 17 с, а в среднем 1 с. Длительность блокировки 1м. Тем не менее, моя лучшая теория заключается в том, что что-то с (повторной) блокировкой сообщения не работает, поэтому, пока я обрабатываю обработчик, блокировка теряется, сообщение возвращается в очередь и потребляется в другой раз. Если оба обработчика увидят, что критический ресурс все еще занят, они оба поставят в очередь новое сообщение.


person D. Siemer    schedule 07.05.2018    source источник
comment
Что делает ваш обработчик сообщений? извините, но то, как вы это делаете, звучит странно....   -  person Thomas    schedule 07.05.2018
comment
У меня есть несколько разных обработчиков сообщений, работающих с этим шаблоном. Я не могу раскрыть, чем именно они занимаются. Как вы думаете, что именно звучит проводно?   -  person D. Siemer    schedule 07.05.2018
comment
Вы получаете сообщение, а затем ставите в очередь новое в будущем. Не очень понимаю, почему. не могли бы вы обрабатывать сообщения по мере их поступления?   -  person Thomas    schedule 08.05.2018
comment
Я ставлю сообщение в очередь на обработку в будущем, если предварительные условия обработчика не выполнены. Допустим, у меня есть два объекта в моей системе, которые должны быть завершены, прежде чем операция, которую я запускаю с сообщением, может начаться, мне нужно проверить состояние этих предварительных условий, и если они не завершены, мне нужно немного подождать, пока я можно начинать обработку.   -  person D. Siemer    schedule 08.05.2018
comment
О, хорошо, теперь это понятно :-) Может быть, вам следует обрабатывать только одно сообщение за раз?   -  person Thomas    schedule 08.05.2018
comment
Как вы можете видеть в приведенном выше коде, я установил SingletonAttribute для обработки одного сообщения для каждого идентификатора сообщения за раз. Проблема, которую я описал в последнем абзаце, возникнет даже тогда.   -  person D. Siemer    schedule 08.05.2018


Ответы (1)


Немного поэкспериментировав, я выяснил основную причину и нашел обходной путь.

Если сообщение ServiceBus завершено, но блокировка просмотра не отменена, оно вернется в очередь в активном состоянии после истечения срока действия блокировки.

ServiceBus QueueClient, по-видимому, снимает блокировку, как только получает следующее сообщение (или пакет сообщений).

Таким образом, если QueueClient, используемый SDK веб-заданий, неожиданно завершает работу (например, из-за завершения процесса или перезапуска веб-приложения), все заблокированные сообщения снова появляются в очереди, даже если они были завершены.

В моем обработчике я теперь завершаю сообщение вручную, а также отказываюсь от блокировки следующим образом:

public static async Task ProcessQueueMessageAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
    using (var scope = Program.Container.BeginLifetimeScope())
    {
        var handler = scope.Resolve<MessageHandlers>();
        logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));

        // To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
        var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);

        foreach (var outputMessage in outputMessages)
        {
            queue.Add(outputMessage);
        }

        await message.CompleteAsync().ConfigureAwait(false);
        await message.AbandonAsync().ConfigureAwait(false);
    }
}

Таким образом, я не возвращаю сообщения в очередь в сценарии перезагрузки.

person D. Siemer    schedule 14.05.2018