Это статья о том, как настроить Apache Pulsar, создавать сообщения и использовать их с помощью FastAPI.

В этой статье не будут показаны все функции Apache Pulsar или FastAPI.

Apache Pulsar и FastAPI — это две мощные технологии, которые в сочетании могут помочь вам создавать чрезвычайно быстрые и масштабируемые API-интерфейсы в реальном времени. Apache Pulsar — ​​это распределенная платформа потоковой передачи с открытым исходным кодом, которая действует как посредник между производителями и потребителями потоков данных. FastAPI — это современная, быстрая (высокопроизводительная) веб-инфраструктура для создания API с помощью Python. В этой статье мы увидим, как интегрировать Apache Pulsar с FastAPI, чтобы использовать возможности потоковой передачи событий Pulsar для поддержки ваших приложений FastAPI.

Требования

  • Среда выполнения Java (64-разрядная версия)
  • Python установлен
  • Пип установлен
  • WSL установлен для пользователей Windows

Апачский пульсар

Pulsar — ​​это многопользовательское высокопроизводительное решение для обмена сообщениями между серверами. Первоначально разработанный Yahoo, Pulsar находится под управлением Apache Software Foundation.

Ключевые особенности Pulsar перечислены ниже:

  • Встроенная поддержка нескольких кластеров в экземпляре Pulsar с бесшовной георепликацией сообщений между кластерами.
  • Очень низкая публикация и сквозная задержка.
  • Плавная масштабируемость до более чем миллиона тем.
  • Простой клиентский API с привязками для Java, Go, Python и C++.
  • Несколько типов подписки (эксклюзивная, общая и отказоустойчивая) для тем.
  • Гарантированная доставка сообщений с постоянным хранилищем сообщений, предоставляемым Apache BookKeeper. Облегченная бессерверная вычислительная среда Pulsar Functions предлагает возможность потоковой обработки данных.
  • Платформа бессерверного соединителя Pulsar IO, построенная на Pulsar Functions, упрощает перемещение данных в Apache Pulsar и из него.
  • Многоуровневое хранилище переносит данные из горячего/теплого хранилища в холодное/долгосрочное хранилище (например, S3 и GCS), когда данные устаревают.

Монтаж

Чтобы установить Apache Pulsar, переходим на эту страницу, скачиваем бинарный файл и разархивируем его.

Или выполните следующие команды:

wget https://archive.apache.org/dist/pulsar/pulsar-3.0.0/apache-pulsar-3.0.0-bin.tar.gz
tar xvfz apache-pulsar-3.0.0-bin.tar.gz
cd apache-pulsar-3.0.0
ls -1F

Пользователи Windows должны выполнять приведенные выше команды с помощью WSL.

Чтобы запустить кластер Pulsar, мы запускаем эту команду, чтобы запустить автономный кластер Pulsar:

bin/pulsar standalone

После того, как мы запустили наш кластер Pulsar, мы создаем два файла: производителя и потребителя.

Как гласит терминология Pulsar в документации, производитель — это процесс, который публикует сообщения в тему Pulsar. Процесс, который устанавливает подписку на тему Pulsar и обрабатывает сообщения, опубликованные в этой теме производителями.

Тема — это именованный канал, используемый для передачи сообщений, опубликованных производителями, потребителям, которые обрабатывают эти сообщения.

producer.py

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-topic')

for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()

В этом коде мы импортируем клиентскую библиотеку Apache Pulsar. Затем мы подключаемся к брокеру Pulsar, работающему на локальном хосте: 6650. Мы создаем продюсера с темой my-topic. Затем мы создаем цикл For для отправки 10 сообщений в заданную нами тему. Наконец, мы закрываем соединение.

consumer.py

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe('my-topic', 'my-subscription')

while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)

client.close()

В потребителе мы импортируем клиентскую библиотеку Apache Pulsar и создаем соединение с брокером Pulsar, работающим локально на порту 6650. Мы подписываемся на тему «my-topic» с именем подписки «my-subscription».

Мы создаем бесконечный цикл, который будет непрерывно получать и обрабатывать сообщения. В случае успеха он подтверждает сообщение. Если возникает исключение, он отрицательно подтверждает сообщение.

Затем мы закрываем клиент Pulsar и соединение.

Запускаем в разных терминалах файл producer.py и файл consumer.py.

Это вывод файлового терминала producer.py:

А это вывод файлового терминала consumer.py:

Создание сервера FastAPI

Теперь мы собираемся создать сервер, который отправляет информацию журнала потребителю.

Каждый раз, когда на сервер делается запрос, сервер отправляет логи в тему «logs».

main.py

from fastapi import FastAPI, Request
from datetime import datetime
import pulsar 

app = FastAPI()

@app.get("/")
async def root(request: Request):

    ip_address = request.client.host
    request_url = request.url._url
    request_port = request.url.port
    request_path = request.url.path
    request_method = request.method
    request_time = datetime.now()
    browser_type = request.headers["User-Agent"]
    operating_system = request.headers["Accept"]
    message = {
        "ip_address": ip_address,
        "request_url": request_url,
        "request_port": request_port,
        "request_path": request_path,
        "request_method": request_method,
        "request_time": request_time,
        "browser_type": browser_type,
        "operating_system": operating_system,
    }

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')
   
    producer.send(str(message).encode("utf-8"))
    client.close()
    return {"message": message}

if __name__ == "__main__":
    app.run(debug=True)

В файл main.py импортируем библиотеки FastAPI, Request, datetime и pulsar.

Мы создаем приложение FastAPI и определяем корневую конечную точку /, которая получает объект запроса. И извлекать из запроса различную информацию, такую ​​как IP-адрес, URL-адрес, метод, заголовки и т. д. Затем мы создаем словарь сообщений с извлеченной информацией запроса.

Мы создаем соединение с брокером Pulsar, создаем производителя и отправляем словарь сообщений в тему my-topic.

И, наконец, этот код возвращает словарь сообщений в ответе.

Мы преобразуем словарь Python в строку, чтобы избежать этой ошибки сообщения:

Это происходит потому, что содержимое должно быть байтовым объектом. Мы конвертируем словарь в строку, чтобы иметь возможность его кодировать.

Вот документация производителя в клиенте Python.

Если мы перейдем к locahost:8000 с помощью веб-браузера, мы должны увидеть этот ответ.

Мы могли бы использовать HTTP-клиент, и мы должны получить тот же ответ.

Запускаем файл consumer.py в другом терминале. И должен получить следующий ответ:

Заключение

Интеграция Apache Pulsar с FastAPI позволяет создавать высокопроизводительные API реального времени. Pulsar справляется со сложностями потоковой передачи и хранения событий, а FastAPI предоставляет простую, но мощную платформу для создания API. Эта комбинация позволяет отделить производителя событий от потребителя API.

Спасибо, что нашли время прочитать эту статью.

Если у вас есть рекомендации по другим пакетам, архитектурам, как улучшить мой код, мой английский или что-то в этом роде; оставьте комментарий или свяжитесь со мной через Twitter или LinkedIn.

Исходный код здесь.

Ресурсы

Локально запустить автономный кластер Pulsar

Настроить Python-клиент

Терминология Pulsar Использование клиента Python

Производить и потреблять сообщения

Первоначально опубликовано на https://carlosmv.hashnode.dev.