Это статья о том, как настроить Apache Pulsar, создавать сообщения и использовать их с помощью FastAPI.
В этой статье не будут показаны все функции Apache Pulsar или FastAPI.
Apache Pulsar и FastAPI — это две мощные технологии, которые в сочетании могут помочь вам создавать чрезвычайно быстрые и масштабируемые API-интерфейсы в реальном времени. Apache Pulsar — это распределенная платформа потоковой передачи с открытым исходным кодом, которая действует как посредник между производителями и потребителями потоков данных. FastAPI — это современная, быстрая (высокопроизводительная) веб-инфраструктура для создания API с помощью Python. В этой статье мы увидим, как интегрировать Apache Pulsar с FastAPI, чтобы использовать возможности потоковой передачи событий Pulsar для поддержки ваших приложений FastAPI.
Требования
- Среда выполнения Java (64-разрядная версия)
- Python установлен
- Пип установлен
- WSL установлен для пользователей Windows
Апачский пульсар
Pulsar — это многопользовательское высокопроизводительное решение для обмена сообщениями между серверами. Первоначально разработанный Yahoo, Pulsar находится под управлением Apache Software Foundation.
Ключевые особенности Pulsar перечислены ниже:
- Встроенная поддержка нескольких кластеров в экземпляре Pulsar с бесшовной георепликацией сообщений между кластерами.
- Очень низкая публикация и сквозная задержка.
- Плавная масштабируемость до более чем миллиона тем.
- Простой клиентский API с привязками для Java, Go, Python и C++.
- Несколько типов подписки (эксклюзивная, общая и отказоустойчивая) для тем.
- Гарантированная доставка сообщений с постоянным хранилищем сообщений, предоставляемым Apache BookKeeper. Облегченная бессерверная вычислительная среда Pulsar Functions предлагает возможность потоковой обработки данных.
- Платформа бессерверного соединителя Pulsar IO, построенная на Pulsar Functions, упрощает перемещение данных в Apache Pulsar и из него.
- Многоуровневое хранилище переносит данные из горячего/теплого хранилища в холодное/долгосрочное хранилище (например, S3 и GCS), когда данные устаревают.
Монтаж
Чтобы установить Apache Pulsar, переходим на эту страницу, скачиваем бинарный файл и разархивируем его.
Или выполните следующие команды:
wget https://archive.apache.org/dist/pulsar/pulsar-3.0.0/apache-pulsar-3.0.0-bin.tar.gz tar xvfz apache-pulsar-3.0.0-bin.tar.gz cd apache-pulsar-3.0.0 ls -1F
Пользователи Windows должны выполнять приведенные выше команды с помощью WSL.
Чтобы запустить кластер Pulsar, мы запускаем эту команду, чтобы запустить автономный кластер Pulsar:
bin/pulsar standalone
После того, как мы запустили наш кластер Pulsar, мы создаем два файла: производителя и потребителя.
Как гласит терминология Pulsar в документации, производитель — это процесс, который публикует сообщения в тему Pulsar. Процесс, который устанавливает подписку на тему Pulsar и обрабатывает сообщения, опубликованные в этой теме производителями.
Тема — это именованный канал, используемый для передачи сообщений, опубликованных производителями, потребителям, которые обрабатывают эти сообщения.
import pulsar client = pulsar.Client('pulsar://localhost:6650') producer = client.create_producer('my-topic') for i in range(10): producer.send(('Hello-%d' % i).encode('utf-8')) client.close()
В этом коде мы импортируем клиентскую библиотеку Apache Pulsar. Затем мы подключаемся к брокеру Pulsar, работающему на локальном хосте: 6650. Мы создаем продюсера с темой my-topic. Затем мы создаем цикл For для отправки 10 сообщений в заданную нами тему. Наконец, мы закрываем соединение.
import pulsar client = pulsar.Client('pulsar://localhost:6650') consumer = client.subscribe('my-topic', 'my-subscription') while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except Exception: # Message failed to be processed consumer.negative_acknowledge(msg) client.close()
В потребителе мы импортируем клиентскую библиотеку Apache Pulsar и создаем соединение с брокером Pulsar, работающим локально на порту 6650. Мы подписываемся на тему «my-topic» с именем подписки «my-subscription».
Мы создаем бесконечный цикл, который будет непрерывно получать и обрабатывать сообщения. В случае успеха он подтверждает сообщение. Если возникает исключение, он отрицательно подтверждает сообщение.
Затем мы закрываем клиент Pulsar и соединение.
Запускаем в разных терминалах файл producer.py
и файл consumer.py
.
Это вывод файлового терминала producer.py
:
А это вывод файлового терминала consumer.py
:
Создание сервера FastAPI
Теперь мы собираемся создать сервер, который отправляет информацию журнала потребителю.
Каждый раз, когда на сервер делается запрос, сервер отправляет логи в тему «logs».
from fastapi import FastAPI, Request from datetime import datetime import pulsar app = FastAPI() @app.get("/") async def root(request: Request): ip_address = request.client.host request_url = request.url._url request_port = request.url.port request_path = request.url.path request_method = request.method request_time = datetime.now() browser_type = request.headers["User-Agent"] operating_system = request.headers["Accept"] message = { "ip_address": ip_address, "request_url": request_url, "request_port": request_port, "request_path": request_path, "request_method": request_method, "request_time": request_time, "browser_type": browser_type, "operating_system": operating_system, } client = pulsar.Client('pulsar://localhost:6650') producer = client.create_producer('my-topic') producer.send(str(message).encode("utf-8")) client.close() return {"message": message} if __name__ == "__main__": app.run(debug=True)
В файл main.py
импортируем библиотеки FastAPI, Request, datetime и pulsar.
Мы создаем приложение FastAPI и определяем корневую конечную точку /
, которая получает объект запроса. И извлекать из запроса различную информацию, такую как IP-адрес, URL-адрес, метод, заголовки и т. д. Затем мы создаем словарь сообщений с извлеченной информацией запроса.
Мы создаем соединение с брокером Pulsar, создаем производителя и отправляем словарь сообщений в тему my-topic.
И, наконец, этот код возвращает словарь сообщений в ответе.
Мы преобразуем словарь Python в строку, чтобы избежать этой ошибки сообщения:
Это происходит потому, что содержимое должно быть байтовым объектом. Мы конвертируем словарь в строку, чтобы иметь возможность его кодировать.
Вот документация производителя в клиенте Python.
Если мы перейдем к locahost:8000
с помощью веб-браузера, мы должны увидеть этот ответ.
Мы могли бы использовать HTTP-клиент, и мы должны получить тот же ответ.
Запускаем файл consumer.py
в другом терминале. И должен получить следующий ответ:
Заключение
Интеграция Apache Pulsar с FastAPI позволяет создавать высокопроизводительные API реального времени. Pulsar справляется со сложностями потоковой передачи и хранения событий, а FastAPI предоставляет простую, но мощную платформу для создания API. Эта комбинация позволяет отделить производителя событий от потребителя API.
Спасибо, что нашли время прочитать эту статью.
Если у вас есть рекомендации по другим пакетам, архитектурам, как улучшить мой код, мой английский или что-то в этом роде; оставьте комментарий или свяжитесь со мной через Twitter или LinkedIn.
Исходный код здесь.
Ресурсы
Локально запустить автономный кластер Pulsar
Терминология Pulsar Использование клиента Python
Производить и потреблять сообщения
Первоначально опубликовано на https://carlosmv.hashnode.dev.