В этом блоге собраны мои заметки об Airflow. Я решил написать его, чтобы объединить все свои знания в одном месте с хорошим практическим проектом. Я попытался ясно изложить все понятия/жаргон, чтобы сделать наше понимание воздушного потока ясным и точным. Однако, прежде чем прыгать и копаться в этом, есть два предварительных условия.

🔴 Питон

🔴 Извлечение-преобразование-загрузка ака ETL

И причина этих предпосылок очевидна: либо мы создаем отчет, либо проект машинного обучения, ETL является обязательным для обоих, и, поскольку Airflow написан на Python, мы не можем этого избежать.

Введение

Airflow — это пакетно-ориентированная среда для создания конвейеров данных.

Он использует DAG для создания сетей обработки данных или конвейеров.

  • DAG означает — ›Прямой ациклический граф. Течет в одном направлении. Нельзя вернуться в ту же точку, т.е. ациклическую.
  • Во многих средах обработки данных над данными выполняется ряд вычислений, чтобы подготовить их к одному или нескольким конечным пунктам назначения. Этот тип потока обработки данных часто называют конвейером данных. Поток DAG или обработки данных может иметь несколько путей, также называемых ветвлениями.

Простейший DAG может быть таким.

где

  • чтение данных из какой-либо конечной точкии запись в хранилище — представляет задачу (единицу работа)
  • Стрелка представляет направление обработки и зависимости для проверки того, на каком основании будет инициировано следующее действие.

Итак, почему мы должны использовать Airflow?

  • Если вам нравится Все как коди все означает все, включая ваши конфигурации, то Airflow — правильный выбор. Все как механизм кода помогает создать конвейер любого сложного уровня для решения проблемы.
  • Если вам нравится открытый исходный код, потому что почти все, что вы можете получить, как встроенный оператор или исполнитель.
  • Функции заполнения. Это позволяет повторно обрабатывать исторические данные.

И почему бы вам не использовать Airflow?

  • Если вы хотите построить конвейер потоковой передачи данных.

Архитектура воздушного потока

Итак, у нас есть хотя бы представление о том, что Airflow создан для построения конвейеров данных. Ниже мы можем увидеть различные компоненты Airflow и их внутренние соединения.

Мы можем видеть вышеуказанные компоненты, когда устанавливаем Airflow, и Airflow неявно устанавливает их, чтобы облегчить выполнение конвейеров. Эти компоненты

  • Каталог DAG, чтобы все DAG могли быть прочитаны планировщиком и исполнителем.
  • Планировщик анализирует DAGS, проверяет их запланированный интервал и начинает планировать задачи DAG для выполнения, передавая их рабочим потокам воздуха.
  • Рабочие несут ответственность за выполнение фактической работы. Он берет задачи и выполняет их.
  • Веб сервер представляет собой удобный пользовательский интерфейс для проверки, запуска и отладки поведения групп обеспечения доступности баз данных и задач.
  • База данных метаданных, используемая планировщиком, исполнителем и веб-сервером для хранения состояния, чтобы все они могли общаться и принимать решения. - перейдите по этой ссылке, чтобы узнать, как установить и получить переменные метаданных. См. раздел Доступ к базе данных метаданных.

На данный момент достаточно архитектуры. Перейдем к следующей части.

Установка воздушного потока

Airflow предоставляет множество вариантов установки. Вы можете прочитать все варианты в официальной документации по воздушному потоку, а затем решить, какой вариант вам подходит. Однако, чтобы сделать его простым и экспериментальным, я буду использовать способ установки Docker.

Установка Airflow с помощью Docker проста и интуитивно понятна, что помогает нам понять типичные функции и работу Airflow. Ниже приведены предварительные условия для запуска Airflow в Docker.

  • Docker Community Edition установлен на вашем компьютере. Проверьте эту ссылку для Windows и Mac. Я следил за этим блогомдля установки Docker на Mac
  • Установка Docker Compose.

Предупреждения. Для ядра Docker вам потребуется не менее 4 ГБ памяти.

Обратитесь к этой ссылке, если вы пытаетесь установить Airflow в Windows (я сам не пробовал) — ссылка1 и ссылка2

Этапы установки (на Mac)

  1. Создайте имя файла как airflow_runner.sh. Скопируйте приведенные ниже команды в скрипт.
docker run --rm "debian:buster-slim" bash -c 'numfmt --to iec $(echo $(($(getconf _PHYS_PAGES) * $(getconf PAGE_SIZE))))'
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
  1. Предоставьте доступ для выполнения к файлу. chmod +x airflow_runner.sh
  2. Запустите исходный файл airflow_runner.sh.
  3. После успешного выполнения вышеуказанных шагов запустите docker-compose up airflow-init, чтобы инициализировать базу данных.

После завершения инициализации вы должны увидеть сообщение, подобное приведенному ниже.

airflow-init_1       | Upgrades done
airflow-init_1       | Admin user airflow created
airflow-init_1       | 2.3.0
start_airflow-init_1 exited with code 0

Теперь мы готовы перейти к следующему шагу.

Запуск проекта Docker Airflow

👉 docker-compose up — build

Вышеупомянутая команда запускает среду докера и также запускает службы ниже.

  • Веб сервер
  • Планировщик
  • База данных Postgres для хранилища метаданных

Через несколько секунд, когда все готово, веб-сервер доступен по адресу http://localhost:8080. Учетная запись по умолчанию имеет логин airflow и пароль airflow.

Из терминала вы также можете запустить docker ps, чтобы проверить запущенные процессы.

Убираться

Чтобы остановить и удалить контейнеры, удалить тома с данными БД и загрузить образы, выполните:

👉 docker-compose down — volumes — rmi all

Основы воздушного потока

Мы установили Airflow и хорошо знаем, что он означает, но нам еще предстоит найти, как построить наш первый конвейер.

Прежде чем мы начнем создавать конвейер, нам нужно коснуться еще нескольких концепций, чтобы создать полноценный проект.

Освежим нашу память еще раз.

Airflow работает по принципуDAG, а DAG представляет собой ациклический граф. Мы видели этот простой пример

чтение данных из какой-либо конечной точки → запись в хранилище

Давайте шаг за шагом перейдем к созданию DAG для Airflow.

  • Шаг 1

Создайте группу обеспечения доступности баз данных. Он принимает уникальное имя, когда его запускать и какой может быть интервал запуска. Есть много других параметров, с которыми он согласен, но давайте придерживаться этих трех.

dag = DAG(                                                     
   dag_id="my_first_dag",                          
   start_date=airflow.utils.dates.days_ago(2),                
   schedule_interval=None,                                     
)
  • Шаг 2

А теперь нам нужно создать две наши функции (я создаю фиктивные функции) и привязать их к Оператору.

def read_data_from_some_endpoint():
    pass
def write_to_storage():
    pass
  • Шаг 3

Давайте создадим наших операторов. У нас есть функции Python, которые нужно привязать к некоторым операторам. Последний аргумент заключается в том, что он принимает DAG. Здесь нам нужно сообщить оператору, какой даг он будет учитывать.

download_data = PythonOperator(# This is our Airflow Operator.
    task_id="download_data", #unique name; it could be any name 
    python_callable=read_data_from_some_endpoint, #python function/callable
    dag = dag #Here we will attach our operator with the dag which we created at 1st step.
) 
persist_to_storage = PythonOperator(
    task_id = "persist_to_storage",  
    python_callable = write_to_storage,
    dag = dag
)
  • Шаг 4

Теперь давайте создадим порядок выполнения наших операторов

download_data >> persist_to_storage  # >> is bit shift operator in python which is overwritten in Airflow to indicate the direction upstream and downstream of task flow.

Вот и все. Мы успешно создали нашу первую DAG.

Определите свою задачу и DAG

Airflow предоставляет три способа определения вашей группы обеспечения доступности баз данных.

  1. Классический
  2. с менеджером контекста
  3. Декораторы

Неважно, как вы определяете свой рабочий процесс, но придерживаться только одного из них помогает отлаживать и проверять кодовую базу. Смешивание разных определений может привести к путанице в коде (хотя это личный выбор).

import pendulum
from airflow import DAG
from airflow.operators.dummy import DummyOperator
# Classical
dag = DAG("classical_dag", 
          start_date=pendulum.datetime(2022, 5, 15, tz="UTC"),
          schedule_interval="@daily", 
          catchup=False)
op = DummyOperator(task_id="a-dummy-task", dag=dag)
# with context manager 
with DAG(
    "context_manager_dag", 
    start_date=pendulum.datetime(2022, 5, 15, tz="UTC"),
    schedule_interval="@daily", 
    catchup=False
) as dag:
    op = DummyOperator(task_id="a-dummy-task")
# Decorators 
@dag(start_date=pendulum.datetime(2022, 5, 15, tz="UTC"),
     schedule_interval="@daily", 
     catchup=False)
def generate_decorator_dag():
    op = DummyOperator(task_id="a-dummy-task")
dag = generate_decorator_dag()

Как создать немного более сложный поток задач?

Возьмем этот пример.

Мы видим, что на изображении выше были использованы два цветовых кода.

светло-красный — показывает ветвь потока (два или более потока), т. е. ветвь_1, ветвь_2

светло-зеленый — обычная задача для другой цели. то есть false_1, false_2, true_2 и т. д.

Теперь, не беспокоясь о коде, давайте создадим поток задач для представления вышеуказанной структуры.

1- Нижний рабочий процесс из branch_1

branch_1 ›› true_1 ›› join_1

2- Верхний рабочий процесс из branch_1

  • верхний поток имеет два участка. Первая часть идет до branch_2

branch_1 ›› false_1 ›› branch_2

  • а затем в branch_2 происходит два параллельных выполнения и идет до false_3

branch_2 ›› false_2 ›› join_2 ›› false_3

branch_2 ›› true_2 ›› join_2 ›› false_3

Поскольку false_2 и true_2 выполняются параллельно, мы можем объединить их (поместить в список) в Сюда

branch_2 ›› [true_2, false_2]›› join_2 ›› false_3

и, наконец, мы можем объединить вышеуказанные шаги следующим образом

branch_1 ›› false_1 ›› branch_2 ›› [true_2, false_2] ›› join_2 ›› false_3 ›› join_1

Итак, мы получили эти два из шага 1 и шага 2.

branch_1 ›› true_1 ›› join_1

branch_1 ›› false_1 ›› branch_2 ›› [true_2, false_2] ›› join_2 ›› false_3 ›› join_1

и это представляет собой выполнение задачи или DAG.

Как оператор сдвига битов (›› или ‹‹) определяет зависимость задачи?

Методы __rshift__ и __lshift__ класса BaseOperator реализуют логический оператор битового сдвига Python в контексте установки задача или группа обеспечения доступности баз данных ниже по течению от другой. См. реализация здесь.

Таким образом, сдвиг битов использовался как синтаксический сахар для задач set_upstream (‹‹) и set_downstream (››).

Например, task1 ›› task2 совпадает с task2 ‹‹ task1 совпадает с task1.set_downstream(task2) совпадает с task1 .set_upstream(задача2)

Этот оператор играет важную роль в построении отношений между задачами.

Эффективный дизайн задач

Созданная задача должна следовать

1- атомарность

Это означает, что либо все происходит, либо ничего не происходит. Таким образом, каждая задача должна выполнять только одно действие, а в противном случае функциональность должна быть разделена на отдельные задачи.

2- Идемпотентность

Задача Airflow называется идемпотентной, если многократный вызов одной и той же задачи с одними и теми же входными данными не дает дополнительного эффекта. Это означает, что повторный запуск задачи без изменения входных данных не должен изменить общий вывод.

для загрузки данных. Его можно сделать идемпотентным, проверив существующие результаты или убедившись, что задача перезаписывает предыдущие результаты.

для загрузки базы данных: upsertможет использоваться для перезаписи или обновления предыдущей работы, выполненной в таблицах.

3- Назад Заполнение предыдущей задачи

Свойство помогает обрабатывать исторические данные. Класс DAG можно инициировать с помощью свойства catchup.

если catchup=False-> Airflow начинает обработку с текущего интервала.

Если catchup=True -› это свойство по умолчанию. Airflow начинает обработку с прошлого интервала.

Переменные времени выполнения

Все операторы загружают context предварительно загруженную переменную, чтобы предоставить наиболее часто используемые переменные во время выполнения DAG. Примеры Python можно показать здесь

from urllib import request
 
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
 
dag = DAG(
    dag_id="showconext",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
)
 
def _show_context(**context):
    """
    the context contains these preloaded items 
    to pass in dag during runtime.
    Airflow’s context dictionary can be found in the
    get_template_context method, in Airflow’s models.py.
    
    {
    'dag': task.dag,
    'ds': ds,
    'ds_nodash': ds_nodash,
    'ts': ts,
    'ts_nodash': ts_nodash,
    'yesterday_ds': yesterday_ds,
    'yesterday_ds_nodash': yesterday_ds_nodash,
    'tomorrow_ds': tomorrow_ds,
    'tomorrow_ds_nodash': tomorrow_ds_nodash,
    'END_DATE': ds,
    'end_date': ds,
    'dag_run': dag_run,
    'run_id': run_id,
    'execution_date': self.execution_date,
    'prev_execution_date': prev_execution_date,
    'next_execution_date': next_execution_date,
    'latest_date': ds,
    'macros': macros,
    'params': params,
    'tables': tables,
    'task': task,
    'task_instance': self,
    'ti': self,
    'task_instance_key_str': ti_key_str,
    'conf': configuration,
    'test_mode': self.test_mode,
    }
    """
   start = context["execution_date"]        
   end = context["next_execution_date"]
   print(f"Start: {start}, end: {end}")
 
 
show_context = PythonOperator(
   task_id="show_context", 
   python_callable=_show_context, 
   dag=dag
)

Перечисленные выше переменные предварительно загружаются в контексте и могут использоваться в любом месте оператора. Динамическая ссылкапроисходит с помощью шаблонов Jinja.

например {{ds}}, {{next_ds}}, {{dag_run}}

Шаблоны полей и скриптов

Два атрибута в BaseOperator определяют, что мы можем использовать для создания шаблонов.

template_fields: содержит список переменных, которые можно шаблонировать.

template_ext: содержит список расширений файлов, которые можно читать и создавать шаблоны во время выполнения.

См. пример объявления этих двух полей.

class BashOperator(BaseOperator):
# defines which fields are templateable
    template_fields = ('bash_command', 'env') 
    template_ext = ('.sh', '.bash')  # defines which file extensions are templateable
    def __init__(
        self,
        *,
        bash_command,
        env: None,
        output_encoding: 'utf-8',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command  # templateable (can also give path to .sh or .bash script)
        self.env = env  # templateable
        self.output_encoding = output_encoding  # not templateable

Пример DAG, который использует контекст Airflow для создания шаблонов

Давайте возьмем пример, чтобы продемонстрировать силу шаблонов

from datetime import datetime
BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ macros.datetime.now() }}", 
)

👉 Note

Если вы видите здесь, мы использовали макрос для вызова datetime.now(). Если мы не используем макрос, он вызовет jinja2.exceptions.UndefinedError: ‘datetime’ is undefined exception.

Проверьте здесь список макросов.

Но теперь вы можете подумать о том, откуда мы взяли PythonOperator, DAG и т. д. Мы рассмотрим важные модули Airflow, чтобы понять это.

👉 Эта тема подробно освещена в этом блоге.

Структура модуля воздушного потока

Airflow имеет стандартную модульную структуру. Все его важные пакеты находятся под потоком воздуха. Немногие из основных модульных структур здесь

  • airflow — для DAG и других базовых API.
  • airflow.executors : Для всех встроенных исполнителей.
  • airflow.operators : Для всех встроенных операторов.
  • airflow.models : Для DAG, ошибка журнала, пул, xcom (кросс-коммуникация) и т. д.
  • airflow.sensors : Различные датчики (простыми словами, это либо временной интервал, либо файловый наблюдатель, чтобы соответствовать некоторым критериям выполнения задач)
  • airflow.hooks : Предоставляет различные модули для подключения внешних служб API или баз данных.

Итак, взглянув на приведенный выше модуль, мы можем быстро определить, что для получения PythonOperator или любого другого оператора нам нужно импортировать их из airflow.operators. Точно так же executor можно импортировать из airflow.executors и так далее.

Кроме того, многие поставщики пакетов, в том числе поставщики и сторонние компании, расширяют возможности Airflow. Все провайдеры следуют apache-airflow-providers номенклатуре сборки пакета. Провайдеры могут содержать операторов, крючки, датчики и операторы передачи для связи со многими внешними системами, но они также могут расширять ядро ​​​​Airflow новыми возможностями.

Это список провайдеров — список провайдеров

Нагрузки

Операторы

Операторы помогают запустить вашу функцию или любую исполняемую программу.

Типы операторов

В основном существует три типа операторов.

(i) Операторы

Помогает вызвать определенное действие. Немногие из них

  • PythonOperator — чтобы обернуть вызываемые объекты/функции Python внутри него.
  • BashOperator — для вызова сценария или команды bash. Внутри BashOperator мы также можем вызвать любую исполняемую программу.
  • DummyOperator — показать фиктивную задачу.
  • DockerOperator — для записи и выполнения образов Docker.
  • EmailOperator — для отправки электронной почты (с использованием конфигурации SMTP).

и многие другие операторы выполняют выходы. См. полный список операторов в официальной документации.

(ii) Датчики

Определенный тип оператора, целью которого является ожидание события, чтобы начать выполнение. Например,

  • ExternalTaskSensor ожидает завершения выполнения другой задачи (в другой DAG).
  • S3KeySensor Сенсоры S3 Key используются для ожидания доступности определенного файла или каталога в корзине S3.
  • NamedHivePartitionSensor — ожидает появления набора разделов в Hive.

(iii) Трансферы

Перемещает данные из одного места в другое. например

  • MySqlToHiveTransfer Перемещает данные из MySql в Hive.
  • S3ToRedshiftTransfer загружает файлы из s3 в Redshift.

Планировщик

Планировщик является важнейшим компонентом Airflow, так как здесь происходит большая часть волшебства. Он определяет, когда и как выполняются ваши конвейеры. На высоком уровне планировщик выполняет следующие шаги.

  1. После того как пользователи записали свои рабочие процессы в виде групп обеспечения доступности баз данных, планировщик считывает файлы, содержащие эти группы обеспечения доступности баз данных, для извлечения соответствующих задач, зависимостей и интервалов планирования для каждой группы обеспечения доступности баз данных.
  2. Затем для каждой DAG планировщик проверяет, прошел ли запланированный интервал для DAG с момента последнего чтения. Если это так, задачи в DAG запланированы для выполнения.
  3. Затем для каждой запланированной задачи планировщик проверяет, выполнены ли зависимости задачи (= вышестоящие задачи). Если это так, задача добавляется в очередь выполнения.
  4. Планировщик ждет несколько секунд, прежде чем начать новый цикл, возвращаясь к шагу 1.

👉 Чтобы запустить планировщик, просто запустите команду airflow scheduler.

В Airflow при определении DAG мы предоставляем несколько параметров, чтобы сообщить планировщику, когда необходимо запускать задания.

start_date -› когда запускать группу обеспечения доступности баз данных.

end_date -› когда остановить DAG

schedule_interval -›Интервал времени для последующего запуска. ежечасно, ежедневно, минуты и т. д.

depends_on_past -› Логическое значение, определяющее, когда будет выполняться DAG.

retry_delay -›продолжительность следующей повторной попытки. Он принимает datetime объект. например в течение 2 минут мы напишем timedelta (минуты = 2)

Планировщик воздушного потока работает по принципу Cron based job исполнения. Ниже представлена ​​презентация cron.

каждую 5-ю минуту — › */5 * * * *

каждый час на 30-й минуте, например. в 10:30, 11:30 и так далее. — › 0,5,10 * * * *

Если вы раньше не работали с планировщиком заданий cron на базе Unix, то вам сложно понять, как именно его написать (это также сложно для опытных разработчиков).

Проверьте этот веб-сайт, чтобы сгенерировать выражение cron — cron-expression-generator

Исполнители

Это помогает запустить экземпляр задачи (экземпляры задачи — это функции, которые мы обернули оператором)

Типы исполнителей

Есть два типа исполнителей

Местные исполнители

  • Debug Executor — DebugExecutor является инструментом отладки и может использоваться из IDE. Это единственный исполнитель процесса, который ставит задачи в очередь и выполняет их.
  • Sequential Executor — исполнитель по умолчанию и запускается в планировщике. Кроме того, он выполняет по одному экземпляру задачи за раз, что в конечном итоге делает его неподходящим кандидатом для производства.
  • Локальный исполнитель — Запуск в планировщике и выполнение нескольких экземпляров задач одновременно. Опять же, не лучший кандидат для производства, поскольку он не масштабируется.

Удаленные исполнители

  • Celery Executor — Запуск задач на выделенных машинах (воркерах). Он использует распределенную очередь задач для распределения нагрузки между разными рабочими для распараллеливания работы. Он горизонтально масштабируется, что делает его отказоустойчивым и хорошим кандидатом для производства.
  • Kubernetes Executor — Запускайте задачи в выделенном POD (рабочем), и API Kubernetes привыкают к управлению POD. Он эффективно масштабируется и является идеальным кандидатом для производства.
  • LocalKubernetes Executor — Локальный исполнитель kubernetes.
  • CeleryKubernetes Executor — позволяет пользователям одновременно запускать CeleryExecutor и KubernetesExecutor. Исполнитель выбирается для запуска задачи на основе очереди задачи. Выбор этого исполнителя нужен только в некоторых случаях.
  • Dask Executor — кластеры Dask можно запускать на одной машине или в удаленных сетях.

Крючки

Высокоуровневый интерфейс для установления соединения с базами данных или другими внешними службами.

Список различных доступных хуков

Что, если то, что меня интересует, отсутствует ни в одном из модулей?

Не нашли подходящего оператора, исполнителей, датчики или крючки? Не беспокойтесь, вы можете написать свой собственный материал. Airflow предоставляет базовые классы, которые мы можем наследовать для написания наших пользовательских классов.

from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.hooks.base_hook import BaseHook
from airflow.utils.decorators import apply_defaults
class MyCustomOperator(BaseOperator):
    
    @apply_defaults # for default parameters from DAG
    def __init__(**kwargs):
        super(MyCustomOperator).__init__(**kwargs)
        pass
    def execute(self, conext): # we will cover more about context in next part.
        #your logic
        pass

class MyCustomSensor(BaseSensorOperator):
    
    @apply_defaults # for default parameters from DAG
    def __init__(**kwargs):
        super(MyCustomSensor).__init__(**kwargs)
        pass
    def poke(self, context): 
        #your logic
        pass
class MyCustomHook(BaseHook):
    
    @apply_defaults # for default parameters from DAG
    def __init__(**kwargs):
        super(MyCustomHook).__init__(**kwargs)
        pass
    def get_connection(self):
        #your logic
        pass

Управление зависимостями задач

Большую часть времени в рабочем процессе для выполнения задачи нам нужно проверить предварительные условия для выполнения этой задачи. Это называется зависимости. Есть разные типы

Зависимости

1- Линейные зависимости — каждая задача должна быть завершена до выполнения следующей. Например,

Представление воздушного потока

first_task ›› second_task ›› Third_task

first_task должен быть выполнен перед второй и третьей задачами. А вторая_задача должна быть выполнена до третьей_задачи.

2- Зависимости разветвления и разветвления — в этой зависимости задача ожидает завершения нескольких задач. Например,

Представление воздушного потока

first_task ›› second_task ›› Third_task ››end_task

дополнительная_зависимая_задача ›› третья_задача

Ветвление

Ветвление помогает принимать решения о том, какую задачу или задачу выполнять.

1- Ветвление внутри задачи

Программно обрабатывает выполнение задачи

def _decide_task(**context):    
    if context["execution_date"] < ROLLOUT_DATE:
        old_task(**context)
    else
        new_task(**context)
    
...
task_branching = PythonOperator(
    task_id="task_branching",
    python_callable=_decide_task,
)

Минусы —трудно определить с помощью визуализации DAG, какой путь был выполнен и почему

2- Ветвление в группе обеспечения доступности баз данных

Dummy Operator помогает добиться ветвления внутри DAGS.

Давайте создадим ниже DAG

Итак, выше мы запускаем нашу работу ежедневно, и есть два набора данных (старый и новый), которые необходимо объединить, чтобы ссылаться на данные, чтобы предоставить входные данные для задачи combine_task. Итак, здесь три набора данных объединяются для предоставления данных.

Таким образом, join_data начнет выполняться, как только все его родители завершат выполнение без каких-либо сбоев, что позволит продолжить выполнение после перехода.

Однако, чтобы сделать его немного лучше, мы можем добавить фиктивную задачу, чтобы отметить завершение объединения старых и новых наборов данных. Лучше позаботиться о том, чтобы старые и новые данные наверняка были доступны для следующего прогона.

например,

from airflow.operators.dummy import DummyOperator
 
join_branch = DummyOperator(
    task_id="join_old_and_new_data",
    trigger_rule="none_failed"
)

Наш рабочий процесс будет выглядеть так

Теперь добавление дополнительной задачи для старого и нового наборов данных может помочь нам сделать его более четким, а также включить контроль, чтобы убедиться, что данные доступны для следующего запуска.

Триггерное правило

Все операторы воздушного потока предоставляют аргумент trigger_rule, который определяет запуск этой задачи, когда все задачи, непосредственно восходящие по течению, выполнены успешно.

  • Все существующие варианты
  • all_success: (по умолчанию) все родители успешны
  • all_failed: все родители находятся в состоянии сбоя или upstream_failed.
  • all_done: выполнение всех родителей завершено.
  • one_failed: срабатывает, как только хотя бы один родитель вышел из строя, он не ждет, пока все родители будут выполнены.
  • one_success: срабатывает, как только хотя бы один из родителей завершается успешно, он не ждет завершения всех родителей.
  • none_failed: все родители не завершились ошибкой (failed или upstream_failed), т. е. все родители завершились успешно или были пропущены
  • none_skipped: ни один из родителей не находится в состоянии пропуска, т. е. все родители находятся в состоянии успеха, отказа или upstream_failed.
  • пустышка: зависимости только для галочки, срабатывают по желанию

Кросс-коммуникация, также известная как XCOM

XCOM используется для обмена данными между задачами.

xcom_push – регистрация данных в базе данных метаданных Airflow.

xcom_pull – использовать зарегистрированные данные из базы данных метаданных Airflow.

пример —обратите внимание на использование экземпляра задачи (ti) в шаблоне jinja для отправки и получения переменной.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}
dag = DAG('xcom_example', 
          schedule_interval=None,
         default_args=args)
task_1 = BashOperator(task_id="task_1",
                    bash_command='echo "{{ ti.xcom_push(key="xcom-key", value="xcom-data-to-share") }}"',
                    dag=dag)
task_2  = BashOperator(task_id="task_2",
                       bash_command='echo "{{ ti.xcom_pull(key="xcom-key") }}"',
                       dag=dag)
task_1 >> task_2

Ограничения

  • Не используйте его для обмена БОЛЬШИМИ ДАННЫМИ (помните, что Airflow — это оркестратор, а не вычислительная среда).
  • Любое значение, хранимое XCom, должно поддерживать сериализацию.

Доступ к базе данных метаданных

Базу метаданных Airflow можно использовать для хранения конфигураций DAG, таблиц, констант и идентификаторов. Он использует пару ключ-значение для поддержки этих переменных. Конфигурация магазинов (переменные) доступна по Variable.

# Suppose we have kept dag_config in metadata database in this format
# dag_config = {"key1":"value1", "key2":["a", "b", "c"]}
# then below is the way to retrive them in the dag
get_dag_config = Variable.get("dag_config", deserialize_json=True)
config1 = get_dag_config["key1"]
config2 = get_dag_config["key2"]

# If variable is simply saved in this format key1 = value1, then we use 
get_key1 = Variable.get("key1")
# Similarly we can set the variables in metadata database.
Variable.set("my_key", "my_value")

Доступ через командную строку

#import variable json file
docker-compose run --rm webserver airflow variables --import /usr/local/airflow/dags/config/dag_config.json
# get value of key1
docker-compose run --rm webserver airflow variables --get key1

Тестирование

Для воздушного потока, как правило, могут быть выполнены следующие тесты.

  • Проверочный тест DAG — чтобы проверить, является ли DAG действительным и ациклическим.
  • Модульное тестирование — для тестирования функций Python, операторов и т. д.
  • Тест интеграции – чтобы проверить, могут ли задачи рабочего процесса соединяться друг с другом.
  • Проверка рабочего процесса — для проверки всего конвейера.
  • Тест качества данных

Проверьте эти блоги для получения глубоких знаний.

Лучшие практики

  • Напишите чистую DAG и придерживайтесь одного из способов создания DAG (с диспетчером контекста или без диспетчера контекста).
  • При написании имени задачи придерживайтесь лучшего соглашения об именах. Будьте откровенны и логичны.
  • Держите код вычислений (SQL, скрипт, код Python и т. д.) и определение DAG отдельно. Каждый раз, когда DAG загружается, он пересчитывает данные, поэтому для загрузки требуется больше времени.
  • Не запрограммируйте в коде постоянное значение или какую-либо конфиденциальную информацию о подключении. Управляйте им в файле конфигурации или на центральном уровне безопасным способом.
  • Создайте тег и используйте его для быстрого просмотра, чтобы сгруппировать задачи в мониторинге.
  • Всегда ищите существующие встроенные операторы воздушного потока, крючки или датчики, прежде чем создавать свои собственные вещи.
  • XCOM не предназначен для передачи больших объемов данных.
  • Качество данных и тестирование часто упускаются из виду. Поэтому убедитесь, что вы используете стандарт для своей кодовой базы.
  • Следуйте стратегиям загрузки — инкрементным типам scd в своем коде, чтобы избежать ненужной загрузки данных.
  • Если возможно, создайте структуру для поколений DAG. Мета-обертка. Ознакомьтесь с этим репозиторием.
  • Укажите детали конфигурации один раз — место, где находятся шаблоны SQL, настраивается как переменная воздушного потока и просматривается как глобальный параметр при создании экземпляра DAG.
  • Объедините свои ресурсы в пул: все экземпляры задач в группе обеспечения доступности баз данных используют объединенное в пул соединение с DWH, указав параметр пула.
  • Управляйте данными для входа в одном месте — настройки подключения сохраняются в меню администратора.
  • Знайте, когда начинать задачу. Обычный планировщик или триггер события?

Куда пойти отсюда?

Неотъемлемой частью обучения чему-либо является создание проекта. Когда мы что-то строим, мы с трудом понимаем принцип работы и терминологию. Мы коснулись почти всего, что нам нужно для работы над любым проектом Airflow. Тем не менее, вам может понадобиться вернуться сюда снова, чтобы освежить теорию или погуглить некоторые дополнительные темы, как только мы начнем работать над проектом (например, динамический dag, создание задач — фабрика dag, развертывание, мониторинг, управление dag, проверки качества данных и т. д.). модульные тесты и др.). Дайте мне знать, если вы хотите увидеть более подробную информацию по этим темам.

Но пока мы собираемся идти. Итак, давайте создадим проект и воплотим наши знания в жизнь 💪

Ознакомьтесь с репозиториями GitHub ниже для блога и практического проекта.

Проект — Воздушный поток — Анализ данных исследования Чапел-Хилл

блог — Airflow-Заметки

Ссылка

Сообщество airflow очень активно, и многие участники по всему миру дополняют и создают несколько пакетов и утилит, чтобы облегчить жизнь разработчикам. Всегда полезно следить за сообществом и людьми в Twitter и GitHub, чтобы быть в курсе новых выпусков. Большая часть деталей в этих заметках взята из приведенных ниже ссылок. Проверьте это для получения дополнительной информации.

Спасибо, что прочитали💕!