У меня есть SchemaRegistry и KafkaBroker, из которых я извлекаю данные с помощью Avro v1.8.1. Для десериализации я использовал Confluent KafkaAvroDeserializer. Теперь я хотел провести рефакторинг своего кода, чтобы использовать Elasticsearch API предоставляется Alpakka, но, к сожалению, это нарушает десериализацию, поскольку приводит к исключениям NullPointerExceptions:
Исключение в потоке «main» org.apache.kafka.common.errors.SerializationException: Ошибка десериализации ключа / значения для раздела тема-0 со смещением 0. При необходимости выполните поиск мимо записи, чтобы продолжить потребление. Вызвано: org.apache.kafka.common.errors.SerializationException: ошибка десериализации сообщения Avro для идентификатора 2 Вызвано: java.lang.NullPointerException в io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer. io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer.java:88) в io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize (KafkaAvroDeserializer.order. десериализовать (Deserializer.java:58) в org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord (Fetcher.java:1030) в org.apache.kafka.clients.consumer.internals.Fetcher.access $ 3300 (Fetcher .java: 110) в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.fetchRecords (Fetcher.java:1250) в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.access $ 1400 ( Fetcher.java:1099) на org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords (Fetcher.java:545) на org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords (Fetcher.java:506) на org.apache .kafka.clients.consumer.KafkaConsumer.pollForFetches (KafkaConsumer.java:1269) на org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1200) на orgmer.apache.consumer.kafka.kafka. .poll (KafkaConsumer.java:1176) по адресу de.adesso.fds.connectors.dpa.news.NewsConsumer.main (MyConsumer.java:58)
Я использовал ConsumerSettings API от Alpakka, как описано в этом пример:
val system = ActorSystem.create();
// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
.withBootstrapServers(kafkaBootstrapServerUrl)
.withClientId(InetAddress.getLocalHost().getHostName())
.withGroupId("" + new Random().nextInt())
.withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
.withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withStopTimeout(Duration.ofSeconds(5));
Эти настройки приводят к исключениям NullPointerExceptions, в то время как эти обычные реквизиты Kafka Consumer работают нормально:
val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);
В рабочем примере значения ConsumerRecords успешно десериализованы в классы, созданные AvroMavenPlugin из схемы.
Любые подсказки приветствуются!