Я создаю динамический DAG с несколькими циклами для. Он правильно запускает поток, но не очень хорошо соединяется в дальнейшем по потоку. Задача dummy_ender_0_a подключается к toto_a, как и ожидалось. Но я ожидал, что dummy_ender
import datetime
from airflow import models
from airflow.utils import dates
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
ENV = 'DEV'
META_DELTA_DAYS=3
tables=['a','b','c','d']
with models.DAG(
'test_'+ENV,
schedule_interval = None ,
concurrency = 1 ,
max_active_runs = 1,
catchup = False, # for now manualy catchup
default_args={
"start_date": datetime.datetime.strptime('2019-09-01 23:10:00', '%Y-%m-%d %H:%M:%S'),
"retries": 1,
"retry_delay": datetime.timedelta(minutes=1),
"execution_timeout": datetime.timedelta(minutes=14), # safty check
# notifications
"email_on_failure": True,
"email_on_retry": False
'email': [models.Variable.get('alert_email')],
"project_id": models.Variable.get('project'),
'provide_context': True
}
) as dag:
def parsing_ts(**kwargs):
etl_run_time_nodash = kwargs['ts_nodash'].replace('+','').replace('T','')[:-2]
etl_run_time = datetime.datetime.fromtimestamp(kwargs['execution_time'].timestamp())
print('>>> START DATE >>>>',etl_run_time_nodash)
for i,dt in enumerate([etl_run_time - datetime.timedelta(days=x) for x in range(META_DELTA_DAYS)]):
kwargs['ti'].xcom_push(key='curr_yr_minus_'+str(i), value=dt.year)
kwargs['ti'].xcom_push(key='curr_mth_minus_'+str(i), value=dt.month)
kwargs['ti'].xcom_push(key='curr_day_minus_'+str(i), value=dt.dayofmonth)
def printval(**kwargs):
etl_run_time_nodash = kwargs['ti'].xcom_pull(key='etl_run_time_nodash')
print('>>> START DATE NO DASH>>>>',etl_run_time_nodash)
def start(table,**kwargs):
return DummyOperator(task_id = "dummy_start_%s"%(table) )
def starter(i,table,**kwargs):
return DummyOperator(task_id = "dummy_starter_%s_%s"%(i,table) )
def ender(i,table,**kwargs):
return DummyOperator(task_id = "dummy_ender_%s_%s"%(i,table) )
def toto(table,**kwargs):
return DummyOperator(task_id = "toto_%s"%(table) )
def end(**kwargs):
return DummyOperator(task_id = "dummy_end")
parsing_ts_op = python_operator.PythonOperator(
task_id='parsing_ts_operator',
provide_context=True,
python_callable=parsing_ts)
printval_op = python_operator.PythonOperator(
task_id='printval_operator',
provide_context=True,
python_callable=printval)
for table in tables:
parsing_ts_op >> start(table)
for i in range(META_DELTA_DAYS):
start(table) >> starter(i,table)
starter(i,table) >> ender(i,table)
ender(i,table) >> toto(table)
toto(table) >> printval_op
printval_op >> end()
a и dummy_ender_2_a также подключатся к нисходящей задаче toto_a. Я не уверен, что мне здесь не хватает. Вот код: import datetime
from airflow import models
from airflow.utils import dates
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
ENV = 'DEV'
META_DELTA_DAYS=3
tables=['a','b','c','d']
with models.DAG(
'test_'+ENV,
schedule_interval = None ,
concurrency = 1 ,
max_active_runs = 1,
catchup = False, # for now manualy catchup
default_args={
"start_date": datetime.datetime.strptime('2019-09-01 23:10:00', '%Y-%m-%d %H:%M:%S'),
"retries": 1,
"retry_delay": datetime.timedelta(minutes=1),
"execution_timeout": datetime.timedelta(minutes=14), # safty check
# notifications
"email_on_failure": True,
"email_on_retry": False
'email': [models.Variable.get('alert_email')],
"project_id": models.Variable.get('project'),
'provide_context': True
}
) as dag:
def parsing_ts(**kwargs):
etl_run_time_nodash = kwargs['ts_nodash'].replace('+','').replace('T','')[:-2]
etl_run_time = datetime.datetime.fromtimestamp(kwargs['execution_time'].timestamp())
print('>>> START DATE >>>>',etl_run_time_nodash)
for i,dt in enumerate([etl_run_time - datetime.timedelta(days=x) for x in range(META_DELTA_DAYS)]):
kwargs['ti'].xcom_push(key='curr_yr_minus_'+str(i), value=dt.year)
kwargs['ti'].xcom_push(key='curr_mth_minus_'+str(i), value=dt.month)
kwargs['ti'].xcom_push(key='curr_day_minus_'+str(i), value=dt.dayofmonth)
def printval(**kwargs):
etl_run_time_nodash = kwargs['ti'].xcom_pull(key='etl_run_time_nodash')
print('>>> START DATE NO DASH>>>>',etl_run_time_nodash)
def start(table,**kwargs):
return DummyOperator(task_id = "dummy_start_%s"%(table) )
def starter(i,table,**kwargs):
return DummyOperator(task_id = "dummy_starter_%s_%s"%(i,table) )
def ender(i,table,**kwargs):
return DummyOperator(task_id = "dummy_ender_%s_%s"%(i,table) )
def toto(table,**kwargs):
return DummyOperator(task_id = "toto_%s"%(table) )
def end(**kwargs):
return DummyOperator(task_id = "dummy_end")
parsing_ts_op = python_operator.PythonOperator(
task_id='parsing_ts_operator',
provide_context=True,
python_callable=parsing_ts)
printval_op = python_operator.PythonOperator(
task_id='printval_operator',
provide_context=True,
python_callable=printval)
for table in tables:
parsing_ts_op >> start(table)
for i in range(META_DELTA_DAYS):
start(table) >> starter(i,table)
starter(i,table) >> ender(i,table)
ender(i,table) >> toto(table)
toto(table) >> printval_op
printval_op >> end()