Могу ли я вызвать службу концентратора сообщений Bluemix из Python?

Клиент kafka-python поддерживает Kafka 0.9, но явно не включает новые функции аутентификации и шифрования, поэтому я предполагаю, что он работает только с открытыми серверами (как и в предыдущих выпусках). В любом случае, даже Java-клиенту для подключения требуется специальный модуль входа в концентратор сообщений (по крайней мере, так может показаться из примера), который предполагает, что ничего не будет работать, если нет аналогичного модуля, доступного для Python.

Мой конкретный сценарий заключается в том, что я хочу использовать службу концентратора сообщений из блокнота Jupyter, также размещенного в Bluemix (служба Apache Spark).


person Rob Duncan    schedule 09.02.2016    source источник


Ответы (4)


Мне удалось подключиться с помощью библиотеки kafka-python:

$ pip install --user kafka-python

Затем ...

from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl

############################################
# Service credentials from Bluemix UI:
############################################
bootstrap_servers =   # kafka_brokers_sasl
sasl_plain_username = # user
sasl_plain_password = # password
############################################

sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'

# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1

producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism,
                         api_version=(0,10))

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

Это сработало для меня из искры Bluemix как службы из ноутбука jupyter, однако обратите внимание, что этот подход не использует искру. Код просто работает на хосте драйвера.

person Chris Snow    schedule 30.10.2016

Запрошена поддержка SASL в клиенте Kafka Python: https://github.com/dpkp/kafka-python/issues/533, но пока метод входа в систему с именем пользователя и паролем, используемый Message Hub, не будет работать.

person SimonGormley    schedule 09.02.2016

Пока это не будет изначально поддерживаться службой Bluemix Apache Spark, вы можете использовать тот же подход, что и Проект анализа настроений в реальном времени. Вспомогательный код для этого можно найти в репозитории образцов cds labs spark на github.

person Randy Horman    schedule 09.02.2016

Мы добавили текст в нашу документацию о поддержке языков, отличных от Java — см. раздел «ПОДКЛЮЧЕНИЕ И АУТЕНТИФИКАЦИЯ В ПРИЛОЖЕНИИ НЕ-JAVA»: https://www.ng.bluemix.net/docs/services/MessageHub/index.html

Наш текущий метод аутентификации нестандартен и не поддерживается проектом Apache, но был временным решением. Команда Message Hub работает с сообществом Apache Kafka над разработкой КИП-43. Как только это будет завершено, мы изменим реализацию проверки подлинности Message Hub, чтобы она соответствовала, и можно будет реализовывать клиентов в соответствии с этой спецификацией на любом языке.

person Oliver Deakin    schedule 11.02.2016