Как создать динамический рабочий процесс в Airflow DAG с несколькими циклами for?

Я создаю динамический DAG с несколькими циклами для. Он правильно запускает поток, но не очень хорошо соединяется в дальнейшем по потоку. Задача dummy_ender_0_a подключается к toto_a, как и ожидалось. Но я ожидал, что dummy_ender

import datetime
from airflow import models
from airflow.utils import dates
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator

ENV = 'DEV'
META_DELTA_DAYS=3
tables=['a','b','c','d']

with models.DAG(
        'test_'+ENV,
        schedule_interval = None ,
        concurrency = 1 ,
        max_active_runs = 1,
        catchup = False, # for now manualy catchup
        default_args={
            "start_date": datetime.datetime.strptime('2019-09-01 23:10:00', '%Y-%m-%d %H:%M:%S'),
            "retries": 1,
            "retry_delay": datetime.timedelta(minutes=1),
            "execution_timeout": datetime.timedelta(minutes=14), # safty check
            # notifications
            "email_on_failure": True,
            "email_on_retry": False
            'email': [models.Variable.get('alert_email')],
            "project_id": models.Variable.get('project'),
            'provide_context': True
        }
) as dag:

    def parsing_ts(**kwargs):
        etl_run_time_nodash = kwargs['ts_nodash'].replace('+','').replace('T','')[:-2]
        etl_run_time  = datetime.datetime.fromtimestamp(kwargs['execution_time'].timestamp())
        print('>>> START DATE >>>>',etl_run_time_nodash)
        for i,dt in enumerate([etl_run_time - datetime.timedelta(days=x) for x in range(META_DELTA_DAYS)]):
            kwargs['ti'].xcom_push(key='curr_yr_minus_'+str(i), value=dt.year)
            kwargs['ti'].xcom_push(key='curr_mth_minus_'+str(i), value=dt.month)
            kwargs['ti'].xcom_push(key='curr_day_minus_'+str(i), value=dt.dayofmonth)





    def printval(**kwargs):
        etl_run_time_nodash = kwargs['ti'].xcom_pull(key='etl_run_time_nodash')
        print('>>> START DATE NO DASH>>>>',etl_run_time_nodash)



    def start(table,**kwargs):
        return DummyOperator(task_id = "dummy_start_%s"%(table) )

    def starter(i,table,**kwargs):
        return DummyOperator(task_id = "dummy_starter_%s_%s"%(i,table) )

    def ender(i,table,**kwargs):
        return DummyOperator(task_id = "dummy_ender_%s_%s"%(i,table) )

    def toto(table,**kwargs):
        return DummyOperator(task_id = "toto_%s"%(table) )

    def end(**kwargs):
        return DummyOperator(task_id = "dummy_end")

    parsing_ts_op = python_operator.PythonOperator(
        task_id='parsing_ts_operator',
        provide_context=True,
        python_callable=parsing_ts)

    printval_op = python_operator.PythonOperator(
        task_id='printval_operator',
        provide_context=True,
        python_callable=printval)

    for table in tables:
        parsing_ts_op >> start(table)
        for i in range(META_DELTA_DAYS):
            start(table) >> starter(i,table)
            starter(i,table) >> ender(i,table)
            ender(i,table) >> toto(table)
        toto(table) >> printval_op
    printval_op >> end()
a и dummy_ender_2_a также подключатся к нисходящей задаче toto_a. Я не уверен, что мне здесь не хватает. Вот код:

import datetime
from airflow import models
from airflow.utils import dates
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator

ENV = 'DEV'
META_DELTA_DAYS=3
tables=['a','b','c','d']

with models.DAG(
        'test_'+ENV,
        schedule_interval = None ,
        concurrency = 1 ,
        max_active_runs = 1,
        catchup = False, # for now manualy catchup
        default_args={
            "start_date": datetime.datetime.strptime('2019-09-01 23:10:00', '%Y-%m-%d %H:%M:%S'),
            "retries": 1,
            "retry_delay": datetime.timedelta(minutes=1),
            "execution_timeout": datetime.timedelta(minutes=14), # safty check
            # notifications
            "email_on_failure": True,
            "email_on_retry": False
            'email': [models.Variable.get('alert_email')],
            "project_id": models.Variable.get('project'),
            'provide_context': True
        }
) as dag:

    def parsing_ts(**kwargs):
        etl_run_time_nodash = kwargs['ts_nodash'].replace('+','').replace('T','')[:-2]
        etl_run_time  = datetime.datetime.fromtimestamp(kwargs['execution_time'].timestamp())
        print('>>> START DATE >>>>',etl_run_time_nodash)
        for i,dt in enumerate([etl_run_time - datetime.timedelta(days=x) for x in range(META_DELTA_DAYS)]):
            kwargs['ti'].xcom_push(key='curr_yr_minus_'+str(i), value=dt.year)
            kwargs['ti'].xcom_push(key='curr_mth_minus_'+str(i), value=dt.month)
            kwargs['ti'].xcom_push(key='curr_day_minus_'+str(i), value=dt.dayofmonth)





    def printval(**kwargs):
        etl_run_time_nodash = kwargs['ti'].xcom_pull(key='etl_run_time_nodash')
        print('>>> START DATE NO DASH>>>>',etl_run_time_nodash)



    def start(table,**kwargs):
        return DummyOperator(task_id = "dummy_start_%s"%(table) )

    def starter(i,table,**kwargs):
        return DummyOperator(task_id = "dummy_starter_%s_%s"%(i,table) )

    def ender(i,table,**kwargs):
        return DummyOperator(task_id = "dummy_ender_%s_%s"%(i,table) )

    def toto(table,**kwargs):
        return DummyOperator(task_id = "toto_%s"%(table) )

    def end(**kwargs):
        return DummyOperator(task_id = "dummy_end")

    parsing_ts_op = python_operator.PythonOperator(
        task_id='parsing_ts_operator',
        provide_context=True,
        python_callable=parsing_ts)

    printval_op = python_operator.PythonOperator(
        task_id='printval_operator',
        provide_context=True,
        python_callable=printval)

    for table in tables:
        parsing_ts_op >> start(table)
        for i in range(META_DELTA_DAYS):
            start(table) >> starter(i,table)
            starter(i,table) >> ender(i,table)
            ender(i,table) >> toto(table)
        toto(table) >> printval_op
    printval_op >> end()

Поток DAG


person Shibu Valayil    schedule 24.03.2020    source источник


Ответы (1)


Благодаря Крису Гейзебруку мне удалось решить проблему. Он попросил меня сделать toto (таблицу) перед внутренним циклом и назначить его переменной, которую вы используете внутри внутреннего цикла. Вот как я изменил секцию зацикливания:

for table in tables:
    d = toto(table)
    parsing_ts_op >> start(table)
    for i in range(META_DELTA_DAYS):
        start(table) >> starter(i,table)
        starter(i,table) >> ender(i,table)
        ender(i,table) >> d
    d >> printval_op
printval_op >> end()
person Shibu Valayil    schedule 25.03.2020