Задачи Луиджи уходят в бесконечные циклы

У меня есть простая задача luigi, которая при запуске выдаст другой параметр, как показано ниже.

import luigi

class ComputeJob(luigi.Task):

   id_parameter = luigi.parameter.IntParameter()

    #run defination
    def run(self):


        print ("\nrunning task {}".format(self.id_parameter))
        #Do some work here

        if self.id_parameter < 10: 
            next_val = self.id_parameter + 1
            yield ComputeJob(id_parameter = next_val)

Я ожидаю, что он запустится 10 раз, а затем выйдет из цикла, но после выполнения 10-й итерации он начнет повторно выполнять 9 шагов с самого начала. В связи с этим задачи на шагах 9 и 10 продолжают повторяться.

поэтому ожидаемый результат должен быть:

running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10

но результат, который я получаю:

running task 1
running task 2
running task 3
running task 4
running task 5
running task 6
running task 7
running task 8
running task 9
running task 10
running task 9
running task 10
running task 9
running task 10
...
...
...

Что мне здесь не хватает?

Спасибо, ойшик


person Oyshik Moitra    schedule 15.03.2019    source источник
comment
Итак, я обнаружил, что каждый раз, когда вызывается задача, в целевой цели вывода вызывается exists(). Если цель присутствует, задача не будет повторно запущена. Я надеялся, что смогу полностью избавиться от вывода, поскольку моя задача не производит никакого вывода.   -  person Oyshik Moitra    schedule 16.03.2019


Ответы (1)


Я предлагаю вам посмотреть документацию по методу complete, если вы хотите" полностью отказаться от вывода ", как вы заявили в своем последующем комментарии.

Другой вариант - создать дополнительную задачу-оболочку для выполнения этой задачи ComputeJob, которую вы создали, столько раз, сколько захотите.

import luigi

class ComputeJob(luigi.Task):
  id_parameter = luigi.parameter.IntParameter()
  done = False

  #run definition
  def run(self):
    print ("\nrunning task {}".format(self.id_parameter))
    #Do some work here
    self.done = True

  def complete(self):
    if self.done:
      return True
    else:
      return False

class RunComputeJobs(luigi.WrapperTask):
  def requires(self):
    for i in range(1,10):
      yield ComputeJob(id_parameter = i)
person Taha Jirjees    schedule 19.03.2019