У меня есть WebJob со следующим обработчиком ServiceBus, использующим SDK WebJobs:
[Singleton("{MessageId}")]
public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
using (var scope = Program.Container.BeginLifetimeScope())
{
var handler = scope.Resolve<MessageHandlers>();
logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));
// To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);
foreach (var outputMessage in outputMessages)
{
queue.Add(outputMessage);
}
}
}
Если предварительные условия для обработчика не выполнены, outputMessages
содержит BrokeredMessage
с теми же MessageId
, Label
и полезной нагрузкой, что и тот, который мы обрабатываем в настоящее время, но содержит ScheduledEnqueueTimeUtc
в будущем.
Идея состоит в том, что мы быстро завершаем обработку текущего сообщения и ждем повторной попытки, планируя новое сообщение в будущем.
Иногда, особенно когда сообщений в очереди больше, чем блокировок просмотра SDK, я вижу дублирование сообщений в очереди ServiceBus. У них одинаковые MessageId
, Label
и полезная нагрузка, но разные SequenceNumber
, EnqueuedTimeUtc
и ScheduledEnqueueTimeUtc
. Все они имеют количество доставок 1.
Глядя на мой код обработчика, единственный способ, которым это может произойти, - это если я получил одно и то же сообщение несколько раз, понял, что мне нужно подождать, и создать новое сообщение для обработки в будущем. Обработчик завершается успешно, поэтому исходное сообщение завершается.
Начальные сообщения уникальны. Также я помещаю SingletonAttribute
в обработчик сообщений, чтобы сообщения для одного и того же MessageId
не могли использоваться разными обработчиками.
Почему несколько обработчиков запускаются с одним и тем же сообщением и как этого избежать?
Я использую версию Microsoft.Azure.WebJobs
v2.1.0.
Продолжительность моих обработчиков составляет максимум 17 с, а в среднем 1 с. Длительность блокировки 1м. Тем не менее, моя лучшая теория заключается в том, что что-то с (повторной) блокировкой сообщения не работает, поэтому, пока я обрабатываю обработчик, блокировка теряется, сообщение возвращается в очередь и потребляется в другой раз. Если оба обработчика увидят, что критический ресурс все еще занят, они оба поставят в очередь новое сообщение.