Невозможно использовать записи Kafka Avro с помощью Nifi и Schema Registry

Я пытаюсь использовать записи Avro из Kafka с помощью Nifi. У меня есть 3 темы, заполненные из заданий Amazon Lambda и 2 Spark Streaming, все из которых используют реестр схем HortonWorks для получения схемы Avro.

Я попытался использовать ConsumeKafkaRecord_0_10 и ConsumeKafkaRecord_2_0 и получил ту же ошибку:  Ошибка записи Kafka

Я попытался использовать AvroReader, используя внутри текстовую схему, чтобы убедиться, что она используется, и получил ту же ошибку. Когда я использую AvroReader с параметром реестра схемы Horton, я получаю следующую ошибку: Ошибка схемы использования kafka horton

Что могло бы сделать разумным, потому что он рассматривает первый байт записи как параметр версии для схемы, а первый байт равен 3. Но это не объясняет, почему я получаю ArrayIndexOutOfBound при помещении схемы в обычный текст.

Наконец, я могу отлично изучить эту тему, используя Spark Streaming и Schema Registry. Никто уже не сталкивался с такой проблемой между NiFi и AvroReader при использовании Kafka.

Стек: Horton Works HDP 3.4.1 // Nifi 1.9.0 // Spark 2.3 // Реестр схем 0.7


person Vincent    schedule 21.11.2019    source источник
comment
вы пробовали с nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/ процессор?   -  person Ishan Kumar    schedule 22.11.2019
comment
Да, используя этот процессор, мне все равно нужно десериализовать сообщение avro позже и получить ту же ошибку. Я думаю, это как-то связано с сериализатором / десериализатором Avro, который использует Nifi.   -  person Vincent    schedule 22.11.2019


Ответы (1)


Проблема связана с тем, как Nifi интерпретирует первые байты вашего сообщения Avro. Эти байты содержат информацию о:

  • Идентификатор протокола - 1 байт
  • Идентификатор метаданных схемы - 8 байт
  • Версия схемы - 4 байта

Просматривая код HortonWork Schema Registry, мы можем обнаружить, что для сериализации вашего сообщения с помощью AvroSerDe можно использовать другой идентификатор протокола.

public static final byte CONFLUENT_VERSION_PROTOCOL = 0x0;
public static final byte METADATA_ID_VERSION_PROTOCOL = 0x1;
public static final byte VERSION_ID_AS_LONG_PROTOCOL = 0x2;
public static final byte VERSION_ID_AS_INT_PROTOCOL = 0x3;
public static final byte CURRENT_PROTOCOL = VERSION_ID_AS_INT_PROTOCOL;

Источник

По умолчанию используется VERSION_ID_AS_INT_PROTOCOL, что означает, что первым байтом сообщений Avro будет 03.

Проходя через код Nifi, мы видим, что он фактически использует только METADATA_ID_VERSION_PROTOCOL, ожидая 01 и не принимая во внимание что-либо еще.

Вы должны заставить Spark использовать METADATA_ID_VERSION_PROTOCOL при создании SchemaRegistryConfig.

  val config = Map[String, Object](
    "schema.registry.url" -> ConfigManager.config.getProperty("schemaregistry.default.url"),
    AbstractAvroSnapshotSerializer.SERDES_PROTOCOL_VERSION -> SerDesProtocolHandlerRegistry.METADATA_ID_VERSION_PROTOCOL.asInstanceOf[Object]
  )
  implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
person Vincent    schedule 03.12.2019