Предполагаемое знание

В этом сообщении в блоге предполагается, что вы хорошо понимаете следующее (ссылки рядом с каждым, если вам нужно лучше понять)

Если вы еще этого не сделали, то документация по всему вышеперечисленному великолепна и определенно стоит того, чтобы ее прочесть.

Вступление

Целью этого руководства является создание микросервиса, который использует сообщения Kafka в Python с использованием инфраструктуры микросервисов Nameko.

Начиная

Если вы еще этого не сделали, настоятельно рекомендуется пройти учебник Nameko по написанию расширений. Мы собираемся создать точку входа в Kafka!

Определение точки входа

Мы начинаем с создания базового класса Python, который наследуется от базового класса Entrypoint с аргументом topic. Entrypoint дает нам несколько действительно полезных функций и функций настройки, которые нам нужно переопределить, чтобы наш класс заработал.

Имеет смысл, что инициализация класса принимает следующую переменную:

  • topic: Тема в Kafka, на которую вы хотите подписаться для точки входа.

Каждый раз, когда вызывается точка входа, Nameko создает экземпляр этого класса, и мы можем определить, с какой темой связана эта конкретная точка входа. Это упрощает использование одной службы nameko для нескольких тем.

Входная точка

Https://nameko.readthedocs.io/en/stable/writing_extensions.html

Базовый класс Entrypoint в Nameko дает нам несколько функций, которые нам нужно определить, чтобы наш класс заработал.

настраивать

setup: вызывается на связанных расширениях (от которых точка входа наследуется) перед запуском контейнера. Здесь расширения должны выполнить любую необходимую инициализацию. (По сути, идеальное место для старта нашего потребителя Kafka)

Все, что мы здесь делаем, это используем библиотеку Kafka-Python для инициализации Kafka Consumer. Это подключается к Kafka и начинает обработку сообщений темы. Здесь вы можете настроить множество конфигураций, поэтому внимательно прочтите документацию: https://kafka-python.readthedocs.io/en/master/usage.html

Вероятно, будет разумным поместить IP-адрес bootstrap_servers в файл конфигурации Nameko и импортировать его с помощью nameko. Документы nameko для этого можно найти здесь: https://nameko.readthedocs.io/en/stable/built_in_dependency_providers.html

Начните

start: вызывается на связанных расширениях после успешного запуска контейнера. Это вызывается только после того, как все другие расширения успешно вернутся из Extension.setup(). Если расширение реагирует на внешние события, оно должно начать действовать в соответствии с ними.

Мы собираемся создать новый управляемый поток (который Nameko дает нам очень простой доступ к использованию self.container.spawn_managed_thread(...)). внутри start. Мы также определили, что называется новой функцией self.run, и мы определим, что именно она делает позже в этом руководстве.

Стоит отметить:

  • Управляемые потоки всегда завершаются при остановке или уничтожении контейнера.
  • Необработанные исключения в управляемых потоках перехватываются контейнером и вызывают его завершение с соответствующим сообщением, которое может предотвратить зависание процессов.

Nameko построен на основе библиотеки eventlet, которая обеспечивает параллелизм через зеленые потоки.

останавливаться

Вызывается, когда сервисный контейнер начинает отключаться. Здесь расширения должны корректно завершать работу.

Класс KafkaConsumer python предоставляет нам функцию .close(), которая безопасно завершает работу потребителя, если он находится в процессе чего-либо, и передает Kafka последний индекс сообщения, который он обрабатывал.

запустить

Эта функция не определяется точкой входа, но она сохраняет наш код в чистоте и позволяет нам использовать поток для обработки и получения сообщений.

Здесь мы определяем функцию запуска, которая просто перебирает потребителя и вызывает функцию self.handle_message с объектом сообщения. Чтобы получить данные темы из Kafka, которые были отправлены вместе с сообщением, вы можете вызвать message.value, а остальные атрибуты - это метаданные о клиенте и сервере Kafka.

В приведенном ниже примере вы можете видеть, что мы просто передаем сообщение в точку входа в качестве аргумента, и поэтому мы должны определить args и kwargs в функции handle_message и передать их обратно в spawn_worker, чтобы он знал, что вызывать функция точки входа с.

Если вы не знакомы со строковыми литералами в формате python 3.6, дополнительную информацию можно прочитать здесь: https://docs.python.org/3/whatsnew/3.6.html#whatsnew36-pep498

Законченный класс!

И теперь мы закончили. Вот готовый класс и пример того, как его использовать ниже