У меня есть DoFn с отслеживанием состояния, который в основном объединяет поступающие элементы, и когда буфер достигает определенного размера, буфер очищается и элементы вставляются в BigQuery. Я заметил, что время от времени конвейер вызывает исключение, исключение не останавливает выполнение задания. Ниже представлена трассировка стека:
Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 140, in process
self._flush_buffer(buffer_state, count_state, buffer_size_state)
File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 162, in _flush_buffer
rows = self._extract_rows(buffer_state)
File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 197, in _extract_rows
for row in buffer.read():
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 510, in __iter__
for elem in self.first:
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1039, in _lazy_iterator
self._underlying.get_raw(state_key, continuation_token))
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 846, in get_raw
continuation_token=continuation_token)))
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 886, in _blocking_request
raise RuntimeError(response.error)
RuntimeError: INTERNAL: Work token invalid
Это возникает, когда вызывается метод процесса и он пытается извлечь элементы из буфера, см. rows = self._extract_rows(buffer_state)
DoFn реализован точно так же, как в примере https://beam.apache.org/blog/timely-processing/#example-batched-rpc
--experiments=use_runner_v2
, когда работа начинается. - person Tudor Plugaru   schedule 09.12.2020(key, dict)
, и на самом деле я храню в буфере именно этот dict. Теперь, время от времени и по неизвестным мне причинам, когда элемент входит в ParDo, ключ в порядке, но фактическое значение равноNone
. Просмотр журналов показал мне, что тот же элемент с тем же ключом фактически обрабатывался в другом пакете в другом потоке. Не знаю, является ли это причиной, но на данный момент что-то вродеif not value...
помогло. - person Tudor Plugaru   schedule 10.12.2020