Конечно, вы можете сделать это без каких-либо инструментов Confluent. Но вы должны проделать дополнительную работу на своей стороне (например, в коде вашего приложения) - что было первоначальной мотивацией предоставления инструментов, связанных с Avro, таких как те, что от Confluent, о которых вы упомянули.
Один из вариантов - вручную сериализовать / десериализовать полезную нагрузку ваших сообщений Kafka (например, с YourJavaPojo
на byte[]
) с помощью Apache Avro Java API напрямую. (Я полагаю, что вы подразумевали Java в качестве предпочтительного языка программирования.) Как это будет выглядеть? Вот пример.
- Во-первых, вы должны вручную сериализовать полезные данные в своем приложении, которое записывает данные в Kafka. Здесь вы можете использовать API сериализации Avro для кодирования полезной нагрузки (от Java pojo до
byte[]
), а затем использовать клиент производителя Java Kafka для записи закодированной полезной нагрузки в тему Kafka.
- Затем, ниже по потоку в конвейере данных, вы должны выполнить десериализацию в другом приложении, которое считывает данные из Kafka. Здесь вы можете использовать клиентский клиент Java Kafka для чтения (закодированных) данных из той же темы Kafka и использовать API десериализации Avro для повторного декодирования полезной нагрузки (с
byte[]
на Java pojo).
Вы также можете использовать Avro API напрямую, конечно, при работе с такими инструментами потоковой обработки, как Kafka Streams (будет включен в будущую версию Apache Kafka 0.10) или Apache Storm.
Наконец, у вас также есть возможность использовать некоторые служебные библиотеки (будь то из Confluent или из других источников), чтобы вам не приходилось напрямую использовать Apache Avro API. Я опубликовал несколько более сложных примеров на странице kafka-storm-starter. , например как продемонстрировано AvroDecoderBolt.scala. Здесь сериализация / десериализация Avro выполняется с помощью библиотеки Scala Twitter Bijection. Вот пример фрагмента AvroDecoderBolt.scala
, чтобы дать вам общее представление:
// This tells Bijection how to automagically deserialize a Java type `T`,
// given a byte array `byte[]`.
implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
// Let's put Bijection to use.
private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
require(bytes != null, "bytes must not be null")
val decodeTry = Injection.invert(bytes) // <-- deserialization, using Twitter Bijection, happens here
decodeTry match {
case Success(pojo) =>
log.debug("Binary data decoded into pojo: " + pojo)
collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
()
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
}
Так что да, вы, конечно, можете отказаться от использования каких-либо дополнительных библиотек, таких как сериализаторы / десериализаторы Avro Confluent (в настоящее время доступны как часть confluentinc / schema-registry) или Биекция Twitter. Вам решать, стоит ли это дополнительных усилий.
person
Michael G. Noll
schedule
18.05.2016