Как абстрагироваться от сложности конвейера данных с помощью декораторов Airflow
Представьте себе сценарий, в котором вам нужно выполнять несколько ежедневных заданий для извлечения данных из озера данных, их предварительной обработки и сохранения очищенных наборов данных в выделенной базе данных. Было бы чрезвычайно утомительно, если бы нам приходилось запускать конвейер каждый день, постоянно проверяя возможные ошибки. Здесь вам пригодится Airflow: он предоставляет вам все инструменты для автоматического создания и мониторинга нескольких конвейеров данных.
Эта статья является частью серии, посвященной Airflow, которую я начал недавно:
1. Airflow for Data Pipeline 101 2. Airflow: Decorators for a Clean Data Pipeline 3. Unit Testing Your Airflow Data Pipeline 4. TBD...
Здесь я расскажу о декораторах и о том, как их можно использовать в Airflow, чтобы абстрагироваться от многих сложностей Airflow при построении конвейера данных.
Table of Content: 1. Airflow 101 2. Defining an Extract-Transform-Load (ETL) data pipeline 3. Airflow before decorators 4. Airflow after decorators
Воздушный поток 101
Прежде чем мы начнем, убедитесь, что на вашем компьютере правильно настроен Airflow и что вы имеете общее представление о том, как работает Airflow. Следующий пост проведет вас через процесс установки и настройки.
После правильной настройки вы сможете просматривать веб-интерфейс Airflow, как показано ниже. На странице будут выделены все ваши конвейеры, кому они принадлежат, расписания и соответствующая диагностика для мониторинга состояния ваших систем.
Определение конвейера данных ETL
Извлечение-преобразование-загрузка (ETL) - это конвейер приема данных, который объединяет данные из нескольких источников, применяет преобразование и загружает их в согласованное хранилище данных или базу данных.
В этом руководстве мы собираемся определить простой конвейер ETL следующим образом:
- Извлеките файл JSON,
- Преобразуйте данные и верните общее количество строк,
- Загрузить преобразованные данные и распечатать их (а не хранить в базе данных / хранилище).
Воздушный поток перед декораторами
В традиционном конвейере Airflow описанные выше процессы будут выглядеть примерно так:
Первый шаг: импорт библиотек
import datetime as dt import json
from airflow import DAG from airflow.operators.python_operator import PythonOperator
Второй шаг: установите настройки по умолчанию
default_args = { 'owner': 'me' }
Третий шаг: определение задачи направленного ациклического графа (DAG)
Что такое DAG?
- Направленное - это упорядоченная задача, которая выполняется в заранее определенной последовательности.
- Ациклический означает завершающуюся задачу без возможности возникновения вечного цикла.
- График означает структуру, которая может установить связь "многие ко многим задачам".
with DAG('airflow_tutorial_v01',
default_args=default_args,
schedule_interval='0 * * * *',
) as dag:
extract = PythonOperator(task_id='extract',
python_callable=extract)
transform = PythonOperator(task_id='transform',
python_callable=transform)
load = PythonOperator(task_id='load',
python_callable=load)
Четвертый шаг: определение функций ETL
для функции extract ()
def extract(): order_data_dict = json.loads(data_string) return order_data_dict
для функции transform () мы имеем
def transform(order_data_dict: dict): return {"total_count":len(
order_data_dict)
}
для функции load () имеем
def load(total_count: int): print(f"Total order value is: {total_count}")
Шаг пятый: подключите конвейер
Вне функции DAG мы можем подключить конвейер следующим образом:
extract >> transform >> load
Что ж, эти шаги выглядят нормально, но они могут быть утомительными, если наш конвейер становится сложным. Один из ключевых методов абстракции, который предоставляет Airflow, - это декораторы. Итак, теперь давайте посмотрим, как можно упростить описанные выше шаги с помощью декораторов.
Воздушный поток после декораторов
Теперь давайте реорганизуем вышеуказанный конвейер с помощью декораторов.
Что такое декораторы в Python? Проще говоря, декораторы позволяют нам оборачивать функцию и расширять ее функциональность. Дополнительную информацию вы можете найти по этой ссылке.
Первый шаг: импорт библиотек
import json from airflow.decorators import dag, task
Второй шаг: установка настройки по умолчанию
default_args = { 'owner': 'me' }
Третий шаг: определение DAG и функций
@dag(default_args=default_args, tags=['etl']) def etl_pipeline(): @task() def extract(): return json.loads(data_string) @task(multiple_outputs=True) def transform(order_data_dict: dict): return {"total_count":len(
order_data_dict)
} @task() def load(total_order_value: float): print(f"Total order value is: {total_count}") extracted = extract() transformed = transform(extracted) load(transformed["total_count"])
Шаг четвертый: подключите конвейер
etl_dag = etl_pipeline()
Поэтому вместо того, чтобы определять отдельную функцию для DAG и различных шагов ETL, мы размещаем все в единой последовательности с помощью декораторов. Мало того, код выглядит более функциональным и естественным для инженеров.
Заключение
Мы успешно абстрагировались от сложности построения конвейера данных в Airflow с использованием декораторов. Декораторы позволяют писать функции конвейера данных последовательным и управляемым образом. Мало того, декораторы также позволяют писать меньше кода при том же количестве функциональных возможностей! Как всегда, если вам понравится этот пост, я напишу больше статей, в которых подробно рассматриваются более сложные концепции Airflow.
Подпишитесь на мою информационную рассылку по электронной почте: https://tinyurl.com/2npw2fnz, где я регулярно резюмирую советы по программированию и исследовательские статьи по ИИ на простом английском языке и с красивой визуализацией.