Планирование очень большого количества заданий на python luigi

Я написал конвейер Luigi для извлечения 1,2 млн файлов, а затем выполнил над ними некоторую работу с sed — см. //gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f.

Если я запускаю это через Луиджи на нескольких тысячах файлов, все работает нормально. Но запуская это на всем наборе данных, он жалуется на Failed connecting to remote scheduler. Не уверен, что я делаю это правильно.


person Wolfgang Kerzendorf    schedule 09.07.2017    source источник


Ответы (1)


Я бы посоветовал не создавать отдельную задачу для каждого файла, когда их будет больше 1k. Вероятно, вам повезет больше, если вы создадите пакетную задачу, которая запускается в каталоге этих файлов. Затем эта задача может использовать многопроцессорность для использования параллельных вызовов вашей функции обработки.

from multiprocessing import Pool, cpu_count
import os

class TestTask(luigi.WrapperTask):
    inglob = luigi.Parameter(default='/1002/*.gz')
    outdir = luigi.Parameter(default='/1002-out/')
    tmpdir = luigi.Parameter(default='/1002-tmp/'

    def extract_file(filename):
        # extract file to self.tempdir not shown

    def output(self):
        return luigi.LocalTarget(self.outdir)

    def run(self):
        os.makedirs(self.tempdir)
        p = Pool(cpu_count())
        p.map(extract_file, glob(self.inglob))
        os.rename(self.tempdir, self.outdir)
person MattMcKnight    schedule 10.07.2017
comment
Спасибо за ваш ответ. Почему Луиджи не справляется с этим? Что я хочу, так это построить систему, которая определяет конвейеры для этого запуска независимо и которую я могу легко перезапустить из любой конкретной точки. Быть основанным на файловой системе было бы здорово. Я много раз видел потребность в такой структуре - мне интересно, как будто ее не существует. Знаете ли вы, что воздушный поток был бы более подходящим? - person Wolfgang Kerzendorf; 10.07.2017
comment
Для любой пакетной системы нет необходимости выполнять миллионы сетевых вызовов и отслеживать состояние отдельных записей в системе. Для пакетной системы вам просто нужен способ создания пакетов. Luigi позволяет вам повторно запустить процесс с любой точки, поэтому, если у вас есть несколько преобразований, вы можете повторно запустить с шага 1 (распаковать), 2 (обработать текст), 3 (вставить в БД) или 4 и т. д. Это меньше полезно повторно запустить после того, как половина пакета завершена, вам просто нужно настроить размеры пакетов, пока они не станут иметь смысл в качестве атомарной единицы обработки. - person MattMcKnight; 10.07.2017
comment
Тем не менее, если вы хотите восстановиться после частичного завершения в этой модели, вам нужно переместить файлы gz в готовую папку или что-то в этом роде, чтобы при следующем запуске просто были выбраны файлы, которые еще не завершены. . - person MattMcKnight; 10.07.2017
comment
Кажется, это полностью противоречит цели Луиджи. По сути, мы самостоятельно распараллеливаемся и проверяем, существует цель или нет. По сути, тогда Луиджи больше ничего не дает, верно? - person Wolfgang Kerzendorf; 11.07.2017
comment
Luigi не является общей структурой параллелизма. Я использую его для очень сложных многоступенчатых конвейеров, где иногда происходит сбой одного шага, и вам нужно перезапустить его. Некоторые из них используют такие вещи, как Apache Spark, в самом конвейере для выполнения распределенной параллельной обработки. У Luigi есть типы задач, специально разработанные для pyspark, что делает это возможным. - person MattMcKnight; 11.07.2017