Цели MySQL в рабочем процессе Luigi

Моему TaskB требуется TaskA, и по завершении TaskA записывает в таблицу MySQL, а затем TaskB должен принять этот вывод в таблицу в качестве своего ввода.

Я не могу понять, как это сделать в Луиджи. Может ли кто-нибудь указать мне на пример или дать мне быстрый пример здесь?


person Rijo Simon    schedule 03.11.2016    source источник


Ответы (1)


Существующий MySqlTarget в luigi использует отдельную таблицу маркеров, чтобы указать, когда задача завершена. Вот грубый подход, который я бы выбрал ... но ваш вопрос очень абстрактный, поэтому на самом деле он, вероятно, будет более сложным.

import luigi
from datetime import datetime
from luigi.contrib.mysqldb import MySqlTarget


class TaskA(luigi.Task):
    rundate = luigi.DateParameter(default=datetime.now().date())
    target_table = "table_to_update"
    host = "localhost:3306"
    db = "db_to_use"
    user = "user_to_use"
    pw = "pw_to_use"

    def get_target(self):
        return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
                           update_id=str(self.rundate))

    def requires(self):
        return []

    def output(self):
        return self.get_target()

    def run(self):
        #update table
        self.get_target().touch()


class TaskB(luigi.Task):
    def requires(self):
        return [TaskA()]

    def run(self):
        # reading from target_table
person MattMcKnight    schedule 04.11.2016
comment
Спасибо за это, Мэтт. Это действительно помогает. У меня был один вопрос, хотя? означает ли это, что MySqlTarget отслеживает, какая строка обновляется, используя update_id, который является основным идентификатором строки. И в каком случае, если мои первичные идентификаторы автоматически увеличиваются, что мне делать? - person Rijo Simon; 07.11.2016
comment
О, это сложно. Я думаю, вам придется использовать другое уникальное значение помимо идентификатора автоинкремента в качестве update_id. Он буквально работает """INSERT INTO {marker_table} (update_id, target_table) VALUES (%s, %s) ON DUPLICATE KEY UPDATE update_id = VALUES(update_id) """.format(marker_table=self.marker_table), (self.update_id, self.table) - person MattMcKnight; 08.11.2016
comment
Таким образом, кажется, что решение состоит в том, чтобы иметь таблицу обновлений, в которой регистрируются обновления рабочего процесса? Но я не хочу вести таблицу для каждой задачи (а задач у меня много). Итак, из вашей интерпретации sql того, что происходит, кажется, что я должен сделать это return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table, update_id=str(self.rundate), update_task_type = TASK_TYPE_A) - person Rijo Simon; 08.11.2016
comment
Да, это то, что MySQLTarget делает по умолчанию, он создает эту marker_table. Один из столбцов в этой таблице — target_table, который, предположительно, является таблицей, обновленной вашим заданием. - person MattMcKnight; 08.11.2016