Компонентный шлюз с DataprocOperator на Airflow

В GCP довольно просто установить и запустить компонент JupyterHub из UI или команда gcloud. Я пытаюсь написать сценарий процесса через Airflow и DataprocClusterCreateOperator, здесь отрывок из DAG

from airflow.contrib.operators import dataproc_operator  

create_cluster=dataproc_operator.DataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME, 
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        num_masters=1,
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        master_disk_size=100,
        worker_disk_size=100,
        storage_bucket='test-dataproc-jupyter', 
        region='europe-west4', 
        zone='europe-west4-a',
        auto_delete_ttl=21600, 
        optional_components=['JUPYTER', 'ANACONDA']
    )

Однако мне не удалось указать нужный параметр enable-component-gateway. Глядя на исходный код, кажется, что параметры не предназначены (как в устарел или последний стабильный оператор).

Я знаю, что REST API предоставляет endpointConfig.enableHttpPortAccess, но я бы предпочел использовать официальный оператор. Кто-нибудь знает, как этого добиться?


person kwn    schedule 02.01.2020    source источник


Ответы (1)


Edit, исправление для composer-1.8.3 с airflow-1.10.3

В Airflow 1.10.3 конфигурация кластера не может быть создана извне. Однако мы можем унаследовать оператор создания кластера и переопределить создание конфигурации. Это также позволит нам установить дополнительные компоненты, параметр, отсутствующий в этой версии Airflow.

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = [ 'JUPYTER', 'ANACONDA' ]
        return cluster_data

#Start DataProc Cluster
dataproc_cluster = CustomDataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME, 
    cluster_name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    num_masters=1,
    master_machine_type='n1-standard-2',
    worker_machine_type='n1-standard-2',
    master_disk_size=100,
    worker_disk_size=100,
    storage_bucket='test-dataproc-jupyter', 
    region='europe-west4', 
    zone='europe-west4-a',
    auto_delete_ttl=21600, 
    dag=dag
)

Исходный ответ для Airflow 1.10.7

Хотя это и не оптимально, вы можете создать структуру данных Cluster самостоятельно, вместо того, чтобы использовать для этого ClusterGenerator Airflow. Он должен работать на последней версии (1.10.7)

cluster = {
  'clusterName': CLUSTER_NAME,
  'config': {
    'gceClusterConfig': {
      'zoneUri': 'europe-west4-a'
    },
    'masterConfig': {
      'numInstances': 1,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'workerConfig': {
      'numInstances': 3,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'softwareConfig': {
      'optionalComponents': [
        'ANACONDA',
        'JUPYTER'
      ]
    },
    'lifestyleConfig': {
      'autoDeleteTtl': 21600
    },
    'endpointConfig': {
      'enableHttpPortAccess': True
    }
  },
  'projectId': PROJECT_ID
}
#Start DataProc Cluster
dataproc_cluster = DataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    region='europe-west4', 
    zone='europe-west4-a',
    cluster = cluster,
    dag=DAG
)

Если вы используете другую версию Airflow, укажите это.

Вы также можете проголосовать за обнаруженную мной ошибку: AIRFLOW-6432

person David Rabinowitz    schedule 03.01.2020
comment
Я использую composer-1.8.3 с airflow-1.10.3, кажется, я не могу выполнить обновление до 1.10.7 (пока?), Я также использую airflow.contrib.operators.dataproc_operator, так как не могу получить доступ к другому оператору. Кластер создается с вашей работой, но кажется, что endpointConfig.enableHttpPortAccess не учитывается (я вижу в пользовательском интерфейсе, что шлюз компонентов все еще отключен) - person kwn; 03.01.2020
comment
Привет @kwn, пожалуйста, посмотрите исправленный ответ - person David Rabinowitz; 03.01.2020
comment
Спасибо, Дэвид, это работает как шарм! Есть ли способ получить результирующий URL-адрес Jupyter? (Как установить фикс?) - person kwn; 03.01.2020
comment
переход в консоль GCP (console.cloud.google.com/dataproc/clusters) , найдите свой кластер, а затем вы можете найти ссылку на jupyter на вкладке веб-интерфейсов. Обратите внимание, что в какой-то момент кластеры могут выйти из строя и что у нового может быть новый URL. Возможно, вы можете привязать облачную функцию к конечной точке http, которая найдет кластер по его имени, возьмет URL-адрес jupyter из его метаданных (используйте dataproc.get_cluster), а затем перенаправит на этот URL-адрес. - person David Rabinowitz; 03.01.2020
comment
Большое спасибо! Я знал о вкладке веб-интерфейсов, я ищу более автоматический способ, поэтому я углублюсь в ваш второй пункт. - person kwn; 06.01.2020