Записывать результаты BigQuery в GCS в формате CSV с помощью Apache Beam

Я довольно новичок в работе над Apache Beam, где я пытаюсь написать конвейер для извлечения данных из Google BigQuery и записи данных в GCS в формате CSV с использованием Python.

Используя beam.io.read(beam.io.BigQuerySource()), я могу читать данные из BigQuery, но не знаю, как записать их в GCS в формате CSV.

Есть ли специальная функция для достижения того же, не могли бы вы мне помочь?

import logging

import apache_beam as beam


PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
]

with beam.Pipeline(argv=argv) as p:

    # Execute the SQL in big query and store the result data set into given Destination big query table.
    BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
        beam.io.BigQuerySource(query =  'Select * from `dataset.table`', use_standard_sql=True))
    # Extract data from Bigquery to GCS in CSV format.
    # This is where I need your help

    BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   run()

person Hari    schedule 22.10.2018    source источник
comment
Добро пожаловать в Stack Overflow! Пожалуйста, пройдите тур и посетите справочный центр, чтобы получить максимум от этого сайта. Пожалуйста, также поделитесь соответствующими частями кода, которые вы разработали на данный момент. Это помогает выяснить, в чем может быть проблема.   -  person Adrian W    schedule 22.10.2018


Ответы (2)


Вы можете сделать это с помощью WriteToText, чтобы добавить .csv суффикс и headers. Учтите, что вам нужно будет преобразовать результаты запроса в формат CSV. В качестве примера я использовал общедоступный набор данных и следующий запрос:

ВЫБЕРИТЕ word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` ГДЕ CHAR_LENGTH (word)> 3 ORDER BY word_count DESC LIMIT 10

Теперь мы читаем результаты запроса с помощью:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA теперь содержит пары ключ-значение:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

Мы можем применить функцию beam.Map, чтобы получить только значения:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

Отрывок из BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]

И, наконец, сопоставьте снова, чтобы все значения столбцов были разделены запятыми вместо списка (примите во внимание, что вам нужно будет избегать двойных кавычек, если они могут появиться в поле):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))

Теперь записываем результаты в GCS с суффиксом и заголовками:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')

Письменные результаты:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
person Guillem Xercavins    schedule 22.10.2018
comment
В том же примере, если бы мне пришлось записывать файлы из GCS в локальный каталог с помощью подпроцесса (команда GSUTIL), как бы я добился успеха в конвейере. - person Hari; 22.10.2018

Для тех, кто ищет обновления с использованием Python 3, замените строку

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

с участием

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))
person Kim Merino    schedule 03.03.2021