Реализация неблокирующей повторной попытки с откатом с помощью spring-amqp и rabbitmq

Я ищу хороший способ реализовать повторные попытки с политикой отсрочки, используя spring amqp и Rabbit MQ, но требование состоит в том, чтобы прослушиватель не был заблокирован (чтобы можно было обрабатывать другие сообщения). Я вижу здесь аналогичный вопрос / ответ, но он не включает решение для «отступления»:

RabbitMQ и Spring amqp повторяют без блокировки потребителей

У меня есть следующие вопросы:

  1. Блокирует ли реализация spring-retry по умолчанию потоки при повторной попытке? В в github указывает, что это так.

  2. Если вышеприведенное предположение верно, это единственный способ сделать это - реализовать отдельную очередь для повторных попыток (DLQ?) И установить TTL для каждого сообщения (при условии, что мы не хотим блокировать потоки на интервал отсрочки).

  3. Если мы воспользуемся подходом, описанным выше (DLQ или отдельная очередь), не понадобятся ли нам отдельные очереди для каждой попытки повтора? Если мы используем только 1 очередь для повторных попыток, та же очередь будет содержать сообщения с TTL в диапазоне от минимального интервала повтора до максимального интервала повтора, и если сообщение в начале очереди имеет максимальный TTL, сообщение за ним не будет поднят, даже если у него минимальный TTL. Это указано в документации Rabbit MQ TTL, здесь (см. Предостережения):

  4. Есть ли другой способ реализовать неблокирующий механизм Backoff Retry?

Добавление информации о конфигурации для устранения неполадок @garyrussel:

Конфигурация очереди:

    <rabbit:queue name="regular_requests_queue"/>
    <rabbit:queue name="retry_requests_queue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="regular_exchange" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:direct-exchange name="regular_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="regular_requests_queue" key="regular-request-key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:direct-exchange name="retry_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="retry_requests_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="retryRecoverer" class="com.testretry.RetryRecoverer">
         <constructor-arg ref="retryTemplate"/>
         <constructor-arg value="retry_exchange"/>
    </bean>

    <rabbit:template id="templateWithOneRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>
    <rabbit:template id="retryTemplate" connection-factory="connectionFactory" exchange="retry_exchange"/>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="1"/>
            </bean>
        </property>
    </bean>

person alam86    schedule 16.09.2015    source источник


Ответы (3)


  1. да
  2. через 4 ...

Вы можете использовать максимальное количество повторных попыток = 1 с подклассом RepublishMessageRecoverer и реализовать additionalHeaders для добавления, скажем, заголовка счетчика повторных попыток.

Затем вы можете повторно публиковать в другой очереди для каждой попытки.

Средство восстановления на самом деле не структурировано для публикации в разные очереди (мы должны это изменить), поэтому вам может потребоваться написать собственное средство восстановления и делегировать его одному из нескольких RepublishMessageRecoverer.

Рассмотрите возможность внести свое решение в структуру.

person Gary Russell    schedule 18.09.2015
comment
Спасибо за ответ @gary! Я шел по тому же пути, но использовал собственный RejectAndDontRequeueRecoverer, который затем выбрасывал AmqpRejectAndDontRequeueException. Это должно было поместить сообщение в настроенный DLX, верно? Однако это не сработало. Я также пробовал использовать настраиваемый RepublishMessageRecoverer, который является лучшим вариантом, но я не вижу, чтобы сообщение «переиздавалось» на «retry_exchange». Я вижу сообщение в файле журнала: «Не удалось повторно опубликовать сообщение для обмена retry_exchange». - person alam86; 19.09.2015
comment
К вашему сведению, я также установил TTL (срок действия) на «60000» (60 с) для сообщения перед вызовом super.recover () в настраиваемом RepublishMessageRecoverer. Я вижу, что используется пользовательский RetryRecoverer (путем установки точек останова), но не вижу, чтобы сообщение было опубликовано в retry_requests_queue. См. Конфигурацию в исходном сообщении выше. - person alam86; 19.09.2015
comment
Я не вижу конфигурации для subscription_exchange. Разве вы не должны идти на retry_exchange? - person Gary Russell; 19.09.2015
comment
Извините, это должно было быть regular_exchange. Идея состоит в том, что после того, как мы установили TTL для сообщения, и оно не будет получено никаким слушателем из retry_requests_queue, после TTL оно должно быть пропущено через буквенное обозначение для 'regular_exchange' (и попытаться повторить попытку). Разве не так мы реализуем отсрочку? - person alam86; 19.09.2015
comment
Было непонятно, какую из ваших ситуаций представляет эта конфигурация; первый случай, когда RMQ не маршрутизировал DLX, как ожидалось, или когда вы повторно публиковали и сообщение не было доставлено в очередь. К тому же неправильное название биржи еще больше запутало ситуацию. Что вы используете в качестве ключа маршрутизации для повторной публикации? Ваша конфигурация ожидает использования имени очереди; по умолчанию средство восстановления использует error.<originalRK>. - person Gary Russell; 19.09.2015
comment
Сейчас я пытаюсь реализовать сценарий «Повторная публикация». Извините за путаницу с именами Exchange! Для ключа маршрутизации я оставил значение по умолчанию, поэтому это будет «error. ‹Old_key›». Не могли бы вы пояснить, что вы подразумеваете под «конфигурацией ожидает использования имени очереди»? В инициализации Republish я указал retry_exchange и отдельный retry_template. Исходное сообщение теперь также имеет конфигурацию шаблона. - person alam86; 21.09.2015
comment
У вас нет привязки к этому ключу. У вас есть <rabbit:binding queue="retry_requests_queue"/>. Вам понадобится `‹ rabbit: binding queue = retry_requests_queue key = error.regular-request-key / › - person Gary Russell; 21.09.2015
comment
Спасибо, Гэри! На самом деле я тоже пробовал с привязкой, но у меня все не получалось, потому что я не понимал, что мне нужно удалить / воссоздать очереди. Как только я удалил очереди со старой конфигурацией (и они были воссозданы с новой конфигурацией), сообщение было перенаправлено в очередь повторных попыток. Однако с этим решением может быть проблема - опишу это в следующем посте (слишком длинном для этого) - person alam86; 22.09.2015
comment
Вот проблема, о которой я упоминал в своем предыдущем посте: если ключ маршрутизации изменяется на 'error. ‹Original_key›' во время повторной публикации, когда срок действия сообщения истекает в retry_requests_queue и является мертвым буквенным обозначением исходному обмену (regular_exchange), ключ будет 'error. ‹original_key›', если я не укажу отдельный ключ для использования для сообщений dead_lettered, и не будет перенаправлен в исходную очередь. У меня может быть более одной очереди (с другим ключом) в regular_exchange, запросы которой я хочу повторить, используя этот механизм (переход к той же retry_requests_queue). - person alam86; 22.09.2015
comment
Итак, почему бы просто не установить errorRoutingKeyPrefix на "" (вместо значения по умолчанию "error."), и тогда исходный ключ маршрутизации будет использоваться повсюду. В документации javadoc явно указано Use an empty string ("") for no prefixing. - person Gary Russell; 22.09.2015
comment
Спасибо, что указали на это - я пропустил это в javadoc! Я постараюсь заставить работать полное решение, а затем выложу его здесь. Спасибо за твою помощь! - person alam86; 22.09.2015

Вы смотрели на плагин rabbitmq delayer, который задерживает сообщения при обмене вместо очереди? Согласно документации, сообщения, отправленные на обмен с задержкой, кажутся постоянными на уровне обмена.

Используя настраиваемый заголовок сообщения счетчика повторов и обмен с задержкой, мы можем добиться неблокирующего поведения без уродства этой комбинации промежуточной очереди, dlx и шаблона.

https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

person Ashok Koyi    schedule 28.05.2016
comment
Это действительно похоже на жизнеспособный вариант @kalinga. Пока не пробовал - у нас версия 3.3.5. Также сейчас это звучит как «экспериментальная» реализация. Когда он станет более зрелым, это, вероятно, будет предпочтительным вариантом для этого. Приведенное выше решение действительно работает для нас с имеющейся у нас версией rabbitmq. - person alam86; 16.08.2016

Вот окончательное решение, которое я реализовал. На «интервал повтора» приходится 1 очередь, на каждую очередь повтора - 1 обмен. Все они передаются настраиваемому RepublishRecoverer, который создает список восстановителей.

К сообщению добавляется настраиваемый заголовок RetryCount, и в зависимости от значения RetryCount сообщение публикуется в правильном обмене / очереди с другим «сроком действия». Каждая очередь повторных попыток настраивается с помощью DLX, для которого установлено значение «regular_exchange» (т.е. запросы поступают в обычную очередь).

<rabbit:template id="genericTemplateWithRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>

<!-- Create as many templates as retryAttempts (1st arg) in customRetryTemplate-->
<rabbit:template id="genericRetryTemplate1" connection-factory="consumerConnFactory" exchange="retry_exchange_1"/>
<rabbit:template id="genericRetryTemplate2" connection-factory="consumerConnFactory" exchange="retry_exchange_2"/>
<rabbit:template id="genericRetryTemplate3" connection-factory="consumerConnFactory" exchange="retry_exchange_3"/>
<rabbit:template id="genericRetryTemplate4" connection-factory="consumerConnFactory" exchange="retry_exchange_4"/>
<rabbit:template id="genericRetryTemplate5" connection-factory="consumerConnFactory" exchange="retry_exchange_5"/>

<rabbit:queue name="regular_requests_queue"/>

<!-- Create as many queues as retryAttempts (1st arg) in customRetryTemplate -->
<rabbit:queue name="retry_requests_queue_1">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_2">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_3">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_4">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_5">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:direct-exchange name="regular_exchange">
    <rabbit:bindings>
        <rabbit:binding queue="regular_requests_queue" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- Create as many exchanges as retryAttempts (1st arg) in customRetryTemplate -->
<rabbit:direct-exchange name="retry_exchange_1">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_1" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_2">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_2" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_3">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_3" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_4">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_4" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_5">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_5" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>


<!-- retry config begin -->
<!-- Pass in all templates and exchanges created as list/array arguments below -->
<bean id="customRetryRecoverer" class="com.test.listeners.CustomRetryRecoverer">
    <!-- Pass in list of templates -->
     <constructor-arg>
        <list>
            <ref bean="genericRetryTemplate1"/>
            <ref bean="genericRetryTemplate2"/>
            <ref bean="genericRetryTemplate3"/>
            <ref bean="genericRetryTemplate4"/>
            <ref bean="genericRetryTemplate5"/>
        </list>
     </constructor-arg>
     <!-- Pass in array of exchanges -->
     <constructor-arg value="retry_exchange_1,retry_exchange_2,retry_exchange_3,retry_exchange_4,retry_exchange_5"/>
     <constructor-arg ref="customRetryTemplate"/>
</bean>

<bean id="retryInterceptor"
      class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="customRetryRecoverer"/>
    <property name="retryOperations" ref="retryTemplate"/>
    <property name="messageKeyGenerator" ref="msgKeyGenerator"/>
</bean>
    
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <!--  Set to 1 - just for the initial attempt -->
            <property name="maxAttempts" value="1"/>
        </bean>
    </property>
</bean>

 <bean id="customRetryTemplate" class="com.test.retry.CustomRetryTemplate">
    <constructor-arg value="5"/> <!-- max attempts -->
    <constructor-arg value="3000"/> <!-- Initial interval -->
    <constructor-arg value="5"/> <!-- multiplier for backoff -->
</bean>

<!-- retry config end -->

Вот код CustomRetryRecoverer:

public class CustomRetryRecoverer extends
        RepublishMessageRecoverer {

    private static final String RETRY_COUNT_HEADER_NAME = "RetryCount";
    private List<RepublishMessageRecoverer> retryExecutors = new ArrayList<RepublishMessageRecoverer>();
    private TriggersRetryTemplate retryTemplate;
    
    public TriggersRetryRecoverer(AmqpTemplate[] retryTemplates, String[] exchangeNames, TriggersRetryTemplate retryTemplate) {
        super(retryTemplates[0], exchangeNames[0]);
        this.retryTemplate = retryTemplate;

        //Get lower of the two array sizes
        int executorCount = (exchangeNames.length < retryTemplates.length) ? exchangeNames.length : retryTemplates.length;
        for(int i=0; i<executorCount; i++) {
            createRetryExecutor(retryTemplates[i], exchangeNames[i]);
        }
        //If not enough exchanges/templates provided, reuse the last exchange/template for the remaining retry recoverers
        if(retryTemplate.getMaxRetryCount() > executorCount) {
            for(int i=executorCount; i<retryTemplate.getMaxRetryCount(); i++) {
                createRetryExecutor(retryTemplates[executorCount-1], exchangeNames[executorCount-1]);
            }
        }
    }

    @Override
    public void recover(Message message, Throwable cause) {
        
        if(getRetryCount(message) < retryTemplate.getMaxRetryCount()) {
            incrementRetryCount(message);
            
            //Set the expiration of the retry message
            message.getMessageProperties().setExpiration(String.valueOf(retryTemplate.getNextRetryInterval(getRetryCount(message)).longValue()));
            
            RepublishMessageRecoverer retryRecoverer = null;
            if(getRetryCount(message) != null && getRetryCount(message) > 0) {
                retryRecoverer = retryExecutors.get(getRetryCount(message)-1);
            } else {
                retryRecoverer = retryExecutors.get(0);
            }
            retryRecoverer.recover(message, cause);
        } else {
            //Retries exchausted - do nothing
        }
    }

    private void createRetryExecutor(AmqpTemplate template, String exchangeName) {
        RepublishMessageRecoverer retryRecoverer = new RepublishMessageRecoverer(template, exchangeName);
        retryRecoverer.errorRoutingKeyPrefix(""); //Set KeyPrefix to "" so original key is reused during retries
        retryExecutors.add(retryRecoverer);
    }   

    private Integer getRetryCount(Message msg) {
        Integer retryCount;
        if(msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME) == null) {
            retryCount = 1;
        } else {
            retryCount =  (Integer) msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME);
        }
        
        return retryCount;
    }

    private void incrementRetryCount(Message msg) {
        Integer retryCount;
        if(msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME) == null) {
            retryCount = 1;
        } else {
            retryCount =  (Integer) msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME)+1;
        }
        msg.getMessageProperties().getHeaders().put(RETRY_COUNT_HEADER_NAME, retryCount);
    }

}

Код для CustomRetryTemplate здесь не размещен, но он содержит простые переменные для maxRetryCount, initialInterval и multiplier.

person alam86    schedule 20.10.2015