Создание и использование сообщений Avro из Kafka без компонентов Confluent

Я пытаюсь найти пример, в котором я могу создавать и подписывать сообщения avro от kafka.

На данный момент я хочу использовать «обычное» развертывание кафки без каких-либо сливающихся надстроек.

Это возможно? Все примеры, которые я нашел до сих пор, очень быстро начинают использовать сливающиеся специфические инструменты для сообщений avro.

Я уверен, что должен быть способ публиковать и использовать сообщения avro только на платформе kafka с любыми «специфичными для распространения» надстройками.


person Knows Not Much    schedule 18.05.2016    source источник
comment
Если я понимаю ваш вопрос, в Kafka нет встроенного способа создания и загрузки сообщений avro. По сути, вы должны использовать клиент avro, такой как fastavro, для сериализации в формат avro непосредственно перед созданием в Kafka и загружать его сразу после использования из темы.   -  person Tim Martin    schedule 18.05.2016
comment
Так как обернуть логику сериализации / десериализации avro в пользовательский сериализатор?   -  person Knows Not Much    schedule 18.05.2016
comment
Гм ... Более-менее, сериализуйте его в формат avro, прежде чем продвигать в тему. В зависимости от клиента, который вы используете, для этого могут быть ярлыки.   -  person Tim Martin    schedule 18.05.2016


Ответы (1)


Конечно, вы можете сделать это без каких-либо инструментов Confluent. Но вы должны проделать дополнительную работу на своей стороне (например, в коде вашего приложения) - что было первоначальной мотивацией предоставления инструментов, связанных с Avro, таких как те, что от Confluent, о которых вы упомянули.

Один из вариантов - вручную сериализовать / десериализовать полезную нагрузку ваших сообщений Kafka (например, с YourJavaPojo на byte[]) с помощью Apache Avro Java API напрямую. (Я полагаю, что вы подразумевали Java в качестве предпочтительного языка программирования.) Как это будет выглядеть? Вот пример.

  • Во-первых, вы должны вручную сериализовать полезные данные в своем приложении, которое записывает данные в Kafka. Здесь вы можете использовать API сериализации Avro для кодирования полезной нагрузки (от Java pojo до byte[]), а затем использовать клиент производителя Java Kafka для записи закодированной полезной нагрузки в тему Kafka.
  • Затем, ниже по потоку в конвейере данных, вы должны выполнить десериализацию в другом приложении, которое считывает данные из Kafka. Здесь вы можете использовать клиентский клиент Java Kafka для чтения (закодированных) данных из той же темы Kafka и использовать API десериализации Avro для повторного декодирования полезной нагрузки (с byte[] на Java pojo).

Вы также можете использовать Avro API напрямую, конечно, при работе с такими инструментами потоковой обработки, как Kafka Streams (будет включен в будущую версию Apache Kafka 0.10) или Apache Storm.

Наконец, у вас также есть возможность использовать некоторые служебные библиотеки (будь то из Confluent или из других источников), чтобы вам не приходилось напрямую использовать Apache Avro API. Я опубликовал несколько более сложных примеров на странице kafka-storm-starter. , например как продемонстрировано AvroDecoderBolt.scala. Здесь сериализация / десериализация Avro выполняется с помощью библиотеки Scala Twitter Bijection. Вот пример фрагмента AvroDecoderBolt.scala, чтобы дать вам общее представление:

  // This tells Bijection how to automagically deserialize a Java type `T`,
  // given a byte array `byte[]`.
  implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]

  // Let's put Bijection to use.
  private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
    require(bytes != null, "bytes must not be null")
    val decodeTry = Injection.invert(bytes)  // <-- deserialization, using Twitter Bijection, happens here
    decodeTry match {
      case Success(pojo) =>
        log.debug("Binary data decoded into pojo: " + pojo)
        collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
        ()
      case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
    }
  }

Так что да, вы, конечно, можете отказаться от использования каких-либо дополнительных библиотек, таких как сериализаторы / десериализаторы Avro Confluent (в настоящее время доступны как часть confluentinc / schema-registry) или Биекция Twitter. Вам решать, стоит ли это дополнительных усилий.

person Michael G. Noll    schedule 18.05.2016