Как импортировать несколько файлов CSV за одну загрузку?

Предположим, у меня есть определенная схема для загрузки 10 файлов CSV в папку. Есть ли способ автоматически загружать таблицы с помощью Spark SQL. Я знаю, что это можно сделать, используя отдельный фрейм данных для каждого файла [приведенный ниже], но можно ли это автоматизировать с помощью одной команды, а не указывать файл, могу ли я указать папку?

df = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .load("../Downloads/2008.csv")

person Chendur    schedule 05.06.2016    source источник


Ответы (6)


Используйте подстановочный знак, например замените 2008 на *:

df = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .load("../Downloads/*.csv") // <-- note the star (*)

Искра 2.0

// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")

Примечания:

  1. Вместо format("com.databricks.spark.csv") используйте метод format("csv") или csv. com.databricks.spark.csv формат был интегрирован в 2.0.

  2. Используйте spark, а не sqlContext

person Yaron    schedule 05.06.2016
comment
Первое решение, похоже, загружает только первый csv в папке. Вы знаете, как их все загрузить? - person mdornfe1; 21.11.2016
comment
@ mdornfe1 - должно работать. нам нужно понять детали (версия Spark, точная используемая команда, файлы csv и т. д.) в случае, когда это не сработало для вас. если он короткий, добавьте в комментарий подробности, в противном случае откройте новый вопрос. - person Yaron; 21.11.2016
comment
Есть ли способ загружать файлы из разной глубины файловой структуры? Скажите, .../Downloads/first.csv, а также .../Downloads/subfolder/second.csv? Использование .csv(".../Downloads/*/*.csv) этого не делает. - person NotYanka; 27.03.2017
comment
@NotYanka, возможно, вы захотите проверить другой ответ - load примет список путей в виде строк, и каждый из них может содержать подстановочный знак. - person mattsilver; 22.04.2017
comment
@ohruunuruus Я попробовал другой ответ, но он не принимает список путей. любые другие предложения? - person Abu Shoeb; 31.10.2017
comment
@Yaron, а что, если имена столбцов в csvs не совпадают? Как будет выводиться схема? Есть ли способ принудительно объединить имена столбцов, которые будут использоваться в схеме? Я не использую inferschema, она просто читается как строковый ... Я - person leonard; 03.04.2018
comment
отлично работает с sqlContext.read.load v2.3 (pyspark), в чем причина использования spark вместо sqlContext - person muon; 12.04.2018
comment
При работе с файлами, которые будут иметь одинаковые столбцы, но в другом порядке, знает ли Spark, как сопоставить данные из разных файлов с правильными столбцами? - person Frank Pinto; 20.01.2020

Ex1:

Чтение одного CSV-файла. Укажите полный путь к файлу:

 val df = spark.read.option("header", "true").csv("C:spark\\sample_data\\tmp\\cars1.csv")

Ex2:

Чтение нескольких файлов CSV с передачей имен:

val df=spark.read.option("header","true").csv("C:spark\\sample_data\\tmp\\cars1.csv", "C:spark\\sample_data\\tmp\\cars2.csv")

Ex3:

Чтение нескольких файлов CSV с передачей списка имен:

val paths = List("C:spark\\sample_data\\tmp\\cars1.csv", "C:spark\\sample_data\\tmp\\cars2.csv")
val df = spark.read.option("header", "true").csv(paths: _*)

Ex4:

Чтение нескольких файлов CSV в папке без учета других файлов:

val df = spark.read.option("header", "true").csv("C:spark\\sample_data\\tmp\\*.csv")

Ex5:

Чтение нескольких файлов CSV из нескольких папок:

val folders = List("C:spark\\sample_data\\tmp", "C:spark\\sample_data\\tmp1")
val df = spark.read.option("header", "true").csv(folders: _*)
person mputha    schedule 12.10.2018

Обратите внимание, что вы можете использовать и другие приемы, например:

-- One or more wildcard:
       .../Downloads20*/*.csv
--  braces and brackets   
       .../Downloads201[1-5]/book.csv
       .../Downloads201{11,15,19,99}/book.csv
person Jamal Jam    schedule 31.10.2017
comment
Отлично! Хотелось бы, чтобы это было задокументировано. Но не смог найти его на spark.apache.org/docs/latest/ api / python / index.html. - person flow2k; 03.07.2019

Reader's Digest: (Spark 2.x)

Например, если у вас есть 3 каталога с файлами csv:

dir1, dir2, dir3

Затем вы определяете пути как строку списка путей, разделенных запятыми, следующим образом:

пути = "dir1 / , dir2 /, dir3 / *"

Затем используйте следующую функцию и передайте ей эту переменную paths

def get_df_from_csv_paths(paths):

        df = spark.read.format("csv").option("header", "false").\
            schema(custom_schema).\
            option('delimiter', '\t').\
            option('mode', 'DROPMALFORMED').\
            load(paths.split(','))
        return df

К тому времени запустив:

df = get_df_from_csv_paths(paths)

Вы получите в df единый фрейм данных Spark, содержащий данные из всех CSV, найденных в этих трех каталогах.

=============================================== ============================

Полная версия:

Если вы хотите получить несколько CSV-файлов из нескольких каталогов, вам просто нужно передать список и использовать подстановочные знаки.

Например:

если ваш data_path выглядит так:

's3: // bucket_name / subbucket_name / 2016-09 - * / 184 / *,
s3: // bucket_name / subbucket_name / 2016-10 - * / 184 / *,
s3: // bucket_name / subbucket_name / 2016-11 - * / 184 / *,
s3: // bucket_name / subbucket_name / 2016-12 - * / 184 / *, ... '

вы можете использовать указанную выше функцию для одновременного приема всех CSV-файлов во всех этих каталогах и подкаталогах:

Это приведет к загрузке всех каталогов в s3 bucket_name / subbucket_name / в соответствии с указанными шаблонами подстановочных знаков. например первый образец заглянет в

bucket_name / subbucket_name /

для всех каталогов с именами, начинающимися с

2016-09-

и для каждого из них возьмите только каталог с именем

184

и в этом подкаталоге найдите все файлы csv.

И это будет выполняться для каждого из шаблонов в списке, разделенном запятыми.

Это работает лучше, чем союз.

person eiTan LaVi    schedule 07.03.2017
comment
Это ответ, который я искал, но ответ намного сложнее, чем должен быть. Ему может быть полезен простой пример, в котором передается список путей. Создание списка из строки, содержащей пути, разделенные запятыми, удобно, но немного выходит за рамки вопроса. Из-за этого и других вещей (например, причудливой многофункциональной функции) я сначала пропустил этот ответ. - person mattsilver; 22.04.2017
comment
Это ответ, который я тоже искал. Это позаботится о получении данных из нескольких корзин / каталогов / форматов и т. Д. - person Vivek; 19.07.2017
comment
Согласитесь с @ohruunuruus - хороший ответ, но люди здесь, как правило, достаточно свободно говорят на Python, чтобы знать, как создавать списки. - person flow2k; 03.07.2019

Используя Spark 2.0+, мы можем загружать несколько файлов CSV из разных каталогов, используя df = spark.read.csv(['directory_1','directory_2','directory_3'.....], header=True). Дополнительную информацию см. В документации здесь

person Neeleshkumar S    schedule 28.07.2017

val df = spark.read.option("header", "true").csv("C:spark\\sample_data\\*.csv)

будут рассматривать файлы tmp, tmp1, tmp2, ....

person user14454778    schedule 15.10.2020
comment
Есть ли способ узнать количество файлов, которые были прочитаны с помощью этой команды? - person Debapratim Chakraborty; 15.03.2021