Обработка ошибок в SpringXD

Я настроил Redis как MessageBus для своей установки spring-xd. Когда мой поток терпит неудачу, данные помещаются в очереди ошибок. Я пытаюсь прочитать их и вернуть обратно в очереди назначения. Но я не вижу, чтобы мои модули Sink получали данные. Может кто-нибудь помочь мне понять, где я ошибаюсь.

Фрагмент кода.

public RedisTemplate<String, byte[]> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  final RedisTemplate<String, byte[]> template = new RedisTemplate<String, byte[]>();
  template.setConnectionFactory(redisConnectionFactory);
  template.setKeySerializer(new StringRedisSerializer());
  template.setEnableDefaultSerializer(false);
  return template;
}

List<String> listOfKeys = new ArrayList<>();
Set<byte[]> keys = redisTemplate.getConnectionFactory().getConnection().keys("ERRORS*".getBytes());
for (byte[] data : keys) {
  listOfKeys.add(new String(data, 0, data.length));
}
for (String errorQueue : listOfKeys) {
    String destinationQueue = errorQueue.replace("ERRORS:", EMPTY_STRING);
    Long size = redisTemplate.opsForList().size(errorQueue);
    for (int i = 0; i < size; i++) {
        byte[] errorEvt = redisTemplate.opsForList().rightPop(errorQueue);
        redisTemplate.opsForList().leftPush(destinationQueue, errorEvt);
    }
}

person Jay    schedule 19.05.2016    source источник


Ответы (1)


Беглый взгляд на ваш код говорит, что он должен работать нормально; Я предлагаю вам запустить redis-cli, а затем ввести monitor, чтобы посмотреть трафик Redis.

ИЗМЕНИТЬ

Чтобы ответить на ваш комментарий ниже - это зависит от вашего типа контента. Если это простой текст, это относительно легко:

"\xff\x01\x0bcontentType\x00\x00\x00\x0c\"text/plain\"2016-05-20 10:11:07"

Первая часть — это заголовки, вы можете расшифровать их с помощью XD ​​EmbeddedHeadersMessageConverter.

Если ваша полезная нагрузка представляет собой объект Java, она сериализуется шиной с использованием kryo.

person Gary Russell    schedule 19.05.2016
comment
Спасибо @Gary. Я нашел проблему. В моей очереди назначения отсутствует префиксная очередь. Она должна быть targetQueue = queue. + errorQueue.replace(ОШИБКИ:, EMPTY_STRING); Не могли бы вы помочь мне, как преобразовать errorEvt (byte[] errorEvt = redisTemplate.opsForList().rightPop(errorQueue);) в мою реальную полезную нагрузку, я хотел бы зарегистрировать сообщение для целей тестирования. - person Jay; 20.05.2016