Как абстрагироваться от сложности конвейера данных с помощью декораторов Airflow

Представьте себе сценарий, в котором вам нужно выполнять несколько ежедневных заданий для извлечения данных из озера данных, их предварительной обработки и сохранения очищенных наборов данных в выделенной базе данных. Было бы чрезвычайно утомительно, если бы нам приходилось запускать конвейер каждый день, постоянно проверяя возможные ошибки. Здесь вам пригодится Airflow: он предоставляет вам все инструменты для автоматического создания и мониторинга нескольких конвейеров данных.

Эта статья является частью серии, посвященной Airflow, которую я начал недавно:

1. Airflow for Data Pipeline 101
2. Airflow: Decorators for a Clean Data Pipeline
3. Unit Testing Your Airflow Data Pipeline
4. TBD...

Здесь я расскажу о декораторах и о том, как их можно использовать в Airflow, чтобы абстрагироваться от многих сложностей Airflow при построении конвейера данных.

Table of Content:
1. Airflow 101
2. Defining an Extract-Transform-Load (ETL) data pipeline
3. Airflow before decorators
4. Airflow after decorators

Воздушный поток 101

Прежде чем мы начнем, убедитесь, что на вашем компьютере правильно настроен Airflow и что вы имеете общее представление о том, как работает Airflow. Следующий пост проведет вас через процесс установки и настройки.



После правильной настройки вы сможете просматривать веб-интерфейс Airflow, как показано ниже. На странице будут выделены все ваши конвейеры, кому они принадлежат, расписания и соответствующая диагностика для мониторинга состояния ваших систем.

Определение конвейера данных ETL

Извлечение-преобразование-загрузка (ETL) - это конвейер приема данных, который объединяет данные из нескольких источников, применяет преобразование и загружает их в согласованное хранилище данных или базу данных.

В этом руководстве мы собираемся определить простой конвейер ETL следующим образом:

  1. Извлеките файл JSON,
  2. Преобразуйте данные и верните общее количество строк,
  3. Загрузить преобразованные данные и распечатать их (а не хранить в базе данных / хранилище).

Воздушный поток перед декораторами

В традиционном конвейере Airflow описанные выше процессы будут выглядеть примерно так:

Первый шаг: импорт библиотек

import datetime as dt
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

Второй шаг: установите настройки по умолчанию

default_args = {
    'owner': 'me'
}

Третий шаг: определение задачи направленного ациклического графа (DAG)

Что такое DAG?

  • Направленное - это упорядоченная задача, которая выполняется в заранее определенной последовательности.
  • Ациклический означает завершающуюся задачу без возможности возникновения вечного цикла.
  • График означает структуру, которая может установить связь "многие ко многим задачам".
with DAG('airflow_tutorial_v01',
         default_args=default_args,
         schedule_interval='0 * * * *',
         ) as dag:

    extract = PythonOperator(task_id='extract',
                             python_callable=extract)
    
    transform = PythonOperator(task_id='transform',
                               python_callable=transform)
    
    load = PythonOperator(task_id='load',
                          python_callable=load)

Четвертый шаг: определение функций ETL

для функции extract ()

def extract():
    order_data_dict = json.loads(data_string)
    
    return order_data_dict

для функции transform () мы имеем

def transform(order_data_dict: dict):
    return {"total_count": len(order_data_dict)}

для функции load () имеем

def load(total_count: int):
    print(f"Total order value is: {total_count}")

Шаг пятый: подключите конвейер

Вне функции DAG мы можем подключить конвейер следующим образом:

extract >> transform >> load

Что ж, эти шаги выглядят нормально, но они могут быть утомительными, если наш конвейер становится сложным. Один из ключевых методов абстракции, который предоставляет Airflow, - это декораторы. Итак, теперь давайте посмотрим, как можно упростить описанные выше шаги с помощью декораторов.

Воздушный поток после декораторов

Теперь давайте реорганизуем вышеуказанный конвейер с помощью декораторов.

Что такое декораторы в Python? Проще говоря, декораторы позволяют нам оборачивать функцию и расширять ее функциональность. Дополнительную информацию вы можете найти по этой ссылке.

Первый шаг: импорт библиотек

import json

from airflow.decorators import dag, task

Второй шаг: установка настройки по умолчанию

default_args = {
    'owner': 'me'
}

Третий шаг: определение DAG и функций

@dag(default_args=default_args, tags=['etl'])
def etl_pipeline():
    
    @task()
    def extract():
        return json.loads(data_string)
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        return {"total_count": len(order_data_dict)}
    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_count}")
    extracted = extract()
    transformed = transform(extracted)
    load(transformed["total_count"])

Шаг четвертый: подключите конвейер

etl_dag = etl_pipeline()

Поэтому вместо того, чтобы определять отдельную функцию для DAG и различных шагов ETL, мы размещаем все в единой последовательности с помощью декораторов. Мало того, код выглядит более функциональным и естественным для инженеров.

Заключение

Мы успешно абстрагировались от сложности построения конвейера данных в Airflow с использованием декораторов. Декораторы позволяют писать функции конвейера данных последовательным и управляемым образом. Мало того, декораторы также позволяют писать меньше кода при том же количестве функциональных возможностей! Как всегда, если вам понравится этот пост, я напишу больше статей, в которых подробно рассматриваются более сложные концепции Airflow.

Подпишитесь на мою информационную рассылку по электронной почте: https://tinyurl.com/2npw2fnz, где я регулярно резюмирую советы по программированию и исследовательские статьи по ИИ на простом английском языке и с красивой визуализацией.