Я использую spring amqp в своем проекте, и я использую реализации ChannelAwareMessageListener для повторной отправки и обработки исключения, чтобы сделать слушателя кролика более стабильным:
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {
@Autowired
private Jackson2JsonMessageConverter messageConverter;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/** where comsumer really do biz */
public abstract void receiveMessage(Message message, MessageConverter messageConverter);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
MessageProperties messageProperties = message.getMessageProperties();
Long deliveryTag = messageProperties.getDeliveryTag();
Long consumerCount = redisTemplate.opsForHash().increment(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
messageProperties.getMessageId(), 1);
try {
receiveMessage(message, messageConverter);
channel.basicAck(deliveryTag, false);
redisTemplate.opsForHash().delete(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
messageProperties.getMessageId());
} catch (Exception e) {
if (consumerCount >= MQConstants.MAX_CONSUMER_COUNT) {
channel.basicReject(deliveryTag, false);
} else {
Thread.sleep((long) (Math.pow(MQConstants.BASE_NUM, consumerCount)*1000));
channel.basicNack(deliveryTag, false, true);
}
}
}
тогда мы можем получить, расширив наш AbstractMessageListener следующим образом:
public class BizMessageListener extends AbstractMessageListener {
Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void receiveMessage(Message message, MessageConverter messageConverter) {
//do our own biz
}
}
но однажды мой босс сказал, что это слишком. Вторжение, вы вместо этого используете аннотацию ,, поэтому я нашел что-то вроде этого: Spring RabbitMQ - использование ручного подтверждения канала в службе с конфигурацией @RabbitListener
где я могу использовать аннотацию как
@RabbitListener(queues = "so38728668")
public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
но как я могу инкапсулировать @RabbitListener на высоком уровне, чтобы объединить мой собственный код повторной отправки сообщения в моем первом образце кода, например, есть аннотация как RabbitResenderListener
@RabbitResenderListener(queues = "so38728668")
public void receive(Message msg)
throws IOException {
// just do biz
}
эта аннотация дает методу возможность повторной отправки сообщения и возможности обработки ошибок, так что метод выполняет только biz. спасибо