Привет, ребята! В этой статье я собираюсь обсудить, как сериализовать объекты с помощью сериализатора avro и реестра схем.

Оглавление

  1. Настройте среду для Kafka (сервер Kafka, Zookeeper, Schema Registry) и Docker.
  2. Настроить среду программирования
  3. Создайте схему отправляемого объекта.
  4. Создайте производителя и отправьте в тему серийный объект Avro.
  5. Создайте клиента и используйте данные, опубликованные производителем.
  6. Создайте другого производителя и отправьте другой серийный объект Avro с другими атрибутами и проверьте, может ли клиент использовать обе записи одновременно.

1. Настройте среду для Kafka (сервер Kafka, Zookeeper, Schema Registry) и Docker.

Представьте, что вам поручили создать конвейер потока данных, свойства данных потока обновляются с течением времени. Такие обновления часто требуют от нас изменения всей кодовой базы. После внесения необходимых изменений в систему он больше не принимает старые данные системного потока.

Чтобы решить такие проблемы, мы можем использовать конфлюентный реестр схем для потоковой передачи данных в формате Avro. Этот метод позволяет пользователям использовать как текущие, так и новые потоки данных.

Поскольку Windows не поддерживает платформу Confluent, я использую Docker Container для запуска сервера Kafka, зоопарка и реестра схем. Другая важная причина использования Docker заключается в том, что этот проект можно легко выполнить в любой установленной операционной системе Docker.



Предварительное условие - установить докер



Убедитесь, что у вас есть поддерживаемая версия docker и docker-compose.

Обратите внимание, что все команды, используемые в этом проекте, выполняются поверх Docker.

docker --version
docker-compose --version

Во-первых, нам нужно создать файл docker-compose. Затем мы должны включить в него образы реестра Kafka, Zookeeper и schema. Образы Docker, используемые в этом проекте, представлены confluentinc.

confluentinc / cp-zookeeper: 5.3.1
confluentinc / cp-enterprise-kafka: 5.3.1
confluentinc / cp-schema-registry: 5.3.1

Я использовал версию 5.3.1, вы можете использовать последнюю версию к тому времени, когда вы будете читать эту статью.

Откройте командную строку, в которой находится ваш файл docker-compose.yml, и выполните следующую команду, которая создаст zookeeper, сервер Kafka и реестр схемы.

Обратите внимание, что имя файла должно быть «docker-compose», а расширение файла - «yml», в противном случае команда не будет работать.

docker-compose -f docker-compose.yml up -d

Чтобы проверить состояние контейнера Docker, выполните следующую команду. Эта команда предоставит вам все сведения о контейнере. Например, имя контейнера, текущее состояние и порты, с которыми нужно взаимодействовать.

docker-compose ps

Создание темы

Убедившись, что Kafka и Zookeeper в хорошем состоянии, мы можем создать тему для публикации данных. Я назвал тему «Тема-A» и установил количество разделов и коэффициент репликации равным 1.

Если вы последуете за файлом docker-compose, сценарий создаст кластер с одним узлом. Поскольку мы делаем этот проект с использованием кластера с одним узлом, коэффициент репликации для каждого раздела темы должен быть равен единице.

docker run --net=host --rm confluentinc/cp-kafka:5.3.1 kafka-topics --create --topic Topic-A --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:2181

Перечислите тему

Эта команда выведет список всех существующих тем в кластере.

docker run --net=host --rm confluentinc/cp-kafka:5.3.1 kafka-topics --list --zookeeper  localhost:2181

Опишите тему

Используя команду describe, мы получаем представление о том, какой брокер держит лидера и последователей. В нашем случае у нас есть только один лидер, у которого нет последователей.

docker run --net=host --rm confluentinc/cp-kafka:5.3.1 kafka-topics --zookeeper localhost:2181  --describe --topic Topic-A

Если вы до сих пор следовали моему подходу. К настоящему времени у вас есть zookeeper, сервер Kafka и реестр схем, запущенный поверх docker. Также мы создали тему для потоковой передачи данных.

2. Настройте среду программирования.

Пришло время создать схему производителя, потребителя и avro. Для этого проекта вы можете использовать любой инструмент разработки, поддерживающий Java. Я использую идею IntelliJ.
Сначала создайте проект Maven и определите все необходимые зависимости проекта в файле «pom.xml».

3. Создайте схему отправляемого объекта.

В этом проекте я решил отправить объект сотрудника. Вот реестр схемы для объекта сотрудника.

Назовите его «employee-version1.avsc» и поместите в свой файл ресурсов.

Здесь нам не нужно создавать отдельные классы для сериализации и десериализации объектов, как мы делали в моей предыдущей статье.

Когда мы используем сторонний сериализатор, такой как Avro, мы заботимся о создании процесса класса сериализации.

Теперь давайте посмотрим, как создать класс сериализации с помощью Avro. Если вы используете идею IntelliJ, вы можете просто создать ее, выбрав вариант пакета, который доступен в жизненном цикле Maven.

Это приводит к созданию класса объекта на основе схемы ввода. Как только файл будет сгенерирован, вы получите сообщение «BUILD SUCCESS».

Этот автоматически сгенерированный файл находится в созданном вами исходном каталоге.

Если вы человек, который не полагается на инструменты разработки, вы можете использовать avro-tools-1.9.2.jar для компиляции схемы вручную.

Перейдите по следующей ссылке и загрузите файл avro-tools-1.9.2.jar.



После загрузки инструмента используйте эту команду для создания класса схемы.

java -jar <path/to/avro-tools-1.9.2.jar> compile schema <path/to/schema-file> <destination-folder>

4. Создайте продюсера и отправьте в тему серийный объект Avro.

Теперь давайте создадим производителя для публикации наших пользовательских объектов.

Создайте класс производителя следующим образом и переименуйте его в KafkaAvroProducerVersion1. Я назвал этот класс версией 1, потому что мы собираемся использовать класс производителя версии 2 для отправки различных атрибутов объекта в одну и ту же тему.

Если вы посмотрите на код, вы увидите, что «Value.serializer» - это «KafkaAvroSerializer». Более того, я определяю сотрудника как тип данных KafkaProducer.

5. Создайте клиента и используйте данные, опубликованные производителем.

После того, как производитель отправит сообщение в тему Kafka. Нам нужно создать потребителя, который будет потреблять опубликованные сообщения. Вот код для созданного мной покупателя.

Запустим продюсер и опубликуем данные в созданную нами тему.

Если данные успешно опубликованы в теме kafka, мы получаем ответное сообщение от kafka. Ответное сообщение включает в себя все детали того, как запись хранится в kafka.

Мы можем использовать потребителя для получения этой записи с другого конца.

6. Создайте другого производителя и отправьте другой серийный объект Avro с другими атрибутами и проверьте, может ли клиент использовать обе записи одновременно.

Давайте создадим схему для обновленной версии объекта сотрудника. Я обновляю объект сотрудника, удаляя employee_address и добавляя три свойства - employee_blood_group, employee_email, employee_phone.

Вот новый класс производителя, из которого мы собираемся создать наш обновленный объект сотрудника.

После того, как мы создадим второго производителя, мы можем сразу отправить наши сериализованные данные Avro в ту же тему.

Убедившись, что данные были отправлены в тему, мы можем увидеть, использовал ли потребитель как текущие, так и новые данные.

Вы заметите, что в этом потребителе отсутствуют некоторые поля, потому что этот потребитель официально предназначен для использования данных из старого объекта сотрудника.

Чтобы получить последние данные объекта сотрудника, мы можем использовать тот же код класса потребителя, обязательно проанализируйте новый класс сотрудника на kafkaConsumer.

Надеюсь, вы получите некоторые знания, следуя моему подходу. Я с нетерпением жду встречи с вами в другой статье.

Исходный код этого проекта будет доступен в этом репозитории.