Ошибка нераспознанного байта заголовка при попытке декодирования сообщения Avro в Spring Cloud Stream

Я пытаюсь написать тестовый пример для своего приложения Spring Cloud Stream. Я использую Confluent Schema Registry с Avro, поэтому мне нужно декодировать сообщение после опроса с канала. Вот мой код:

    processor.input()
        .send(MessageBuilder.withPayload(InputData).build());

    Message<?> message = messageCollector.forChannel(processor.output()).poll();

    BinaryMessageDecoder<OutputData> decoder = OutputData.getDecoder();
    OutputData outputObject = decoder.decode((byte[]) message.getPayload());

По какой-то причине этот код бросает

org.apache.avro.message.BadHeaderException: нераспознанные байты заголовка: 0x00 0x08

Я не уверен, что это какая-то ошибка, с которой я столкнулся, или я не использую правильный способ декодирования полученного сообщения avro. Я подозреваю, что мне нужно установить заголовок с чем-то, но я не совсем уверен, как и с чем именно. Буду признателен, если кто-нибудь поможет мне в этом вопросе.

P.S: Я использую spring-cloud-stream-test-support для этого теста.


person Ali    schedule 25.07.2019    source источник


Ответы (2)


При использовании тестового связывателя данные не будут кодироваться в avro.

Испытательное связующее очень ограничено.

Чтобы правильно протестировать сквозное соединение с помощью avro, вы должны удалить тестовую привязку и использовать настоящую привязку kafka со встроенным брокером kafka.

Один из примеров apps показывает, как это сделать.

person Gary Russell    schedule 25.07.2019
comment
Когда я взглянул на массив байтов, он выглядел как правильное сообщение Avro. Однако автоматически сгенерированный байтовый декодер не может его декодировать. Я подозреваю, что его нельзя использовать даже для этого. Я попробовал использовать официальный декодер Avro (например, DatumReader и DecoderFactory), и он работает. Вы знаете, почему он не работал с подходом, поднятым в вопросе? - person Ali; 26.07.2019

Оказывается, проблема была связана с тем, как я пытался расшифровать сообщение Avro. При использовании официальных библиотек Avro у меня сработал следующий код:

Decoder decoder = DecoderFactory.get().binaryDecoder((byte[]) message.getPayload(), null);
DatumReader<OutputData> reader = new SpecificDatumReader<>(OutputData.getClassSchema());

RawDataCapsule rawDataCapsule = reader.read(null , decoder);
person Ali    schedule 27.07.2019