Конвейеры потоковой передачи Python экспериментально доступны, начиная с Beam 2.5.0, как описано в здесь а>
Поэтому вам нужно будет установить apache-beam 2.5.0 и apache-beam [gcp]
pip install apache-beam==2.5.0
pip install apache-beam[gcp]
Я выполнил эту команду:
python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming
Используя приведенный ниже код, он работает нормально:
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
def parse_pubsub(line):
import json
record = json.loads(line)
return (record['ride_id']), (record['point_idx']), (record['latitude']), (record['longitude']), (record['timestamp']), (record['meter_increment']), (record['ride_status']), (record['meter_reading']), (record['passenger_count'])
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_topic', dest='input_topic', required=True,
help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
known_args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
# Read from PubSub
lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
#Adapt messages from PubSub to BQ table
lines = lines | beam.Map(parse_pubsub)
lines = lines | beam.Map(lambda (ride_id, point_idx, latitude, longitude, timestamp, meter_increment, ride_status,meter_reading, passenger_count): {'ride_id':ride_id, 'point_idx':point_idx, 'latitude':latitude, 'longitude':longitude, 'timestamp':timestamp, 'meter_increment':meter_increment,'ride_status': ride_status,'meter_reading':meter_reading,'passenger_count': passenger_count})
#Write to a BQ table
lines | beam.io.WriteToBigQuery(table ='<my-table>',dataset='<my-dataset>',project='<my-project>' )
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
В этом коде используется общедоступная тема" --topic projects / pubsub-public-data / themes / taxirides-realtime "и таблица BQ, которую я создал с правильной схемой.
Если вы используете этот пример, будьте осторожны, не оставляйте его работающим, иначе вы понесете расходы, так как будете получать много сообщений из этой темы PubSub.
person
VictorGGl
schedule
02.08.2018