Data Fusion - это облачное решение Google для построения конвейеров данных без какого-либо кода, хотя решение имеет некоторые ограничения (пока) при использовании вместе с Cloud Composer становится действительно мощным инструментом для создания озер данных.

Давайте поговорим о проблемах с Data Fusion:

Поэтому для меня самым большим ограничением, которое я обнаружил с Data Fusion, является то, что вы не можете передавать динамические параметры, единственный динамический параметр, который он принимает, - это даты, которые работают с формулой ниже:

${logicalStartTime(yyyy-MM-dd)}

Это вернет текущую дату, если вы хотите что-то вроде вчерашнего дня, вы можете получить вот так:

${logicalStartTime(dd/MM/yyyy,1d)}

Но кроме этого, вы не можете передавать динамические параметры, поэтому, если вы хотите, чтобы слияние данных получило последний элемент в ведре GCS, вы не можете этого сделать.

Вторая проблема, с которой я столкнулся с Data Fusion, заключается в том, что вы всегда должны указывать результат для узла, и когда вы начинаете создавать действительно большие конвейеры, это становится очень раздражающим.

И последняя самая большая проблема заключается в том, что у вас много конвейеров, каждый конвейер будет создавать кластер dataproc, запускать и уничтожать кластер dataproc, это может добавить довольно много времени для операции, потому что на сегодняшний день каждый кластер занимает около 5 минут, чтобы Допустим, у вас есть 30 конвейеров, и вы потеряете почти 3 часа, просто повторяя этот шаг, плюс время, необходимое для фактического запуска конвейеров.

Теперь решения:

Две из вышеперечисленных проблем можно легко решить, используя Cloud Composer вместе с Cloud data Fusion, для меня окончательное решение заключалось в том, чтобы Cloud Composer передавал параметры, которые я хотел для конвейера Data Fusion динамически,

Cloud Composer может автоматически создавать, удалять, запускать, останавливать, обновлять и перечислять все конвейеры Data Fusion с помощью следующих операторов:

CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
CloudDataFusionDeleteInstanceOperator,
CloudDataFusionDeletePipelineOperator,
CloudDataFusionGetInstanceOperator,
CloudDataFusionListPipelinesOperator,
CloudDataFusionRestartInstanceOperator,
CloudDataFusionStartPipelineOperator,
CloudDataFusionStopPipelineOperator,
CloudDataFusionUpdateInstanceOperator,

Как это сделать, сначала вам нужно при создании конвейера Data Fusion создать «открытые» переменные с помощью этого:

${NAME OF THE VARIABLE}

Когда вы развертываете все «открытые» переменные, они будут перечислены в качестве фрагментов среды выполнения, которые вам нужно будет заполнять перед каждым запуском:

Теперь, когда он создан, мы можем использовать Google Composer для передачи динамических аргументов в ваш конвейер, поэтому для этого случая, например, вы должны создать даг, подобный этому:

task1 = CloudDataFusionStartPipelineOperator(
task_id="task1",
location='southamerica-east1',
pipeline_name="test",
namespace='laketest',
runtime_args={'folder':'my_data.csv'},
dag=dag)

Для создания и удаления данных для каждого конвейера Google Composer также может помочь вам, вы можете настроить конвейеры Data Fusion для использования существующего dataproc вместо его создания:

В Data Fusion вам нужно будет перейти к кнопке системного администратора в левом верхнем углу экрана:

Затем вы перейдете к настройке ›› профиль системных вычислений ›› создать новый профиль ›› Существующий Dataproc:

И заполните обязательные поля имеющейся у вас информацией о dataproc:

При этом вы вернетесь к своему собственному конвейеру и измените имя системного профиля на то, которое вы создали:

Но это решение может быть слишком дорогим, так как поддержание постоянно создаваемого кластера dataproc является дорогостоящим, и когда облачный композитор снова пригодится, после того, как вы выполните описанные выше шаги, вы можете сказать композитору создать и удалить кластер, так что он вверх, только когда конвейер запущен, вы можете сделать это, добавив это в тег выше:

create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id=’create_dataproc_cluster’,
project_id=’teste’,
cluster_name=’airflow-cluster’,
num_workers=4,
subnetwork_uri=’teste-nonprod-southamerica-east1-subnet-private-1',
region=’southamerica-east1',
master_machine_type=’n1-standard-4',
worker_machine_type=’n1-standard-4',
dag=dag)

task1 = CloudDataFusionStartPipelineOperator(
task_id=”task1",
location=’southamerica-east1',
pipeline_name=”test”,
namespace=’teste’,
runtime_args={‘folder’:’my_data_01082020.csv’},
dag=dag)

delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id=’delete_dataproc_cluster’,
project_id=’teste’,
cluster_name=’airflow-cluster’,
region=’southamerica-east1',
dag=dag)

create_dataproc_cluster >> Task1 >> delete_dataproc_cluster

И для выходной схемы у меня нет умного исправления, но я узнал, что каждый раз, когда появляется нулевой указатель Java, попробуйте экспортировать файл, затем проверьте схему и так далее для каждого нового узла, который вы делаете, это может сделать развитие немного медленнее, но именно так я справился.

С помощью этого хака у вас должно быть более чем достаточно инструментов для создания собственной базы данных на GCP, так что удачи :)