как установить dask на google composer

Пытался установить dask на google composer (airflow). Я использовал pypi (GCP UI), чтобы добавить dask и необходимые ниже пакеты (не уверен, все ли требуются Google, но не смог найти require.txt):

 dask
 toolz
 partd
 cloudpickle
 google-cloud
 google-cloud-storage
 google-auth
 google-auth-oauthlib
 decorator

когда я запускаю свой DAG с dd.read_csv («ведро gcp»), он показывает следующую ошибку в журнале воздушного потока:

    [2018-10-24 22:25:12,729] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 350, in get_fs_token_paths
    [2018-10-24 22:25:12,733] {base_task_runner.py:98} INFO - Subtask:     fs, fs_token = get_fs(protocol, options)
    [2018-10-24 22:25:12,735] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 473, in get_fs
    [2018-10-24 22:25:12,740] {base_task_runner.py:98} INFO - Subtask:     "Need to install `gcsfs` library for Google Cloud Storage support\n"
    [2018-10-24 22:25:12,741] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/utils.py", line 94, in import_required
    [2018-10-24 22:25:12,748] {base_task_runner.py:98} INFO - Subtask:     raise RuntimeError(error_msg)
    [2018-10-24 22:25:12,751] {base_task_runner.py:98} INFO - Subtask: RuntimeError: Need to install `gcsfs` library for Google Cloud Storage support
    [2018-10-24 22:25:12,756] {base_task_runner.py:98} INFO - Subtask:     conda install gcsfs -c conda-forge
    [2018-10-24 22:25:12,758] {base_task_runner.py:98} INFO - Subtask:     or
    [2018-10-24 22:25:12,762] {base_task_runner.py:98} INFO - Subtask:     pip install gcsfs

поэтому я попытался установить gcsfs с помощью pypi, но получил следующую ошибку воздушного потока:

{
  insertId:  "17ks763f726w1i"  
  logName:  "projects/xxxxxxxxx/logs/airflow-worker"  
  receiveTimestamp:  "2018-10-25T15:42:24.935880717Z"  
  resource: {…}  
  severity:  "ERROR"  
  textPayload:  "Traceback (most recent call last):
  File "/usr/local/bin/gcsfuse", line 7, in <module>
   from gcsfs.cli.gcsfuse import main
  File "/usr/local/lib/python2.7/site- 
    packages/gcsfs/cli/gcsfuse.py", line 3, in <module>
     fuse import FUSE
    ImportError: No module named fuse
 "  
  timestamp:  "2018-10-25T15:41:53Z"  
}

кажется, что он застрял в цикле необходимых пакетов !! не уверен, что я что-то здесь пропустил? Есть предположения?


person MT467    schedule 01.11.2018    source источник
comment
??? почему отрицательная точка ??   -  person MT467    schedule 01.11.2018
comment
Это кажется знакомым .... какая команда приводит к отображаемой ошибке?   -  person mdurant    schedule 01.11.2018
comment
Для справки, новая среда с py2 или 3, pip install gcsfs работает нормально, без необходимости сначала явно устанавливать требования.   -  person mdurant    schedule 01.11.2018
comment
@mdurant Я разместил более общий вопрос, связанный с dask. как использовать pip в google composer? Я не хочу напрямую устанавливать его на виртуальную машину google composer, хотя   -  person MT467    schedule 02.11.2018
comment
Верно, но мы не сможем вам помочь, если не знаем, какая команда вызывает ошибку, и поэтому не можем воспроизвести ее сами.   -  person mdurant    schedule 02.11.2018
comment
@mdurant Я использовал google composer ui (pypi) для установки gcsfs. google composer coudlnt установил его и выдал ошибку! так просто, как кажется, но не работает для меня. Я использую composer-1.0.0-airflow-1.9.0.   -  person MT467    schedule 05.11.2018
comment
мы не знаем, какая команда вызывает ошибку - все еще не знаем. pip install gcsfs не ведет к ImportError: No module named fuse.   -  person mdurant    schedule 05.11.2018


Ответы (1)


Вам не нужно добавлять хранилище в свои пакеты PyPi, он уже установлен. Я запустил dag (image-version: composer-1.3.0-airflow-1.10.0), регистрируя версию предустановленного пакета, и оказалось, что это 1.13.0. Я также добавил в свой даг следующее, чтобы воспроизвести ваш случай:

import dask.dataframe as dd
def read_csv_dask():
    df = dd.read_csv('gs://gcs_path/data.csv')
    logging.info("csv from gs://gcs_path/ read alright")

Прежде всего я добавил через пользовательский интерфейс следующие зависимости:

dask==0.20.0
toolz==0.9.0
partd==0.3.9
cloudpickle==0.6.1

Соответствующая задача не удалась с тем же сообщением, что и ваша («Необходимо установить gcsfs библиотеку для поддержки Google Cloud Storage»), после чего я вернулся в пользовательский интерфейс и попытался добавить gcsfs==0.1.2. Это так и не удалось. Однако я не получил ту ошибку, которую вы сделали, вместо этого я неоднократно терпел неудачу с "Тайм-аут Composer Backend".

На этом этапе вы можете рассмотреть следующие альтернативы:

1) Установите gcsfs с помощью pip в BashOperator. Это не оптимально, так как вы будете устанавливать gcsfs каждый раз при запуске dag.

2) Воспользуйтесь другой библиотекой. Что ты делаешь с этим CSV? Если вы загрузите его в каталог gs://composer_gcs_bucket/data/ (проверьте здесь) затем вы можете прочитать его, используя, например, стандартная библиотека csv, например:

import csv
def read_csv():
    f=open('/home/airflow/gcs/data/data.csv', 'rU')
    reader = csv.reader(f)
person Lefteris S    schedule 07.11.2018