потоковая запись в gcs с использованием Apache Beam для каждого элемента

Текущий конвейер луча читает файлы как поток, используя FileIO.matchAll().continuously(). Это возвращает PCollection. Я хочу записать эти файлы с теми же именами в другое ведро gcs, т.е. каждый PCollection - это один файл metadata/readableFile, который должен быть записан обратно в другое ведро после некоторой обработки. Есть ли какой-нибудь приемник, который я должен использовать для записи каждого PCollection элемента обратно в GCS, или есть какие-то способы сделать это? Можно ли создать окно для каждого элемента, а затем использовать некоторый ввод-вывод приемника GCS, чтобы иметь возможность это сделать. При работе с окном (даже если оно имеет несколько элементов) гарантирует ли луч, что окно либо полностью обработано, либо не обработано вообще, другими словами, это операции записи в GCS or bigquery для данного окна, атомарного, а не частичного в случае любого неудачи?


comment
Поправьте меня, если я ошибаюсь, но похоже, что вам просто нужно скопировать файлы из одного места в другое. Вам действительно нужен Dataflow для этого? Я полагаю, вы могли бы использовать некоторую комбинацию gsutil, bq и облачных функций Google для достижения всего этого.   -  person Andrew Nguonly    schedule 13.01.2018
comment
да, почти так, мне нужно пройти через все события, хранящиеся в этих файлах, и отфильтровать некоторые, изменить некоторые, агрегировать некоторые, но на уровне файла, потому что это дает мне идемпотентность, так что если я повторно обработаю один и тот же файл, я получу те же данные . Но если есть способы добиться этого, сглаживая файлы в потоковые оконные события, а затем используя обычный textio / avroio, я хотел бы также знать это решение.   -  person user179156    schedule 13.01.2018
comment
Спасибо за разъяснения. Я не могу придумать никаких способов добиться этого. Требование идемпотентности файла может быть слишком сложным для реализации в Dataflow. Наивным решением было бы обрабатывать 1 файл на каждое задание Dataflow, что, на мой взгляд, небезосновательно.   -  person Andrew Nguonly    schedule 13.01.2018
comment
ох, это определенно неразумно. каждый день генерируются миллионы файлов, мы определенно не можем запустить миллионную задачу потока данных только для обработки нескольких мегабайт в 1 файле, где каждый файл представляет собой всего несколько тысяч событий. Запуск заданий потока данных для каждого файла - это вообще не решение. не только поток данных, запуск любого двоичного файла для каждого файла для обработки одного файла для каждого экземпляра не является решением. Это похоже на то, что у меня будет один сервер для одного запроса.   -  person user179156    schedule 14.01.2018


Ответы (1)


Можете ли вы просто написать DoFn<ReadableFile, Void>, который берет файл и копирует его в нужное место с помощью FileSystems API? Для этого вам не нужна никакая "раковина" - и, в любом случае, это то, что все "раковины" (TextIO.write(), AvroIO.write() и т. Д.) В любом случае находятся под капотом: это просто преобразования луча, состоящие из ParDo и GroupByKey.

person jkff    schedule 15.01.2018
comment
это то, что я делаю прямо сейчас (и предыдущие конвейеры потока данных). Интересно, были ли какие-либо оговорки или последствия для производительности, потому что я могу открывать слишком много подключений к gcs от каждого рабочего, выполняющего DoFn для каждого элемента. Я видел, что beam api обеспечивает динамическое назначение, поэтому подумал, что могут быть более идиоматические способы обращения с этим или каким-либо другим приемником. - person user179156; 15.01.2018
comment
Также есть способ создать windo для каждого элемента, а не запускать вывод для каждого окна? Также можно ответить о частичной обработке окна? - person user179156; 15.01.2018
comment
ошибка зависшего задания: сеть, выбранная для этого задания, должна иметь правила, которые открывают TCP-порты 1-65535 для внутреннего соединения с другими виртуальными машинами. Без этого любой конвейер с более чем одним воркером, который перемешивает данные, будет зависать. Сеть "по умолчанию" изначально настроена правильно. Рассмотрите возможность его использования и / или убедитесь, что он не был изменен. Причины: правила брандмауэра, связанные с вашей сетью, не открывают TCP-порты 1-65535 для экземпляров Dataflow. Если правило брандмауэра открывает соединение в этих портах, убедитесь, что целевые теги не указаны или что правило включает тег «поток данных». - person user179156; 15.01.2018
comment
Я не могу придумать никаких предостережений. Если вам нужно создать N файлов, нет способа сделать это, не выполнив N вызовов загрузки GCS - это то, что будет делать ваш текущий код, и это то, что будет делать DynamicDestinations API (или лучше, FileIO.write ()). . DD API здесь определенно будет излишним. Я не уверен, что понимаю вопрос об окне на элемент или об атомарности: актуальны ли эти вопросы в свете вышеизложенного? - person jkff; 15.01.2018