Использование Kafka в 2022 году

Статья здесь: https://flatteredwithflutter.com/using-kafka/

Мы кратко расскажем:

  1. Введение в Кафку
  2. Настроить Кафку на машину
  3. Создать продюсера Кафки
  4. Создать потребителя Кафки
  5. Работающий производитель и потребитель

Примечание. В этой статье предполагается, что читатель знаком с Java и настроил его на компьютере. Также пользователь знаком с терминологией Кафки.

Введение в Кафку

Согласно документам, Kafka — это платформа потоковой передачи событий, которую можно использовать для сбора, обработки, хранения и анализа данных в любом масштабе. Kafka используется тысячами компаний, в том числе более 60% из списка Fortune 100.

Kafka известна своей отличной производительностью, малой задержкой, отказоустойчивостью и высокой пропускной способностью, она способна обрабатывать тысячи сообщений в секунду.

Случаи использования:

  • Обработка платежей и финансовых транзакций в режиме реального времени,
  • Непрерывно собирайте и анализируйте данные датчиков с устройств IoT,
  • Собирайте и немедленно реагируйте на взаимодействие с клиентами и заказы,
  • Отслеживайте и контролируйте автомобили, грузовики, автопарки и грузы в режиме реального времени

Настройте Kafka на компьютере

Этот раздел поможет настроить Kafka на вашем компьютере.

Примечание. Мы используем Mac.

  • Для установки Kafka следуйте здесь. Перейдите к последним загруженным двоичным файлам (на сегодняшний день это Scala 2.13) и нажмите на ссылку. Откроется другая веб-страница и щелкните ссылку, как показано ниже:

  • Извлеките папку и перейдите в папку (Downloads/kafka_2.12–3.1.0) в нашем случае.

Примечание. Следуйте приведенным ниже командам терминала по пути, по которому вы распаковываете zip-архив Kafka.

Запустить Zookeeper

  • Откройте терминал по указанному выше пути и введите
sh bin/zookeeper-server-start.sh config/zookeeper.properties

Приведенная выше команда запускает Zookeeper . В основном он используется для отслеживания состояния узлов в кластере Kafka и ведения списка тем и сообщений Kafka.

Примечание. Начиная с версии 2.8, Kafka можно запускать без Zookeeper. Однако это обновление не готово к использованию в рабочей среде.

Держите вышеуказанный терминал открытым

Запустить брокерскую службу Kafka

  • Откройте другой терминал по указанному выше пути (Загрузки/kafka_2.12–3.1.0) и введите
sh bin/kafka-server-start.sh config/server.properties

Это запускает Kafka Broker. Кластер Kafka состоит из нескольких брокеров Kafka. Каждый брокер Kafka имеет уникальный идентификатор (номер). Kafka Brokers содержат разделы журналов тем. При подключении к одному брокеру клиент загружается во весь кластер Kafka. Кластер Kafka может иметь 10, 100 или 1000 брокеров в кластере, если это необходимо.

Держите этот терминал открытым

Создать тему

  • Откройте новый терминал по указанному выше пути и введите
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic

Kafka создает события на разных машинах, эти события организованы и хранятся внутри topics

В приведенной выше команде мы говорим, что наш

  • сервер localhost:9092 и
  • тема создана test-topic

Проверьте созданную тему

  • Откройте новый терминал по указанному выше пути и введите
sh bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Это должно показать вам файл test-topic.

Создать продюсера Кафки

Примечание. Kafka Broker должен быть запущен в терминале.

Установить зависимости

Мы будем использовать Eclipse IDE для создания нашего Kafka Producer.

  • Создайте проект Maven, используя eclipse.
  • Перейдите к pom.xml, введите следующее и обновите свой проект maven.
<dependencies> 
   
   <!-- Kafka Client -->
   <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
   </dependency>
   
   <!-- Simple logging -->
   <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.0-alpha6</version>
   </dependency>
   <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.0-alpha6</version>     
   </dependency>
</dependencies>

Это добавит Kafka Client и slf4j в наш проект.

Создайте SampleProducer.java

  • Создайте класс под названием SampleProducer.java
public class SampleProducer {
 
    
 public SampleProducer() throws InterruptedException {
  Properties properties = new Properties();
  properties.put("bootstrap.servers", "localhost:9092");
  properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
 
  properties.put("kafka.topic", "test-topic");
  
  KafkaProducer kafkaProducer = new KafkaProducer(properties);
  int i=0;
  
  try {
   while (true) {
    i++;
    ProducerRecord producerRecord = new ProducerRecord("test-topic","key","message-" +i);
    
    Thread.sleep(3000);
    kafkaProducer.send(producerRecord);
   }
  } finally {
   kafkaProducer.close();
  }    
    }
                  
}

Мы инициализируем Properties следующими ключами и значениями

bootstrap.servers : хост и порт вашего локального сервера

key.serializer : для отправки сериализованного ключа по сети.

value.serializer : для отправки сериализованного значения по сети.

kafka.topic : ваша тема о Кафке

  • Далее мы создаем экземпляр KafkaProducer со свойствами (как указано выше)
  • Нам нужно создать ProducerRecord для отправки данных производителю Kafka.

Этот ProducerRecord принимает название темы, ключ и значение для отправки.

  • Мы отправляем запись с помощью kafkaProducer.
  • Этот блок кода помещается внутрь цикла while и через каждые 3 секунды отправляет запись брокеру Kafka.

Создайте KafkaProducerRunner.java

  • Это будет класс, который вызывает вышеуказанный SampleProducer
public class KafkaProducerRunner {
 public static void main(String[] args) {
  SampleProducer sampleProducer = new SampleProducer();
 }
}

Создать потребителя Кафки

Примечание. Kafka Broker должен быть запущен в терминале.

  • Создайте класс под названием SampleConsumer.java
public class SampleConsumer {
 
 public SampleConsumer() {
  Properties properties = new Properties();
  properties.put("bootstrap.servers", "localhost:9092"); 
  properties.put("kafka.topic", "test-topic"); 
  properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
 properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  
 properties.put("group.id", "my-group");
  
  KafkaConsumer consumer = new KafkaConsumer(properties);
  consumer.subscribe(Arrays.asList(properties.getProperty("kafka.topic")));
        
       while(true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
          for (ConsumerRecord record : records)  {
           System.out.printf("partition = %s, offset = %d, key = %s, value = %s\n", 
             record.partition(), record.offset(), record.key(), record.value());
          }
       }              
  
 }  
}

Мы инициализируем Properties следующими ключами и значениями

bootstrap.servers : хост и порт вашего локального сервера

kafka.topic : твоя тема о Кафке

key.deserializer : для десериализации ключа по сети.

value.deserializer : для десериализации значения по сети.

group.id : укажите идентификатор группы, используемый только на стороне потребителя.

  • Далее мы создаем экземпляр KafkaConsumer со свойствами (как указано выше). Этот KafkaConsumer использует записи из кластера Kafka.
  • Далее подписываемся на топик (который был создан в KafkaProducer) с помощью subscribe
  • Мы получаем данные в виде ConsumerRecords и вызываем poll каждые 100 мс в бесконечном цикле.
  • Для каждой потребительской записи мы извлекаем раздел, смещение, ключ и значение.

Создайте KafkaConsumerRunner.java

  • Это будет класс, который вызывает вышеуказанный SampleConsumer
public class KafkaConsumerRunner {
 public static void main(String[] args) {
  SampleConsumer sampleConsumer = new SampleConsumer();
 }
}

Работающий производитель и потребитель

Если вы правильно выполнили вышеуказанные шаги, у вас должны быть следующие файлы

  • Давайте сначала запустим KafkaProducerRunner, мы должны увидеть что-то вроде этого

Наш производитель производит данные каждые 3 секунды.

  • Давайте запустим KafkaConsumerRunner, мы должны увидеть что-то вроде этого

Наш потребитель получает данные каждые 3 секунды, и мы выводим их на консоль.

Другие статьи:









Source code