Я хочу иметь конвейер данных, который по сути выглядит так
где несколько задач запускаются соответствующими сообщениями pubsub, обрабатывают данные из входных сообщений pubsub, а последняя задача запускается только после того, как все эти рабочие процессы выполнены. Мне удалось запустить весь DAG с помощью PubSub (после этого guide с изменениями для PubSub), но запускает весь DAG, а не одну задачу. Есть ли способ запустить только 1 задачу в DAG извне (из Cloud Function / PubSub?)
РЕДАКТИРОВАТЬ
Это сокращенная версия того, как я думал, может быть код DAG:
import google.cloud.bigquery as bigquery
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators import python_operator
from airflow.operators import dummy_operator
def task1_1(**kwargs):
# I want this function to take the table name of source 1 from pubsub1, reads the table from BigQuery and processes it
client_bq = bigquery.Client()
table_name = kwargs['dag_run'].conf.get('message')
data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
# ETL Code
# .....
def task2_1(**kwargs):
# I want this function to take the table name of source 2 from pubsub2, reads the table from BigQuery and processes it
client_bq = bigquery.Client()
table_name = kwargs['dag_run'].conf.get('message')
data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
# ETL Code
# .....
def task_combine():
# This task is triggered when task1_1 and task2_1 are done
# More ETL code
with DAG(
'clean_am_workflow',
schedule_interval=None,
start_date=datetime.datetime.today() - datetime.timedelta(days=5),
catchup=False) as dag:
source_1 = python_operator.PythonOperator(
task_id='process_source_1',
python_callable=task1_1,
provide_context=True
)
source_2 = python_operator.PythonOperator(
task_id='process_source_2',
python_callable=task2_1,
provide_context=True
)
combine = python_operator.PythonOperator(
task_id='combine_sources',
python_callable=task_combine,
provide_context=True
)
[source_1, source_2] >> combine