Припаркуйте XML-сообщение в недопустимом формате в очередь на парковку AMQP

Учитывая, что у меня есть IntegrationFlow

IntegrationFlows.from(
        Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .defaultRequeueRejected(false)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(4)
                .channelTransacted(true)
                .errorHandler(new ConditionalRejectingErrorHandler())
)
        .log(INFO, AMQP_LOGGER_CATEGORY)
        .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(deathCheckHandler))
                .subscribe(f -> f.handle(service))
        )
        .get();

где deathCheckHandler

@Component
public class DeathCheckHandler {

    private static final Logger logger = LoggerFactory.getLogger(lookup().lookupClass());

    private static final int RETRY_COUNT = 3;
    private final RabbitTemplate rabbitTemplate;
    private final Jaxb2Marshaller xmlMarshaller;

    public DeathCheckHandler(RabbitTemplate rabbitTemplate, Jaxb2Marshaller xmlMarshaller) {
        this.rabbitTemplate = rabbitTemplate;
        this.xmlMarshaller = xmlMarshaller;
    }

    @ServiceActivator
    public void check(Message<?> message) {
        MessageHeaders headers = message.getHeaders();

        Optional<XDeath> rejected = findAnyRejectedXDeathMessageHeader(headers);
        if (rejected.isPresent()) {
            int rejectedCount = rejected.get().getCount();
            logger.debug("Rejected count is {}", rejectedCount);
            if (rejectedCount > RETRY_COUNT) {
                parkMessage(message);
            }
        }
    }

    private void parkMessage(Message<?> message) {
        Object payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        String parkingExchange = (String) headers.get("amqp_receivedExchange");
        String parkingRoutingKey = ((String) headers.get("amqp_consumerQueue")).replace("queue", "plq");
        rabbitTemplate.setMessageConverter(new MarshallingMessageConverter(xmlMarshaller));
        logger.warn("Tried more than {} times. Parking rejected message: {} to exchange {} and routing key {}", RETRY_COUNT, payload, parkingExchange, parkingRoutingKey);
        rabbitTemplate.convertAndSend(parkingExchange, parkingRoutingKey, payload);
        // cause the message to be acknowledged and not routed to DLQ
        throw new ImmediateAcknowledgeAmqpException("Give up retrying message: " + payload);
    }
}

DeathCheckHandler обрабатывает мертвые буквы, настроенные для очередей AMQP.

Как я могу запарковать XML-сообщение в неправильном формате, например, когда MarshallingMessageConverter выдает UnmarshallingFailureException.

Я хочу припарковать его так же, как в DeathCheckHandler#parkMessage

Вероятно, это должно быть возможно с ConditionalRejectingErrorHandler, но я не знаю как.


person Patrik Mihalčin    schedule 17.09.2020    source источник


Ответы (1)


Клонируйте ConditionalRejectingErrorHandler.

Используйте этот метод как шаблон ...

@Override
public void handleError(Throwable t) {
    log(t);
    if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
        if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException) {
            Message failed = ((ListenerExecutionFailedException) t).getFailedMessage();
            if (failed != null) {
                List<Map<String, ?>> xDeath = failed.getMessageProperties().getXDeathHeader();
                if (xDeath != null && xDeath.size() > 0) {
                    this.logger.error("x-death header detected on a message with a fatal exception; "
                            + "perhaps requeued from a DLQ? - discarding: " + failed);
                    throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present");
                }
            }
        }
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", this.rejectManual,
                t);
    }
}

По умолчанию фатальные исключения с заголовком x-death отбрасываются через ImmediateAcknowledgeAmqpException.

Нелегко создать подкласс и переопределить этот метод, потому что поля являются частными, поэтому было бы проще просто скопировать этот класс (и опубликовать на стоянке, прежде чем бросать IAAE).

Я внесу некоторые улучшения в этот класс, чтобы упростить настройку / переопределение.

Запрос на слияние.

person Gary Russell    schedule 17.09.2020