Apache Beam StatusRuntimeException в конвейере потока данных

Я работаю над конвейером потока данных, написанным на python2.7, с использованием apache_beam == 2.24.0. Работа конвейера заключается в том, чтобы получать сообщения pubsub из подписки с использованием ReadFromPubSub луча в пакетном режиме, выполнять некоторую обработку сообщений и затем сохранять полученные данные в двух разных таблицах bigquery. Я потребляю много данных. Версия Google-cloud-pubsub - 1.7.0. После запуска конвейера все работает нормально, но через несколько часов я начинаю получать исключение:

org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: CANCELED: вызов уже отменен

В консоли потока данных gcp журналы показывают эту ошибку, но само задание работает нормально. Он потребляет данные из подписки и записывает их в bigquery. На что здесь ссылается ОТМЕНЕН: вызов и почему я получаю эту ошибку? Как я могу это решить?

Полная трассировка стека:

Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
    org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
    org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:100)
    org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.shouldWait(RemoteGrpcPortWriteOperation.java:124)
    org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:167)
    org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:57)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)

comment
Конвейер потоковой передачи повторяет отказавшие элементы бесконечно. Пока системные задержки и актуальность данных нормальные, вам не нужно беспокоиться об ошибках низкого уровня. Похоже, это обычная ошибка grpc: stackoverflow.com/questions/57110811/. Вы упомянули использование Python SDK, трассировка стека находится на Java. Вы использовали какую-то функцию xlang?   -  person 大ドア東    schedule 31.03.2021
comment
Я просто использую apache-beam python sdk. SDK может использовать внутреннюю функцию xlang.   -  person Aman Goel    schedule 01.04.2021
comment
Ошибки не должны доставить особых хлопот. Кроме того, не могли бы вы попробовать использовать Python3 и более новые версии Beam? Могут быть некоторые проблемы с grpc, которые сейчас исправлены.   -  person 大ドア東    schedule 02.04.2021
comment
В проекте используется только версия python 2.7, а beam == 2,24 - последняя поддерживаемая версия для python2.7. Хотя конвейер использует средство запуска потока данных, но возможно ли, что процесс bash (который используется для запуска конвейера python), переходящий в спящий режим, может вызвать проблему? Вчера я отслеживал конвейер в течение 10 часов подряд и не получал ошибки, но обычно ошибка возникает в течение 3 часов после запуска конвейера.   -  person Aman Goel    schedule 02.04.2021
comment
Сценарий bash не должен вызывать этого, поскольку он работает в потоке данных. Здесь сообщалось об аналогичной проблеме, но она отмечена как не ошибка: issues.apache.org/jira / browse / BEAM-9630. Кажется, это не проблема, и вы, вероятно, можете игнорировать это. Я также добавил в тикет комментарий с вопросом об этом.   -  person 大ドア東    schedule 03.04.2021
comment
Спасибо. Это было полезно.   -  person Aman Goel    schedule 05.04.2021


Ответы (1)


У клиента, над которым я работаю, есть возможность поднять запрос на поддержку Google Cloud. Точный ответ службы поддержки Google Cloud:

Эта обнаруженная вами ошибка довольно безобидна. Поток данных представляет собой платформу для массовой параллельной обработки данных, и когда есть события автомасштабирования, которые могут перемещать рабочую виртуальную машину. Когда виртуальная машина завершает работу, канал grpc закрывается перед процессом runner, и обрабатываемый рабочий элемент будет повторен на другом недавно запущенном runner. Эти ошибки можно игнорировать.

person Aman Goel    schedule 15.04.2021