Использование Kafka в 2022 году
Статья здесь: https://flatteredwithflutter.com/using-kafka/
Мы кратко расскажем:
- Введение в Кафку
- Настроить Кафку на машину
- Создать продюсера Кафки
- Создать потребителя Кафки
- Работающий производитель и потребитель
Примечание. В этой статье предполагается, что читатель знаком с Java и настроил его на компьютере. Также пользователь знаком с терминологией Кафки.
Введение в Кафку
Согласно документам, Kafka — это платформа потоковой передачи событий, которую можно использовать для сбора, обработки, хранения и анализа данных в любом масштабе. Kafka используется тысячами компаний, в том числе более 60% из списка Fortune 100.
Kafka известна своей отличной производительностью, малой задержкой, отказоустойчивостью и высокой пропускной способностью, она способна обрабатывать тысячи сообщений в секунду.
Случаи использования:
- Обработка платежей и финансовых транзакций в режиме реального времени,
- Непрерывно собирайте и анализируйте данные датчиков с устройств IoT,
- Собирайте и немедленно реагируйте на взаимодействие с клиентами и заказы,
- Отслеживайте и контролируйте автомобили, грузовики, автопарки и грузы в режиме реального времени
Настройте Kafka на компьютере
Этот раздел поможет настроить Kafka на вашем компьютере.
Примечание. Мы используем Mac.
- Для установки
Kafka
следуйте здесь. Перейдите к последним загруженным двоичным файлам (на сегодняшний день это Scala 2.13) и нажмите на ссылку. Откроется другая веб-страница и щелкните ссылку, как показано ниже:
- Извлеките папку и перейдите в папку (Downloads/kafka_2.12–3.1.0) в нашем случае.
Примечание. Следуйте приведенным ниже командам терминала по пути, по которому вы распаковываете zip-архив Kafka.
Запустить Zookeeper
- Откройте терминал по указанному выше пути и введите
sh bin/zookeeper-server-start.sh config/zookeeper.properties
Приведенная выше команда запускает Zookeeper
. В основном он используется для отслеживания состояния узлов в кластере Kafka и ведения списка тем и сообщений Kafka.
Примечание. Начиная с версии 2.8, Kafka можно запускать без
Zookeeper
. Однако это обновление не готово к использованию в рабочей среде.
Держите вышеуказанный терминал открытым
Запустить брокерскую службу Kafka
- Откройте другой терминал по указанному выше пути (Загрузки/kafka_2.12–3.1.0) и введите
sh bin/kafka-server-start.sh config/server.properties
Это запускает Kafka Broker
. Кластер Kafka состоит из нескольких брокеров Kafka. Каждый брокер Kafka имеет уникальный идентификатор (номер). Kafka Brokers содержат разделы журналов тем. При подключении к одному брокеру клиент загружается во весь кластер Kafka. Кластер Kafka может иметь 10, 100 или 1000 брокеров в кластере, если это необходимо.
Держите этот терминал открытым
Создать тему
- Откройте новый терминал по указанному выше пути и введите
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
Kafka создает события на разных машинах, эти события организованы и хранятся внутри topics
В приведенной выше команде мы говорим, что наш
- сервер
localhost:9092
и - тема создана
test-topic
Проверьте созданную тему
- Откройте новый терминал по указанному выше пути и введите
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Это должно показать вам файл test-topic
.
Создать продюсера Кафки
Примечание. Kafka Broker должен быть запущен в терминале.
Установить зависимости
Мы будем использовать Eclipse IDE для создания нашего Kafka Producer.
- Создайте проект Maven, используя eclipse.
- Перейдите к pom.xml, введите следующее и обновите свой проект maven.
<dependencies> <!-- Kafka Client --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.1.0</version> </dependency> <!-- Simple logging --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.0-alpha6</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.0-alpha6</version> </dependency> </dependencies>
Это добавит Kafka Client
и slf4j
в наш проект.
Создайте SampleProducer.java
- Создайте класс под названием
SampleProducer.java
public class SampleProducer { public SampleProducer() throws InterruptedException { Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("kafka.topic", "test-topic"); KafkaProducer kafkaProducer = new KafkaProducer(properties); int i=0; try { while (true) { i++; ProducerRecord producerRecord = new ProducerRecord("test-topic","key","message-" +i); Thread.sleep(3000); kafkaProducer.send(producerRecord); } } finally { kafkaProducer.close(); } } }
Мы инициализируем Properties
следующими ключами и значениями
bootstrap.servers
: хост и порт вашего локального сервера
key.serializer
: для отправки сериализованного ключа по сети.
value.serializer
: для отправки сериализованного значения по сети.
kafka.topic
: ваша тема о Кафке
- Далее мы создаем экземпляр
KafkaProducer
со свойствами (как указано выше) - Нам нужно создать
ProducerRecord
для отправки данных производителю Kafka.
Этот ProducerRecord принимает название темы, ключ и значение для отправки.
- Мы отправляем запись с помощью kafkaProducer.
- Этот блок кода помещается внутрь цикла while и через каждые 3 секунды отправляет запись брокеру Kafka.
Создайте KafkaProducerRunner.java
- Это будет класс, который вызывает вышеуказанный
SampleProducer
public class KafkaProducerRunner { public static void main(String[] args) { SampleProducer sampleProducer = new SampleProducer(); } }
Создать потребителя Кафки
Примечание. Kafka Broker должен быть запущен в терминале.
- Создайте класс под названием
SampleConsumer.java
public class SampleConsumer { public SampleConsumer() { Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("kafka.topic", "test-topic"); properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", "my-group"); KafkaConsumer consumer = new KafkaConsumer(properties); consumer.subscribe(Arrays.asList(properties.getProperty("kafka.topic"))); while(true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("partition = %s, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value()); } } } }
Мы инициализируем Properties
следующими ключами и значениями
bootstrap.servers
: хост и порт вашего локального сервера
kafka.topic
: твоя тема о Кафке
key.deserializer
: для десериализации ключа по сети.
value.deserializer
: для десериализации значения по сети.
group.id
: укажите идентификатор группы, используемый только на стороне потребителя.
- Далее мы создаем экземпляр
KafkaConsumer
со свойствами (как указано выше). Этот KafkaConsumer использует записи из кластера Kafka. - Далее подписываемся на топик (который был создан в KafkaProducer) с помощью
subscribe
- Мы получаем данные в виде
ConsumerRecords
и вызываемpoll
каждые 100 мс в бесконечном цикле. - Для каждой потребительской записи мы извлекаем раздел, смещение, ключ и значение.
Создайте KafkaConsumerRunner.java
- Это будет класс, который вызывает вышеуказанный
SampleConsumer
public class KafkaConsumerRunner { public static void main(String[] args) { SampleConsumer sampleConsumer = new SampleConsumer(); } }
Работающий производитель и потребитель
Если вы правильно выполнили вышеуказанные шаги, у вас должны быть следующие файлы
- Давайте сначала запустим KafkaProducerRunner, мы должны увидеть что-то вроде этого
Наш производитель производит данные каждые 3 секунды.
- Давайте запустим KafkaConsumerRunner, мы должны увидеть что-то вроде этого
Наш потребитель получает данные каждые 3 секунды, и мы выводим их на консоль.
Другие статьи:
Source code