Использование KafkaAvroDeserializer с Alpakka

У меня есть SchemaRegistry и KafkaBroker, из которых я извлекаю данные с помощью Avro v1.8.1. Для десериализации я использовал Confluent KafkaAvroDeserializer. Теперь я хотел провести рефакторинг своего кода, чтобы использовать Elasticsearch API предоставляется Alpakka, но, к сожалению, это нарушает десериализацию, поскольку приводит к исключениям NullPointerExceptions:

Исключение в потоке «main» org.apache.kafka.common.errors.SerializationException: Ошибка десериализации ключа / значения для раздела тема-0 со смещением 0. При необходимости выполните поиск мимо записи, чтобы продолжить потребление. Вызвано: org.apache.kafka.common.errors.SerializationException: ошибка десериализации сообщения Avro для идентификатора 2 Вызвано: java.lang.NullPointerException в io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer. io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer.java:88) в io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize (KafkaAvroDeserializer.order. десериализовать (Deserializer.java:58) в org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord (Fetcher.java:1030) в org.apache.kafka.clients.consumer.internals.Fetcher.access $ 3300 (Fetcher .java: 110) в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.fetchRecords (Fetcher.java:1250) в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.access $ 1400 ( Fetcher.java:1099) на org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords (Fetcher.java:545) на org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords (Fetcher.java:506) на org.apache .kafka.clients.consumer.KafkaConsumer.pollForFetches (KafkaConsumer.java:1269) на org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1200) на orgmer.apache.consumer.kafka.kafka. .poll (KafkaConsumer.java:1176) по адресу de.adesso.fds.connectors.dpa.news.NewsConsumer.main (MyConsumer.java:58)

Я использовал ConsumerSettings API от Alpakka, как описано в этом пример:

val system = ActorSystem.create();

// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());

val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));

Эти настройки приводят к исключениям NullPointerExceptions, в то время как эти обычные реквизиты Kafka Consumer работают нормально:

val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);

В рабочем примере значения ConsumerRecords успешно десериализованы в классы, созданные AvroMavenPlugin из схемы.

Любые подсказки приветствуются!


person styps    schedule 31.05.2019    source источник


Ответы (1)


Я думаю, вам нужно перетащить new KafkaAvroDeserializer() в его собственную переменную, а затем вызвать метод .configure() в этом экземпляре, чтобы передать ненулевой URL-адрес реестра.

Затем передайте настроенный экземпляр ConsumerSettings.create

FWIW, в зависимости от ваших потребностей, Kafka Connect отлично работает для загрузки Elasticsearch

person OneCricketeer    schedule 04.06.2019
comment
Кажется, вы правы: doc.akka.io/docs/alpakka- kafka / current / serialization.html - person styps; 04.06.2019
comment
Причина, по которой я предпочел бы Alpakka Kafka Connect, заключается в том, чтобы просто избежать использования конфлюэнтного концентратора. - person styps; 04.06.2019
comment
Мой плохой, не знал, что у конфлюэнта есть репозиторий maven (почему они не публикуются на центральном сервере, остается загадкой): packages.confluent.io/maven - person styps; 04.06.2019
comment
Потому что у них есть внутренние частные пакеты, которые помещаются в одно и то же репо, над которым у них есть полный контроль. Google также поддерживает отдельный репозиторий Maven для разработки под Android ... Тем не менее, я не видел, чтобы так много людей жаловалось на это. - person OneCricketeer; 04.06.2019
comment
И вам не нужно использовать Confluent Hub. Запуск образов Connect Docker из Confluent - одно из решений, и коннектор Elasticsearch уже включен в него ... Кроме того, я лично считаю Confluent Hub более простым, чем добавление коннекторов Kafka вручную. - person OneCricketeer; 04.06.2019