Концентраторы событий Azure (Python): создание контрольных точек с хранилищем больших двоичных объектов - ошибка ключа в EventProcessor при включении контрольных точек

У меня проблема с контрольной точкой хранилища BLOB-объектов в концентраторах событий. Мое приложение работает нормально, если у меня не установлен checkpoint_store при получении клиента-потребителя. Всякий раз, когда я пытаюсь установить переменную checkpoint_store и запустить свой код, он выдает следующее исключение:

Экземпляр EventProcessor 'xxxxxxxxxxx' концентратора событий ‹имя моего концентратора событий› группа потребителей ‹имя моей группы потребителей›. Произошла ошибка при балансировке нагрузки и заявлении права собственности. Исключение составляет KeyError ('ownerid'). Повторная попытка через xxxx секунд

Единственная запись в github, которую я смог найти, в которой даже упоминалась такая ошибка, - это этот, однако сама проблема так и не была решена, и человек, у которого возникла проблема, вместо этого использовал другую библиотеку.

Соответствующие библиотеки, которые я использую, - это azure-eventhub и azure-eventhub-checkpointstoreblob-aio.

Вот соответствующие фрагменты кода, который я использую (Я использовал это руководство в качестве руководства):

import asyncio
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
    await partition_context.update_checkpoint(event)
    #<do stuff with event data>
checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string, container_name)
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=input_eventhub_name, checkpoint_store=checkpoint_store)

async def main():
  async with client:
    await client.receive(
      on_event=on_event,
    )
    print("Terminated.")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Проблема, похоже, связана исключительно с контрольными точками хранилища BLOB-объектов; если я закомментирую «checkpoint_store = checkpoint_store» при создании клиента-потребителя, все будет работать без проблем.

Соединение с хранилищем BLOB-объектов выглядит нормально, поскольку я немного покопался и обнаружил, что в хранилище BLOB-объектов были созданы некоторые папки, «контрольная точка» и «право собственности»: снимок хранилища BLOB-объектов Последний из которых содержит некоторые файлы с 'ownerid' в их метаданных: метаданные файлов владельцев

Т.е. ключ определенно существует. Я думаю, что происходит то, что EventProcessor пытается получить метаданные о владении этими BLOB-объектами, но каким-то образом не может этого сделать. Если у кого-то есть идеи, как это исправить, я был бы очень признателен!


person Ramon Samuel    schedule 11.08.2020    source источник
comment
Пожалуйста, не добавляйте ответы на свои вопросы. Я откатил / отредактировал ваш вопрос. Вместо этого напишите новый ответ.   -  person Sabito 錆兎    schedule 13.01.2021


Ответы (2)


Похоже, проблема с получением идентификатора владельца из одного из BLOB-объектов. Не могли бы вы сделать мне одолжение, чтобы проверить эти сценарии?

  1. Удалите все из контейнера BLOB-объектов и повторите попытку.
  2. Если проблема все еще существует, не могли бы вы проверить каждый большой двоичный объект, все ли они имеют идентификатор владельца метаданных?
  3. Если проблема все еще существует, не могли бы вы заменить строку 144 файла azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py в библиотеке azure-eventhub-checkpointstoreblob-aio версии 1.1.0 следующей и повторить попытку?
"owner_id": blob.metadata.get("ownerid"),
person Xie Yijun    schedule 11.08.2020
comment
Строка 144 в исходном коде checkpointstoreblobaio действительно вызвала проблему, замена ее на вашу правку решила ее. Большое спасибо! - person Ramon Samuel; 12.08.2020
comment
Спасибо Рамону за тест. Я не знаю, имеет ли это изменение какой-либо побочный эффект сейчас. Сообщите мне, если вы заметите какие-либо проблемы. Я создал проблему github в репозитории для отслеживания этой проблемы. . - person Xie Yijun; 12.08.2020
comment
Я не могу воспроизвести это в своем окружении. Не могли бы вы сказать мне свою версию Python и версию ОС? - person Xie Yijun; 12.08.2020
comment
В дополнение к редактированию строки144 мне также пришлось заменить строку 244 на "offset": blob.metadata.get("offset"), и строку 255 на "sequence_number": blob.metadata.get("sequencenumber"),, чтобы полностью исправить это. В настоящее время я разрабатываю и тестирую в среде лазурных модулей данных (Databricks Runtime 7.0 ML) - person Ramon Samuel; 12.08.2020
comment
У меня также была такая же проблема на моем локальном компьютере с Python 3.8.5 и Windows 10 build 18362.959 - person Ramon Samuel; 12.08.2020
comment
Это подавляет ошибки, но не может быть решением. Балансировщику нагрузки требуется ownerid, чтобы указать, какой процесс какой раздел принадлежит, и использовать смещение раздела в качестве данных контрольной точки для возобновления приема после перезапуска процесса. Проблема в том, почему эти важные метаданные НЕ извлекаются правильно, пока они находятся в метаданных большого двоичного объекта. Я тестировал Python 3.8.5 на моем компьютере с Windows 10, но не воспроизвел ту же проблему. Я продолжу искать первопричину. Можно ли попробовать его в 64-битной среде Python 3.7 на вашем компьютере? - person Xie Yijun; 12.08.2020
comment
Возможно, я нашел причину проблемы; Я тестировал свежий venv с установленным python 3.7.7 64bit. Я создал две новые учетные записи хранения с нуля (общее хранилище лазурных версий v1 и v2) и создал новые контейнеры в каждой. Проблема возникла (и продолжала возникать) только тогда, когда я использовал хранилище Azure V2 для контрольных точек; Все работало нормально, когда я вместо этого подключился к учетной записи Azure Storage V1. Возможно, на это стоит обратить внимание. - person Ramon Samuel; 13.08.2020
comment
Благодарю тебя, Рамон, за эту находку. Это мне очень помогло. Мы изучим это и сообщим вам. - person Xie Yijun; 14.08.2020
comment
@RamonSamuel Я тестировал как в лазурном хранилище v1, так и в v2. Проблем не увидел. Использует ли ваша учетная запись хранения версии 2 какие-либо функции, кроме настроек по умолчанию на лазурном портале? А какой регион ресурса хранения? Тестировал с west-us-2. Я могу создать ресурс в вашем регионе для тестирования. - person Xie Yijun; 25.08.2020
comment
Я использовал настройки по умолчанию для обеих учетных записей, регион - Восточная Австралия. - person Ramon Samuel; 26.08.2020

Основная причина заключается в том, что list_blobs функциональность sdk хранилища при вызове большого двоичного объекта хранилища v2 с включенным озером данных (иерархическое пространство имен) не только получит контрольную точку / владение для каждого раздела, но также получит родительский узел большого двоичного объекта, который не содержит метаданных.

Чтобы лучше проиллюстрировать это, предположим, что у нас есть следующие структуры больших двоичных объектов:

- fullqualifiednamespace (directory)
  - eventhubname (directory)
    - $default (directory)
        - ownership (directory)
          - 0 (blob)
          - 1 (blob)
          ...

в хранилище v2 с включенным озером данных (иерархическое пространство имен), когда код использовал префикс {<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership для поиска больших двоичных объектов, сам каталог {<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership также будет возвращен без метаданных, ведущих к KeyError, когда мы пытаемся извлечь информацию.

Существует выпуск с исправлением ошибок для sdk checkpointstoreblob, пожалуйста, обновите его до последней версии, чтобы увидеть, решит ли он вашу проблему.

Дайте мне знать, если у вас есть еще вопросы.

ссылки:

для синхронизации: https://pypi.org/project/azure-eventhub-checkpointstoreblob/1.1.2/

для async: https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/1.1.2/

Проблема с github: https://github.com/Azure/azure-sdk-for-python/issues/13060

person Adam Ling    schedule 12.01.2021