Вставка в BigQuery через загрузочные задания (не потоковая передача)

Я хочу использовать Dataflow для загрузки данных в таблицы BigQuery с помощью заданий загрузки BQ - без потоковой передачи (в нашем случае потоковая передача будет стоить слишком дорого). Я вижу, что в SDK Dataflow встроена поддержка вставки данных через потоковую передачу BQ, но я не смог найти в SDK Dataflow ничего, что поддерживает задания загрузки из коробки.

Некоторые вопросы:

1) Имеется ли в SDK Dataflow поддержка OOTB для вставок заданий загрузки BigQuery? Если нет, то планируется ли это?

2) Если мне нужно свернуть свою собственную, какие есть хорошие подходы?

Если мне нужно свернуть свой собственный, выполнение задания загрузки BQ с использованием Google Cloud Storage представляет собой многоэтапный процесс: запишите файл в GCS, отправьте задание загрузки через BQ API и (необязательно) проверьте статус, пока задание не будет завершено. (или не удалось). Я бы надеялся, что смогу использовать существующую функциональность TextIO.write () для записи в GCS, но я не уверен, как бы скомпоновать этот шаг с последующим вызовом BQ API для отправки задания загрузки (и, возможно, последующие вызовы для проверки состояния задания до его завершения).

Кроме того, я бы использовал поток данных в потоковом режиме с окнами в 60 секунд, поэтому я бы также хотел выполнять задание загрузки каждые 60 секунд.

Предложения?


person Jon Chase    schedule 16.06.2015    source источник
comment
(удален ответ и преобразован в комментарий). В пакетном режиме поток данных фактически записывается в GCS, а затем запускает задание (я) массовой загрузки BigQuery для получения данных. Затем он должен удалить файлы в GCS после успешного (или неудачного) конвейера, но есть ошибка с этим (goo.gl/8rY1uk). В потоковом режиме он действительно будет использовать потоковый API. О каком размере мы здесь говорим? Потоковая передача стоит всего 0,01 доллара за 200 МБ. Может быть, вы могли бы написать 2 конвейера - один, который записывает в GCS (в потоковом режиме), а другой в пакетном режиме, собирает эти файлы и использует массовую загрузку BQ?   -  person Graham Polley    schedule 17.06.2015
comment
Интересно, что не понимал, что DF пишет в BQ по-разному в зависимости от того, является ли конвейер пакетным или потоковым. Я все еще не могу найти код пакетной загрузки BQ в DF SDK, но, может быть, он не распространяется с SDK? Что касается затрат на потоковую передачу, цена для нашего варианта использования немного высока - около 6 миллиардов строк по 1 КБ в день, что составляет около 9000 долларов США в месяц для потоковых вставок BQ (минимальный размер строки в 1 КБ убивает нас, поскольку большинство наших строк являются фактически вдвое меньше).   -  person Jon Chase    schedule 18.06.2015
comment
А как насчет идеи иметь 2 конвейера? Один для записи в GCS (потоковая передача), а другой (в пакетном режиме) для сбора этих данных и пакетной загрузки в BQ? Может ли это сработать для вас? Может быть, кто-нибудь из инженеров Google подскочит сюда и даст еще несколько (возможно, лучше :)) предложений.   -  person Graham Polley    schedule 18.06.2015
comment
Спасибо за предложение - потоковая передача в GCS и пакетная загрузка в BQ - это то, о чем я тоже думаю прямо сейчас, хотя мне не нравятся дополнительные операционные издержки. Думаю, я немного углублюсь в код, чтобы посмотреть, смогу ли я каким-то образом заставить DF выполнять пакетную загрузку в BQ даже в потоковом режиме.   -  person Jon Chase    schedule 20.06.2015
comment
Мне довелось увидеть эту ветку сегодня - это похоже на отличный запрос функции, который я сделаю внутри компании. Существует очевидное противоречие между задержкой и ее стоимостью - можете ли вы дать нам некоторое представление о том, какой тип задержки был бы приемлемым? Например, потоковая передача в GCS, а затем запуск задания пакетного импорта каждые 24 часа сделает это ...   -  person Dan Halperin    schedule 29.10.2015


Ответы (2)


Я не уверен, какую версию Apache Beam вы используете, но теперь можно использовать тактику микропакетов с помощью Stream Pipeline. Если вы решите так или иначе, вы можете использовать что-то вроде этого:

.apply("Saving in batches", BigQueryIO.writeTableRows()
                    .to(destinationTable(options))
                    .withMethod(Method.FILE_LOADS)
                    .withJsonSchema(myTableSchema)
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withTriggeringFrequency(Duration.standardMinutes(2))
                    .withNumFileShards(1);
                    .optimizedWrites());

О чем следует помнить

  1. Есть 2 разных метода: FILE_LOADS и STREAMING_INSERT, если вы используете первый, вам нужно включить withTriggeringFrequency и withNumFileShards. Для первого, по моему опыту, лучше использовать минуты, и количество будет зависеть от объема данных о пропускной способности. Если вы получаете довольно много, старайтесь, чтобы оно было маленьким, я видел «застрявшие ошибки», когда вы его слишком сильно увеличивали. Осколки могут в основном повлиять на ваш биллинг GCS, если вы добавите много осколков, это создаст больше файлов на таблицу за x количество минут.
  2. Если размер ваших входных данных не такой большой, потоковая вставка может работать очень хорошо, и стоимость не должна быть большой. В этом сценарии вы можете использовать метод STREAMING_INSERT и удалить withTriggeringFrequency и withNumFileShards. Кроме того, вы можете добавить withFailedInsertRetryPolicy, например InsertRetryPolicy.retryTransientErrors(), чтобы строки не терялись (имейте в виду, что идемпотентность не гарантируется с STREAM_INSERTS, поэтому возможно дублирование)
  3. Вы можете проверить свои вакансии в BigQuery и убедиться, что все работает! Помните о политиках для заданий с BigQuery (я думаю, это 1000 заданий на таблицу), когда вы пытаетесь определить частоту срабатывания и сегменты.

Примечание. Вы всегда можете прочитать эту статью об эффективных конвейерах агрегирования https://cloud.google.com/blog/products/data-analytics/how-to-efficiently-process-каквреальномвремени,такивсовокупныхданныхспотокомданных

person Juan Urrego    schedule 04.07.2019

BigQueryIO.write() всегда использует задания загрузки BigQuery, когда вход PCollection ограничен. Если вы хотите, чтобы они также использовались без ограничений, укажите .withMethod(FILE_LOADS).withTriggeringFrequency(...).

person jkff    schedule 11.04.2018