Создайте потребителя Python Kafka с pykafka и flask
Поскольку мы научились передавать данные в Kafka, пришло время написать потребителя Kafka с помощью Python. Мы снова будем использовать клиент pykafka и снова начнем с базового объяснения потребителя pykafka.
Первые две строчки должны показаться вам знакомыми. После импорта KafkaClient (строка 1) мы указываем местоположение нашего брокера Kafka и назначаем его переменной клиента (строка 3).
В конце концов, мы можем активизировать нашего потребителя с помощью get_simple_consumer (), который работает только с темой Kafka. Кроме того, поскольку тема Kafka обычно содержит много данных, мы перебираем все сообщения в этой теме с помощью цикла (строка 4). Наконец, мы распечатываем сообщения одно за другим (строка 5). Здесь важно → мы должны декодировать сообщения Kafka, поскольку Kafka хранит все сообщения в байтовом формате.
Теперь, когда у нас есть базовое представление о потребителе pykafka, давайте обернем API-интерфейс Flask, который мы можем вызывать из браузера.
После импорта Flask (строка 1) мы запускаем новое приложение Flask (строка 2) и запускаем его в конце блока кода (строки 13 + 14). Между тем мы настраиваем маршрут, который может быть вызван браузером по адресу localhost: 5001 / topic / topicname позже (строка 5). Для каждого маршрута требуется функция, поэтому мы создаем get_messages и пересылаем имя темы из URL-адреса (строка 6).
Теперь это становится немного сложнее :-) Мы создаем новую функцию events, которая потребляет сообщения Kafka и генерирует / выдает их (строки 8–10). Важно: мы используем оператор yield вместо return, потому что возврат завершит функцию после использования первого сообщения Kafka. Это концепция генераторов Python.
Наконец, мы возвращаем потребленные события из функции get_messages как поток событий (строка 11).
Вы можете найти полный код на GitHub. Если вам нравится этот сериал, подписывайтесь на меня на Youtube. В серии видео даны еще несколько более глубоких объяснений.
Я разделю эту историю на серию из 4 историй, как показано в приведенном ниже обзоре. Это позволяет мне более подробно остановиться на каждой конкретной части общего решения. Поскольку это мой первый средний пост, я буду очень признателен за любые отзывы.
- Часть 1 - Введение - Что мы будем строить?
- Часть 2 - Продюсер Kafka на Python
- Часть 3 - Потребитель Kafka в Python (эта история)
- Часть 4 - Фронтенд с Leaflet JS
Развлекайся :-)