Как запустить задачу извне в Dag Cloud Composer

Я хочу иметь конвейер данных, который по сути выглядит так

введите описание изображения здесь

где несколько задач запускаются соответствующими сообщениями pubsub, обрабатывают данные из входных сообщений pubsub, а последняя задача запускается только после того, как все эти рабочие процессы выполнены. Мне удалось запустить весь DAG с помощью PubSub (после этого guide с изменениями для PubSub), но запускает весь DAG, а не одну задачу. Есть ли способ запустить только 1 задачу в DAG извне (из Cloud Function / PubSub?)

РЕДАКТИРОВАТЬ

Это сокращенная версия того, как я думал, может быть код DAG:

import google.cloud.bigquery as bigquery

import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators import python_operator
from airflow.operators import dummy_operator


def task1_1(**kwargs):
    # I want this function to take the table name of source 1 from pubsub1, reads the table from BigQuery and processes it
    client_bq = bigquery.Client()
    table_name = kwargs['dag_run'].conf.get('message')
    data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
    # ETL Code
    # ..... 


def task2_1(**kwargs):
    # I want this function to take the table name of source 2 from pubsub2, reads the table from BigQuery and processes it
    client_bq = bigquery.Client()
    table_name = kwargs['dag_run'].conf.get('message')
    data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
    # ETL Code
    # ..... 

def task_combine():
    # This task is triggered when task1_1 and task2_1 are done
    # More ETL code


with DAG(
        'clean_am_workflow',
        schedule_interval=None,
        start_date=datetime.datetime.today() - datetime.timedelta(days=5),
        catchup=False) as dag:

    source_1 = python_operator.PythonOperator(
        task_id='process_source_1',
        python_callable=task1_1,
        provide_context=True
        )

    source_2 = python_operator.PythonOperator(
        task_id='process_source_2',
        python_callable=task2_1,
        provide_context=True
        )

    combine = python_operator.PythonOperator(
        task_id='combine_sources',
        python_callable=task_combine,
        provide_context=True
        )

    [source_1, source_2] >> combine

person pa-nguyen    schedule 23.03.2021    source источник
comment
Здравствуйте, исправьте пожалуйста ссылку в своем вопросе и поделитесь пожалуйста своим дагом.   -  person PeterRing    schedule 23.03.2021
comment
@PeterRing Я исправил это и добавил код, но не знаю, поможет ли это. Так я хочу, чтобы DAG был. Вам нужно увидеть облачную функцию, которая также запускает DAG?   -  person pa-nguyen    schedule 23.03.2021


Ответы (1)


Вам нужно не запускать сам даг, а запускать другую задачу отдельно на основе большого запроса. Этого можно добиться с помощью датчика воздушного потока. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html Датчик SQL: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/sensors/sql.html

В этом случае даг будет срабатывать обычный cron. Задача с двумя датчиками будет периодически запрашивать bigquery, если этот запрос возвращает «готово к работе», то она запускает задачи. Поскольку 2 датчика независимы, последняя задача будет выполнена только тогда, когда будут выполнены и датчик, и задача.

person PeterRing    schedule 24.03.2021
comment
Спасибо за ваш ответ! Может ли датчик воздушного потока распознавать вновь созданную таблицу в наборе данных bigquery? - person pa-nguyen; 25.03.2021
comment
конечно, запросить метаданные - person PeterRing; 25.03.2021