Согласно документации здесь и здесь, многочисленные вопросы SO, такие как этот, насколько я понимаю, когда сообщение в очереди терпит неудачу заданное количество раз, в данном случае 5, оно автоматически перемещается из текущей очереди в опасную очередь.
К сожалению, мой ограниченный опыт показал, что это верно лишь частично, поскольку, когда задание не достигает максимального числа исключений из очереди, оно автоматически добавляется в опасную очередь, но не удаляется из исходной очереди, а затем повторно обрабатывается, по-видимому, неизменным, 10 минут. позже, добавив то же сообщение в очередь подозрений, создав дубликаты, но не удалив его.
Когда я реализовал свой собственный класс IQueueProcessorFactory
, создал собственный QueueProcessor
при перезаписи DeleteMessageAsync
, я смог подтвердить, что метод вызывается, когда исключение генерируется 5 раз, и метод завершается без исключений, но сообщение в очереди остается. Я также попытался удалить как обычные, так и опасные очереди.
Код, который я использую:
public class Program
{
private const string QUEUE_NAME = "some-queue";
// Please set the following connection strings in app.config for this WebJob to run:
// AzureWebJobsDashboard and AzureWebJobsStorage
static void Main()
{
var config = new JobHostConfiguration();
config.Queues.QueueProcessorFactory = new CustomFactory();
var host = new JobHost(config);
// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}
private class CustomFactory : IQueueProcessorFactory
{
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
return new CustomQueueProcessor(context);
}
private class CustomQueueProcessor : QueueProcessor
{
public CustomQueueProcessor(QueueProcessorFactoryContext context) : base(context)
{
}
protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.DeleteMessageAsync(message, cancellationToken);
}
}
}
public static void QueueTrigger([QueueTrigger(QUEUE_NAME)] CloudQueueMessage message)
{
Console.WriteLine($"Processing message: {message.AsString}");
throw new Exception("test exception");
}
}
Все работает, как ожидалось, за исключением того, что сообщение остается в исходной очереди. Я предполагаю и надеюсь, что ошибка на моем конце и или что это что-то глупое, что я просто упустил из виду, потому что я новичок в очереди, но после почти двух дней, потраченных на поиск информации в Интернете, я официально в растерянности, поскольку что делать или попробовать дальше.
Изменить
Хотя в конечном итоге мы перешли на служебную шину, стоит отметить, что мы придумали альтернативу, которая заключалась в том, чтобы частично управлять очередью самостоятельно из триггера очереди.
Это повлекло за собой проверку количества исключений из очереди, и если оно превышает максимальное количество исключений из очереди (повторных попыток), просто вернитесь. Это будет сигналом вызывающей стороне о том, что сообщение "успешно" обработано, после чего оно будет удалено из очереди. Такой подход приведет к почти ожидаемому поведению: сообщение будет добавлено в подозрительную очередь, а через 10 минут будет удалено из обычной очереди.
У него есть дополнительное преимущество, заключающееся в продолжении работы с будущими выпусками пакетов или обновлениями самих очередей, которые исправят исходную проблему, поскольку if просто никогда не будет правдой.
public class Program
{
private const int MAX_DEQUEUE_COUNT = 5;
static void Main()
{
var config = new JobHostConfiguration();
...
config.Queues.MaxDequeueCount = MAX_DEQUEUE_COUNT;
...
}
public static void QueueTrigger([QueueTrigger("some-queue")] CloudQueueMessage message)
{
if (message.DequeueCount > MAX_DEQUEUE_COUNT)
{
// prevents the message from indefinitely retrying every 10 minutes and ultimately creating duplicates within the poison queue.
return;
}
// do stuff
}