Создайте собственного производителя / потребителя, чтобы обеспечить работоспособность кластера
Если вы, как и я, работаете DevOps, вам иногда нужно развертывать новые версии промежуточного программного обеспечения (возможно, в целях безопасности или для использования некоторых новых функций и т. Д.). Но что будет, если эти новые версии не будут работать с вашими предыдущими конфигами?
Основываясь на этом наблюдении, мы реализовали несколько автоматических тестов, которые запускаются каждый раз при создании новой версии промежуточного программного обеспечения.
Вот один, который я использую с Apache Kafka. Кафка обидчивый и не такой простой в управлении. Как только вы получите стабильную версию, готовую к выпуску, вы дважды подумаете, прежде чем обновлять ее. В образе мышления DevOps замечательно то, что его можно применять во многих случаях использования: доставка кода, доставка инфраструктуры, установка исправлений безопасности, соответствие требованиям и т. Д.
В качестве напоминания и для распространения цели кластера Kafka допустим, что кластер Kafka представляет собой поток огромного количества данных в реальном времени. Данные хранятся в так называемых темах и потребляются потребителями.
Сразу после процесса сборки, чтобы протестировать кластер Kafka, может быть здорово создать собственную систему производитель / потребитель, чтобы проверить, работает ли ваш кластер или нет.
Для этого мы будем использовать Python (как обычно) и библиотеку Kafka-Python (pip-install). Одним из предварительных условий для этого сценария является то, что ваш кластер Kafka должен позволять вам динамически создавать темы (т. Е. Для параметра auto.create.topics.enable
должно быть установлено значение true
, что является значением по умолчанию в вашем Kafka server.properties
.)
Что мы здесь создадим:
- Производитель, который вставит в тему числовое значение (оно может быть фиксированным или случайным; решать вам).
- Потребитель, который попытается использовать все вставленные сообщения
- Генератор отчетов, который загрузит в S3 (при условии, что ваш кластер находится в AWS) текстовый файл с результатами теста.
NB: мой скрипт запускается через конвейер GitLab, и я использую функцию sys.exit
, чтобы вернуть код выхода в свою ОС. Это позволяет мне иметь успешную или неудачную работу в моем конвейере.
Сначала загружаем все зависимости: Kafka, datetime
, Boto3 (AWS SDK), OS и sys.
Затем мы объявляем некоторые базовые настройки, которые будут использоваться во всей программе:
client
- URL-адрес конечной точки вашего кластера Kafka (здесь у меня три узла).topic
- название создаваемой темы, в которую будут вводиться и использоваться данные вашего теста.nbrrecords
- сколько записей закачиватьnbrrecordsinserted
- начать с 0nbrrecordsretreived
— инициализация с 0now
- текущая отметка времениS3client
- чтобы загрузить наш результат в корзину S3
Первой и уникальной функцией здесь будет создание нашего отчета. Он распечатает в файле количество записей, которые нужно вставить, вставить и использовать. Просто, просто.
Здесь мы начнем с производителя. Вам необходимо инициировать объект KafkaProducer со строкой подключения к кластеру в качестве параметра.
Затем мы вставляем в наш кластер сообщение «str (i) + Is my cluster working» для каждого вставленного элемента, где i - это переменная, начинающаяся с 1 до вашего количества записей для вставки.
Если во время этого процесса что-то пойдет не так, мы перехватим исключение, создадим отчет об ошибке и загрузим его в S3.
Первая часть прошла быстро и эффективно. Теперь вы должны были вставить некоторые данные в свой кластер. Пора читать (использовать) их сейчас с помощью объекта KafkaConsumer.
Первая часть очень похожа на производителя. Мы инициализируем объект-потребитель и назначаем ему раздел темы для синтаксического анализа и строку подключения нашего кластера Kafka.
Мы также просим потребителя начать с самого начала и собираем последнее значение, хранящееся в теме. Это будет использоваться для выхода из нашего потребительского цикла for
. (Если вы этого не сделаете, ваш потребитель будет бесконечно ждать новых сообщений - не совсем то, что нам здесь нужно).
В течение всего процесса чтения мы увеличиваем нашу переменную, чтобы узнать, сколько сообщений было прочитано. Одним из улучшений здесь будет реализация блока try / except для обработки сбоя потребления сообщения во время цикла for
.
Забавно, что иногда ты видишь собственные ошибки, просто написав статью (/ facepalm).
После блока try / except мы загружаем наш окончательный результат в S3.
if nbrrecordsinserted == nbrrecordsretreived: sys.exit(0) else: sys.exit(1)
И наконец, чтобы корректно выйти из скрипта, мы проверяем, все ли сообщения были обработаны нашим процессом. Если да, мы говорим, что сценарий прошел успешно, а если нет, мы считаем его нокаутом.
Вся суть доступна, если вы хотите использовать код.
Пожалуйста, не стесняйтесь вносить любые предложения в комментариях ниже; это было бы очень признательно.
Надеюсь, вам понравилось это краткое руководство. Увидимся в следующей статье.