Потоковые конвейеры с приемниками BigQuery в Python

Я создаю конвейер потоковой передачи лучей apache, источником которого является Pubsub, а источником - BigQuery. Я получил сообщение об ошибке:

«Сбой рабочего процесса. Причины: неизвестный код сообщения».

Каким бы загадочным ни было это сообщение, теперь я считаю, что BigQuery не поддерживается в качестве приемника для потоковых конвейеров, здесь говорится следующее: Трансляция из Pub / Sub в BigQuery

Правильно ли я, что проблема именно в этом? А если нет, то все равно не поддерживается?

Может ли кто-нибудь намекнуть, когда эта функция будет выпущена? Обидно, я был очень взволнован, чтобы использовать это.


person goose    schedule 31.07.2018    source источник


Ответы (1)


Конвейеры потоковой передачи Python экспериментально доступны, начиная с Beam 2.5.0, как описано в здесь

Поэтому вам нужно будет установить apache-beam 2.5.0 и apache-beam [gcp]

pip install apache-beam==2.5.0
pip install apache-beam[gcp]

Я выполнил эту команду:

python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming

Используя приведенный ниже код, он работает нормально:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam

def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['ride_id']), (record['point_idx']), (record['latitude']), (record['longitude']), (record['timestamp']), (record['meter_increment']), (record['ride_status']), (record['meter_reading']), (record['passenger_count'])

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', dest='input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:

    # Read from PubSub
    lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
    #Adapt messages from PubSub to BQ table
    lines = lines | beam.Map(parse_pubsub)
    lines = lines | beam.Map(lambda (ride_id, point_idx, latitude, longitude, timestamp, meter_increment, ride_status,meter_reading, passenger_count): {'ride_id':ride_id, 'point_idx':point_idx, 'latitude':latitude, 'longitude':longitude, 'timestamp':timestamp, 'meter_increment':meter_increment,'ride_status': ride_status,'meter_reading':meter_reading,'passenger_count': passenger_count})
    #Write to a BQ table 
    lines | beam.io.WriteToBigQuery(table ='<my-table>',dataset='<my-dataset>',project='<my-project>' )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

В этом коде используется общедоступная тема" --topic projects / pubsub-public-data / themes / taxirides-realtime "и таблица BQ, которую я создал с правильной схемой.

Если вы используете этот пример, будьте осторожны, не оставляйте его работающим, иначе вы понесете расходы, так как будете получать много сообщений из этой темы PubSub.

person VictorGGl    schedule 02.08.2018
comment
Спасибо, Виктор - единственная строка, которую мне пришлось изменить, это та, в которой включен WriteToBigQuery. Возможно, я просто допустил простую синтаксическую ошибку. - person goose; 05.08.2018