Недопустимый токен ParDo Work с отслеживанием состояния Apache Beam

У меня есть DoFn с отслеживанием состояния, который в основном объединяет поступающие элементы, и когда буфер достигает определенного размера, буфер очищается и элементы вставляются в BigQuery. Я заметил, что время от времени конвейер вызывает исключение, исключение не останавливает выполнение задания. Ниже представлена ​​трассировка стека:


Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 140, in process
    self._flush_buffer(buffer_state, count_state, buffer_size_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 162, in _flush_buffer
    rows = self._extract_rows(buffer_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 197, in _extract_rows
    for row in buffer.read():
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 510, in __iter__
    for elem in self.first:
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1039, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 846, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 886, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: INTERNAL: Work token invalid

Это возникает, когда вызывается метод процесса и он пытается извлечь элементы из буфера, см. rows = self._extract_rows(buffer_state)

DoFn реализован точно так же, как в примере https://beam.apache.org/blog/timely-processing/#example-batched-rpc


person Tudor Plugaru    schedule 09.12.2020    source источник
comment
Вы используете cloud.google.com/ dataflow / docs / guides /?   -  person robertwb    schedule 09.12.2020
comment
@robertwb да, я установил --experiments=use_runner_v2, когда работа начинается.   -  person Tudor Plugaru    schedule 09.12.2020
comment
Работа всегда повторяется в случае сбоя для потоковых заданий, поэтому, если это временно, все должно быть в порядке, но я попытаюсь выяснить, почему вы получаете эту ошибку.   -  person robertwb    schedule 10.12.2020
comment
Я заметил, что это происходит на длительных рабочих местах, например, день или больше, и это не очень часто. Элементы моего конвейера выглядят так (key, dict), и на самом деле я храню в буфере именно этот dict. Теперь, время от времени и по неизвестным мне причинам, когда элемент входит в ParDo, ключ в порядке, но фактическое значение равно None. Просмотр журналов показал мне, что тот же элемент с тем же ключом фактически обрабатывался в другом пакете в другом потоке. Не знаю, является ли это причиной, но на данный момент что-то вроде if not value... помогло.   -  person Tudor Plugaru    schedule 10.12.2020


Ответы (1)


Я подтвердил, что эта ошибка ожидается во время переназначения работы, например при автомасштабировании. Рабочий элемент будет повторен на новом компьютере, и конвейер продолжит обработку правильно. (Я согласен, что сообщение об ошибке можно улучшить.)

person robertwb    schedule 10.12.2020
comment
Хм, я думаю, в этом есть смысл. Спасибо за помощь. - person Tudor Plugaru; 11.12.2020
comment
Быстрый вопрос, почему он всегда терпит неудачу при попытке чтения из буфера состояний? В последние несколько дней эта ошибка повторяется все чаще, и иногда конвейер застревает. - person Tudor Plugaru; 17.12.2020
comment
Чтение из состояния - это случай, когда бегун не может скрыть от вас, что работа была перемещена. (В противном случае он просто будет ждать завершения сборки и молча отбрасывает результаты.) - person robertwb; 17.12.2020
comment
Если ваш конвейер действительно застревает, я бы подал заявку в службу поддержки пользователей. - person robertwb; 17.12.2020
comment
Попался. Спасибо! - person Tudor Plugaru; 18.12.2020
comment
Привет еще раз, что ж, теперь я заметил, что у меня был конвейер, в котором возникла ошибка, подобная недопустимому рабочему токену, и после этой ошибки обе метрики «Задержка системы» и «Свежесть данных» начали сильно увеличиваться. Покопавшись, заметил, что некоторые обработанные события не достигли BigQuery, потому что это то, что делает конвейер. Вы знаете об этом больше? Идентификатор вакансии - 2020-12-30_22_10_27-11400116370140824357, если это помогает. Мне действительно нужно понять, в моем ли коде проблема или что-то связано с потоком данных. - person Tudor Plugaru; 03.01.2021