Есть ли способ прочитать многострочный CSV-файл в Apache Beam с помощью преобразования ReadFromText (Python)?

Есть ли способ прочитать многострочный CSV-файл с помощью преобразования ReadFromText в Python? У меня есть файл, содержащий одну строку. Я пытаюсь заставить Apache Beam читать ввод как одну строку, но не могу заставить ее работать.

def print_each_line(line):
    print line

path = './input/testfile.csv'
# Here are the contents of testfile.csv
# foo,bar,"blah blah
# more blah blah",baz

p = apache_beam.Pipeline()

(p
 | 'ReadFromFile' >> apache_beam.io.ReadFromText(path)
 | 'PrintEachLine' >> apache_beam.FlatMap(lambda line: print_each_line(line))
 )

# Here is the output:
# foo,bar,"blah blah
# more blah blah",baz

Приведенный выше код анализирует ввод как две строки, хотя стандарт для многострочных файлов csv заключается в заключении многострочных элементов в двойные кавычки.


comment
Вам нужна коллекция PCollection, содержащая только одну строку. Я прав?   -  person Arjun Kay    schedule 20.04.2018
comment
@ArjunKay Да, в настоящее время у меня вводится одна строка, но луч рассматривает ее как две   -  person Brandon    schedule 20.04.2018
comment
Знаете ли вы, ребята, знаете, улучшена ли поддержка многострочного CSV в новых версиях? учитывая, что об этом давно спрашивали? Я не мог найти много подходящего материала.   -  person tony _008    schedule 09.07.2021


Ответы (3)


Beam не поддерживает синтаксический анализ файлов CSV. Однако вы можете использовать Python csv.reader. Вот пример:

import apache_beam
import csv

def print_each_line(line):
  print line

p = apache_beam.Pipeline()

(p 
 | apache_beam.Create(["test.csv"])
 | apache_beam.FlatMap(lambda filename:
     csv.reader(apache_beam.io.filesystems.FileSystems.open(filename)))
 | apache_beam.FlatMap(print_each_line))

p.run()

Выход:

['foo', 'bar', 'blah blah\nmore blah blah', 'baz']
person Udi Meiri    schedule 20.04.2018

Ни один из ответов не сработал для меня, но это сработало

(
  p
  | beam.Create(['data/test.csv'])
  | beam.FlatMap(lambda filename:
    csv.reader(io.TextIOWrapper(beam.io.filesystems.FileSystems.open(known_args.input)))
  | "Take only name" >> beam.Map(lambda x: x[0])
  | WriteToText(known_args.output)
)
person Juan Acevedo    schedule 08.11.2020

ReadFromText анализирует текстовый файл как элементы, разделенные новой строкой. Итак, ReadFromText рассматривает две строки как два элемента. Если вы хотите, чтобы содержимое файла было единым элементом, вы можете сделать следующее:

contents = []
contents.append(open(path).read()) 
p = apache_beam.Pipeline()
p | beam.Create(contents)
person Arjun Kay    schedule 22.04.2018