Я новичок в Apache Beam и изучаю версию потока данных Apache Beam для Python. Я хочу выполнять свои задачи потока данных в определенном порядке, но он выполняет все задачи в параллельном режиме. Как создать зависимость задачи в python apache beam?
Пример кода: (в приведенном ниже коде файл sample.json содержит 5 строк)
import apache_beam as beam
import logging
from apache_beam.options.pipeline_options import PipelineOptions
class Sample(beam.PTransform):
def __init__(self, index):
self.index = index
def expand(self, pcoll):
logging.info(self.index)
return pcoll
class LoadData(beam.DoFn):
def process(self, context):
logging.info("***")
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
pipeline = beam.Pipeline(options=PipelineOptions())
(pipeline
| "one" >> Sample(1)
| "two: Read" >> beam.io.ReadFromText('sample.json')
| "three: show" >> beam.ParDo(LoadData())
| "four: sample2" >> Sample(2)
)
pipeline.run().wait_until_finish()
Я ожидал, что он будет выполнен в порядке один, два, три, четыре. Но он работает в параллельном режиме.
вывод вышеуказанного кода:
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
INFO:root:1
INFO:root:2
INFO:root:Running pipeline with DirectRunner.
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***