Триггерная очередь Azure не удаляется

Согласно документации здесь и здесь, многочисленные вопросы SO, такие как этот, насколько я понимаю, когда сообщение в очереди терпит неудачу заданное количество раз, в данном случае 5, оно автоматически перемещается из текущей очереди в опасную очередь.

К сожалению, мой ограниченный опыт показал, что это верно лишь частично, поскольку, когда задание не достигает максимального числа исключений из очереди, оно автоматически добавляется в опасную очередь, но не удаляется из исходной очереди, а затем повторно обрабатывается, по-видимому, неизменным, 10 минут. позже, добавив то же сообщение в очередь подозрений, создав дубликаты, но не удалив его.

Когда я реализовал свой собственный класс IQueueProcessorFactory, создал собственный QueueProcessor при перезаписи DeleteMessageAsync, я смог подтвердить, что метод вызывается, когда исключение генерируется 5 раз, и метод завершается без исключений, но сообщение в очереди остается. Я также попытался удалить как обычные, так и опасные очереди.

Код, который я использую:

public class Program
{
    private const string QUEUE_NAME = "some-queue";

    // Please set the following connection strings in app.config for this WebJob to run:
    // AzureWebJobsDashboard and AzureWebJobsStorage
    static void Main()
    {
        var config = new JobHostConfiguration();

        config.Queues.QueueProcessorFactory = new CustomFactory();
        var host = new JobHost(config);
        // The following code ensures that the WebJob will be running continuously
        host.RunAndBlock();
    }

    private class CustomFactory : IQueueProcessorFactory
    {
        public QueueProcessor Create(QueueProcessorFactoryContext context)
        {
            return new CustomQueueProcessor(context);
        }

        private class CustomQueueProcessor : QueueProcessor
        {
            public CustomQueueProcessor(QueueProcessorFactoryContext context) : base(context)
            {

            }

            protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
            {
                return base.DeleteMessageAsync(message, cancellationToken);
            }
        }
    }

    public static void QueueTrigger([QueueTrigger(QUEUE_NAME)] CloudQueueMessage message)
    {
        Console.WriteLine($"Processing message: {message.AsString}");
        throw new Exception("test exception");
    }
}

Все работает, как ожидалось, за исключением того, что сообщение остается в исходной очереди. Я предполагаю и надеюсь, что ошибка на моем конце и или что это что-то глупое, что я просто упустил из виду, потому что я новичок в очереди, но после почти двух дней, потраченных на поиск информации в Интернете, я официально в растерянности, поскольку что делать или попробовать дальше.

Изменить

Хотя в конечном итоге мы перешли на служебную шину, стоит отметить, что мы придумали альтернативу, которая заключалась в том, чтобы частично управлять очередью самостоятельно из триггера очереди.

Это повлекло за собой проверку количества исключений из очереди, и если оно превышает максимальное количество исключений из очереди (повторных попыток), просто вернитесь. Это будет сигналом вызывающей стороне о том, что сообщение "успешно" обработано, после чего оно будет удалено из очереди. Такой подход приведет к почти ожидаемому поведению: сообщение будет добавлено в подозрительную очередь, а через 10 минут будет удалено из обычной очереди.

У него есть дополнительное преимущество, заключающееся в продолжении работы с будущими выпусками пакетов или обновлениями самих очередей, которые исправят исходную проблему, поскольку if просто никогда не будет правдой.

public class Program
{
    private const int MAX_DEQUEUE_COUNT = 5;

    static void Main()
    {
        var config = new JobHostConfiguration();
        ...
        config.Queues.MaxDequeueCount = MAX_DEQUEUE_COUNT;
        ...
    }

    public static void QueueTrigger([QueueTrigger("some-queue")] CloudQueueMessage message)
    {
        if (message.DequeueCount > MAX_DEQUEUE_COUNT)
        {
            // prevents the message from indefinitely retrying every 10 minutes and ultimately creating duplicates within the poison queue.
            return;
        }

        // do stuff
    }

person Dan    schedule 23.02.2017    source источник


Ответы (2)


Я собираюсь дать вам ответ без ссылок, основанный на личном опыте и на том, что я видел в StackOverflow. Вы не первый человек, у которого возникают проблемы с автоматическим отправлением сообщений и соблюдением максимального числа исключений из очереди в отношении атрибутов WebJob QueueTriggerAttributes. Я рекомендую отказаться от ненадежности Storage Queues + QueueTriggers в пользу использования очередей служебной шины и триггеров служебной шины.

Как технология обмена сообщениями очереди служебной шины гораздо более полнофункциональны и сопоставимы по стоимости. Единственная реальная причина, по которой я предпочел бы использовать очереди хранилища вместо очередей служебной шины, - это необходимость хранить более 80 ГБ сообщений, что является пределом очереди служебной шины с секционированием.

person Rob Reagan    schedule 24.02.2017

Я столкнулся с таким же поведением, и это было вызвано ошибкой между sdk webjobs (v2) и клиентской библиотекой хранилища v8.x.

Это должно быть исправлено, поскольку 2.1.0-beta1 -10851. Обратной стороной является то, что в настоящее время нет стабильной версии 2.1.0.

person fgbaezp    schedule 12.01.2018