Пропуск шага в конвейере лучей apache Python

Итак, я создаю конвейер Apache Beam и испытываю некоторые проблемы с пропуском остальных шагов в Python SDK. Вот упрощенный пример, с которым у меня проблемы:

import apache_beam as beam
import os 

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = API_KEY
def foo(message):
    pass

options = {
    'streaming': True
}

runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
with beam.Pipeline(runner, options=opts) as p:
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=my_sub))
    result = (sub_message | 'foo' >> beam.Map(foo))
    result | 'print' >> beam.Map(print)

    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()

Итак, согласно этому: Apache Beam - пропустить этап конвейера, который находится на Java, если моя функция ничего не возвращает, тогда apache_beam должен пропустить остальные шаги. Поправьте меня, если я ошибаюсь, но в python это то же самое, что и возврат None, поэтому мой pass можно заменить на return None и быть таким же. Но когда я запускаю этот код с pass или return None, результат действительно переходит к следующему шагу. То есть он продолжает печатать None, хотя ничего печатать не должен, так как он должен пропустить все следующие шаги. Любая помощь приветствуется.


person kauii8    schedule 14.08.2020    source источник


Ответы (1)


Как ни странно, как только я опубликовал это, я нашел ответ в документации. Похоже, что в приведенной мной ссылке эквивалент использует ParDo НЕ карту, как я. На самом деле это должно выглядеть так:

import apache_beam as beam
import os 

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials
class TestFn(beam.DoFn):
    def process(self, element):
        print('hi')
        pass

options = {
    'streaming': True
}

runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
with beam.Pipeline(runner, options=opts) as p:
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=mysub))
    result = (sub_message | 'foo' >> beam.ParDo(TestFn()))
    result | 'print' >> beam.Map(print)

    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()
person kauii8    schedule 14.08.2020
comment
Обратите внимание, что использование p в контексте автоматически вызывает p.run().wait_until_finish(), так что это может закончиться тем, что ваш конвейер будет отправлен дважды. - person robertwb; 21.08.2020