Я настроил Redis как MessageBus для своей установки spring-xd. Когда мой поток терпит неудачу, данные помещаются в очереди ошибок. Я пытаюсь прочитать их и вернуть обратно в очереди назначения. Но я не вижу, чтобы мои модули Sink получали данные. Может кто-нибудь помочь мне понять, где я ошибаюсь.
Фрагмент кода.
public RedisTemplate<String, byte[]> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
final RedisTemplate<String, byte[]> template = new RedisTemplate<String, byte[]>();
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setEnableDefaultSerializer(false);
return template;
}
List<String> listOfKeys = new ArrayList<>();
Set<byte[]> keys = redisTemplate.getConnectionFactory().getConnection().keys("ERRORS*".getBytes());
for (byte[] data : keys) {
listOfKeys.add(new String(data, 0, data.length));
}
for (String errorQueue : listOfKeys) {
String destinationQueue = errorQueue.replace("ERRORS:", EMPTY_STRING);
Long size = redisTemplate.opsForList().size(errorQueue);
for (int i = 0; i < size; i++) {
byte[] errorEvt = redisTemplate.opsForList().rightPop(errorQueue);
redisTemplate.opsForList().leftPush(destinationQueue, errorEvt);
}
}