Создайте собственного производителя / потребителя, чтобы обеспечить работоспособность кластера

Если вы, как и я, работаете DevOps, вам иногда нужно развертывать новые версии промежуточного программного обеспечения (возможно, в целях безопасности или для использования некоторых новых функций и т. Д.). Но что будет, если эти новые версии не будут работать с вашими предыдущими конфигами?

Основываясь на этом наблюдении, мы реализовали несколько автоматических тестов, которые запускаются каждый раз при создании новой версии промежуточного программного обеспечения.

Вот один, который я использую с Apache Kafka. Кафка обидчивый и не такой простой в управлении. Как только вы получите стабильную версию, готовую к выпуску, вы дважды подумаете, прежде чем обновлять ее. В образе мышления DevOps замечательно то, что его можно применять во многих случаях использования: доставка кода, доставка инфраструктуры, установка исправлений безопасности, соответствие требованиям и т. Д.

В качестве напоминания и для распространения цели кластера Kafka допустим, что кластер Kafka представляет собой поток огромного количества данных в реальном времени. Данные хранятся в так называемых темах и потребляются потребителями.

Сразу после процесса сборки, чтобы протестировать кластер Kafka, может быть здорово создать собственную систему производитель / потребитель, чтобы проверить, работает ли ваш кластер или нет.

Для этого мы будем использовать Python (как обычно) и библиотеку Kafka-Python (pip-install). Одним из предварительных условий для этого сценария является то, что ваш кластер Kafka должен позволять вам динамически создавать темы (т. Е. Для параметра auto.create.topics.enable должно быть установлено значение true, что является значением по умолчанию в вашем Kafka server.properties.)

Что мы здесь создадим:

  • Производитель, который вставит в тему числовое значение (оно может быть фиксированным или случайным; решать вам).
  • Потребитель, который попытается использовать все вставленные сообщения
  • Генератор отчетов, который загрузит в S3 (при условии, что ваш кластер находится в AWS) текстовый файл с результатами теста.

NB: мой скрипт запускается через конвейер GitLab, и я использую функцию sys.exit, чтобы вернуть код выхода в свою ОС. Это позволяет мне иметь успешную или неудачную работу в моем конвейере.

Сначала загружаем все зависимости: Kafka, datetime, Boto3 (AWS SDK), OS и sys.

Затем мы объявляем некоторые базовые настройки, которые будут использоваться во всей программе:

  • client - URL-адрес конечной точки вашего кластера Kafka (здесь у меня три узла).
  • topic - название создаваемой темы, в которую будут вводиться и использоваться данные вашего теста.
  • nbrrecords - сколько записей закачивать
  • nbrrecordsinserted - начать с 0
  • nbrrecordsretreived — инициализация с 0
  • now - текущая отметка времени
  • S3client - чтобы загрузить наш результат в корзину S3

Первой и уникальной функцией здесь будет создание нашего отчета. Он распечатает в файле количество записей, которые нужно вставить, вставить и использовать. Просто, просто.

Здесь мы начнем с производителя. Вам необходимо инициировать объект KafkaProducer со строкой подключения к кластеру в качестве параметра.

Затем мы вставляем в наш кластер сообщение «str (i) + Is my cluster working» для каждого вставленного элемента, где i - это переменная, начинающаяся с 1 до вашего количества записей для вставки.

Если во время этого процесса что-то пойдет не так, мы перехватим исключение, создадим отчет об ошибке и загрузим его в S3.

Первая часть прошла быстро и эффективно. Теперь вы должны были вставить некоторые данные в свой кластер. Пора читать (использовать) их сейчас с помощью объекта KafkaConsumer.

Первая часть очень похожа на производителя. Мы инициализируем объект-потребитель и назначаем ему раздел темы для синтаксического анализа и строку подключения нашего кластера Kafka.

Мы также просим потребителя начать с самого начала и собираем последнее значение, хранящееся в теме. Это будет использоваться для выхода из нашего потребительского цикла for. (Если вы этого не сделаете, ваш потребитель будет бесконечно ждать новых сообщений - не совсем то, что нам здесь нужно).

В течение всего процесса чтения мы увеличиваем нашу переменную, чтобы узнать, сколько сообщений было прочитано. Одним из улучшений здесь будет реализация блока try / except для обработки сбоя потребления сообщения во время цикла for.

Забавно, что иногда ты видишь собственные ошибки, просто написав статью (/ facepalm).

После блока try / except мы загружаем наш окончательный результат в S3.

if nbrrecordsinserted == nbrrecordsretreived:
  sys.exit(0)
else:
  sys.exit(1)

И наконец, чтобы корректно выйти из скрипта, мы проверяем, все ли сообщения были обработаны нашим процессом. Если да, мы говорим, что сценарий прошел успешно, а если нет, мы считаем его нокаутом.

Вся суть доступна, если вы хотите использовать код.

Пожалуйста, не стесняйтесь вносить любые предложения в комментариях ниже; это было бы очень признательно.

Надеюсь, вам понравилось это краткое руководство. Увидимся в следующей статье.