Доступ к файлу внутри конвейера потока данных

Я хочу загрузить определенные файлы из временного местоположения до запуска конвейера. Файлы .mmdb, которые должны быть прочитаны в функции ParDo. Файлы хранятся в Google Storage, но метод, использующий файлы .mmdb, требует, чтобы они были объект File (java.io).

Если я включу его в --filesToStage, они будут доступны как InputStream внутри zip. Я хочу получить к ним доступ как к файлам, а не к InputStream. Как лучше всего этого добиться?

В настоящее время я загружаю файлы во временную папку на рабочем месте внутри установки ParDo.


person user3777228    schedule 02.01.2019    source источник


Ответы (2)


Это очень широкий вопрос и вопрос высокого уровня. Ответ зависит от вашей логики, потребляющей файлы. File представляет собой файл в файловой системе, поэтому если у вас есть компонент, который требует, чтобы входные данные были экземпляром File, то будет правильным записать его во временную папку локально. Луч не обеспечивает лучшей абстракции для этого случая.

Однако я бы порекомендовал изучить обновление логики, которая в настоящее время обрабатывает Files, чтобы принимать и другие виды ввода. Вы, вероятно, столкнетесь с проблемой, вызванной отсутствием разделения проблем и тесной связью. То есть у вас есть компонент, который принимает File, открывает его, обрабатывает ошибки при его открытии, читает его, анализирует данные из него, возможно, даже проверяет и обрабатывает данные. Все это отдельные проблемы и, вероятно, должны решаться отдельными компонентами, которые вы можете комбинировать и заменять вместе, когда это необходимо, например:

  • класс, который знает, как работать с файловой системой и превращать путь в поток байтов;
  • аналогичный класс, который знает, как получить файл через http (например, вариант использования GCS) и превратить его в поток байтов;
  • компонент, который знает, как преобразовать поток байтов в данные;
  • компонент, обрабатывающий проанализированные данные;
  • другие вещи, вероятно, могут жить где угодно;

Таким образом, вы можете легко реализовать любые другие источники для вашего компонента, составить и протестировать их независимо.

Например, вы можете реализовать свою логику как 2 соединенных PCollections, один из которых будет читать напрямую из местоположения GCS, анализировать текстовые строки и обрабатывать его в реальной бизнес-логике перед объединением с другим PCollection.

person Anton    schedule 02.01.2019
comment
В идеале этот файл должен быть прочитан один раз для каждого рабочего. Вход в конвейер - pub / sub. - person user3777228; 04.01.2019
comment
Меня немного смущает ваш вопрос, но видели ли вы это - person WIT; 13.01.2019

Думаю, я понимаю, что вы / пытались сделать, и я хотел сделать то же самое.

Это сработало для меня (в методе setup () DoFn):

 if(not FileSystems.exists(local_db_location) ):
        with FileSystems.open(  self._cloud_database_loc ) as af:
            with FileSystems.create(local_db_location) as local_file:
                try:
                    shutil.copyfileobj(af,local_file,length=131072)
                except:
                    raise
    else:
        #DB exists
person Rob Knights    schedule 26.02.2020