Как создать зависимость между задачами в Apache Beam Python

Я новичок в Apache Beam и изучаю версию потока данных Apache Beam для Python. Я хочу выполнять свои задачи потока данных в определенном порядке, но он выполняет все задачи в параллельном режиме. Как создать зависимость задачи в python apache beam?

Пример кода: (в приведенном ниже коде файл sample.json содержит 5 строк)

import apache_beam as beam
import logging
from apache_beam.options.pipeline_options import PipelineOptions

class Sample(beam.PTransform):
    def __init__(self, index):
        self.index = index

    def expand(self, pcoll):
        logging.info(self.index)
        return pcoll

class LoadData(beam.DoFn):
    def process(self, context):
        logging.info("***")

if __name__ == '__main__':

    logging.getLogger().setLevel(logging.INFO)
    pipeline = beam.Pipeline(options=PipelineOptions())

    (pipeline
        | "one" >> Sample(1)
        | "two: Read" >> beam.io.ReadFromText('sample.json')
        | "three: show" >> beam.ParDo(LoadData())
        | "four: sample2" >> Sample(2)
    )
    pipeline.run().wait_until_finish()

Я ожидал, что он будет выполнен в порядке один, два, три, четыре. Но он работает в параллельном режиме.

вывод вышеуказанного кода:

INFO:root:Missing pipeline option (runner). Executing pipeline using the 
default runner: DirectRunner.
INFO:root:1
INFO:root:2
INFO:root:Running pipeline with DirectRunner.
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***

person MJK    schedule 17.03.2018    source источник
comment
Чего вы пытаетесь достичь, выполняя это последовательно? Кроме того, я не уверен, что должно делать ваше преобразование Sample: в том виде, в каком оно реализовано, оно ничего не делает. Также имейте в виду, что, как и план запроса к базе данных, конвейер сначала создается (это когда вы видите ведение журнала из expand ()), а затем оптимизируется бегуном и выполняется (это когда вы видите ** *).   -  person jkff    schedule 17.03.2018
comment
@jkff Я хочу загрузить данные из biquery в elasticsearch. В моем примере преобразования я выполняю такие операции, как создание, переиндексирование и удаление индексов elasticsearch. Итак, сначала мне нужно создать временный индекс, вторые данные загрузки и временный индекс ES, третье переиндексировать его, четвертое удалить мой временный индекс. Поэтому я хочу выполнить все эти задачи по порядку. но здесь сначала выполняются задачи создания, переиндексации и удаления, а затем выполняются данные загрузки. (вы можете увидеть логи, наконец, показанные *****)   -  person MJK    schedule 18.03.2018


Ответы (1)


Согласно документации по Dataflow:

Когда конвейер создает ваш фактический конвейер для распределенного выполнения, конвейер может быть оптимизирован. Например, может быть более эффективным с точки зрения вычислений выполнение определенных преобразований вместе или в другом порядке. Служба Dataflow полностью управляет этим аспектом выполнения вашего конвейера.

Также согласно документации Apache Beam:

API-интерфейсы делают упор на параллельную обработку элементов, что затрудняет выражение таких действий, как «присвоение порядкового номера каждому элементу в коллекции PCollection». Это сделано намеренно, поскольку такие алгоритмы гораздо чаще страдают от проблем с масштабируемостью. Параллельная обработка всех элементов также имеет некоторые недостатки. В частности, это делает невозможным пакетирование любых операций, таких как запись элементов в приемник или выполнение контрольных точек во время обработки.

Дело в том, что Dataflow и Apache Beam по своей природе параллельны; они были разработаны для работы с досадно параллельными вариантами использования и, возможно, не лучший инструмент, если вам требуется, чтобы операции выполнялись в определенном порядке. Как отметил @jkff, Dataflow оптимизирует конвейер таким образом, чтобы он наилучшим образом распараллеливал операции.

Если вам действительно нужно выполнить каждый из шагов в последовательном порядке, обходной путь - использовать вместо этого блокирует выполнение, используя waitUntilFinish() метод, как описано в этом другом ответе на переполнение стека. Однако я понимаю, что такая реализация будет работать только в пакетном конвейере, так как потоковые конвейеры будут потреблять данные непрерывно, и поэтому вы не можете заблокировать выполнение для работы с последовательными шагами.

person dsesto    schedule 21.03.2018