Текущий конвейер луча читает файлы как поток, используя FileIO.matchAll().continuously()
. Это возвращает PCollection. Я хочу записать эти файлы с теми же именами в другое ведро gcs, т.е. каждый PCollection
- это один файл metadata/readableFile
, который должен быть записан обратно в другое ведро после некоторой обработки. Есть ли какой-нибудь приемник, который я должен использовать для записи каждого PCollection
элемента обратно в GCS, или есть какие-то способы сделать это? Можно ли создать окно для каждого элемента, а затем использовать некоторый ввод-вывод приемника GCS, чтобы иметь возможность это сделать. При работе с окном (даже если оно имеет несколько элементов) гарантирует ли луч, что окно либо полностью обработано, либо не обработано вообще, другими словами, это операции записи в GCS or bigquery
для данного окна, атомарного, а не частичного в случае любого неудачи?
потоковая запись в gcs с использованием Apache Beam для каждого элемента
Ответы (1)
Можете ли вы просто написать DoFn<ReadableFile, Void>
, который берет файл и копирует его в нужное место с помощью FileSystems
API? Для этого вам не нужна никакая "раковина" - и, в любом случае, это то, что все "раковины" (TextIO.write()
, AvroIO.write()
и т. Д.) В любом случае находятся под капотом: это просто преобразования луча, состоящие из ParDo
и GroupByKey
.
person
jkff
schedule
15.01.2018
это то, что я делаю прямо сейчас (и предыдущие конвейеры потока данных). Интересно, были ли какие-либо оговорки или последствия для производительности, потому что я могу открывать слишком много подключений к gcs от каждого рабочего, выполняющего DoFn для каждого элемента. Я видел, что beam api обеспечивает динамическое назначение, поэтому подумал, что могут быть более идиоматические способы обращения с этим или каким-либо другим приемником.
- person user179156; 15.01.2018
Также есть способ создать windo для каждого элемента, а не запускать вывод для каждого окна? Также можно ответить о частичной обработке окна?
- person user179156; 15.01.2018
ошибка зависшего задания: сеть, выбранная для этого задания, должна иметь правила, которые открывают TCP-порты 1-65535 для внутреннего соединения с другими виртуальными машинами. Без этого любой конвейер с более чем одним воркером, который перемешивает данные, будет зависать. Сеть "по умолчанию" изначально настроена правильно. Рассмотрите возможность его использования и / или убедитесь, что он не был изменен. Причины: правила брандмауэра, связанные с вашей сетью, не открывают TCP-порты 1-65535 для экземпляров Dataflow. Если правило брандмауэра открывает соединение в этих портах, убедитесь, что целевые теги не указаны или что правило включает тег «поток данных».
- person user179156; 15.01.2018
Я не могу придумать никаких предостережений. Если вам нужно создать N файлов, нет способа сделать это, не выполнив N вызовов загрузки GCS - это то, что будет делать ваш текущий код, и это то, что будет делать DynamicDestinations API (или лучше, FileIO.write ()). . DD API здесь определенно будет излишним. Я не уверен, что понимаю вопрос об окне на элемент или об атомарности: актуальны ли эти вопросы в свете вышеизложенного?
- person jkff; 15.01.2018
gsutil
,bq
и облачных функций Google для достижения всего этого. - person Andrew Nguonly   schedule 13.01.2018