Вопрос
Как мне использовать Dask Distributed для распараллеливания чтения каталога файлов в отдельные DataFrames, которые я затем обрабатываю с помощью настраиваемой функции? Предположим, что n-файлов - это что-то вроде 100000
Фон
Я новичок в Dask и не совсем понимаю, как это задать (какие термины использовать и т. Д.), Поэтому вот картина того, что я пытаюсь достичь:
У меня есть много небольших отдельных файлов .txt "бухгалтерской книги" (например, файлов с разделителями-строками с меткой времени и значениями атрибутов на момент метки времени), хранящихся в HDFS.
Параллельно хочу ...
Прочтите каждый файл в DataFrame (примечание: я не пытаюсь объединить все файлы в один большой df!);
К каждому фрейму данных примените пользовательскую функцию (см. Ниже); а потом
Объедините каждый результат (возврат из пользовательской функции) в окончательный объект и сохраните его обратно в HDFS.
Кажется, что почти каждый ответ, который я нахожу (при поиске в Google связанных терминов), касается загрузки нескольких файлов в один фрейм данных.
То, что я обрабатываю, функцию, которую я использую
Каждый файл бухгалтерской книги / DataFrame:
+---------+------+-------------------+-----+
| location|status| timestamp|wh_id|
+---------+------+-------------------+-----+
| PUTAWAY| I|2019-04-01 03:14:00| 20|
|PICKABLE1| X|2019-04-01 04:24:00| 20|
|PICKABLE2| X|2019-04-01 05:33:00| 20|
|PICKABLE2| A|2019-04-01 06:42:00| 20|
| HOTPICK| A|2019-04-10 05:51:00| 20|
| ICEXCEPT| A|2019-04-10 07:04:00| 20|
| ICEXCEPT| X|2019-04-11 09:28:00| 20|
+---------+------+-------------------+-----+
Функция анализа:
from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp
def analyze(df):
columns_with_age = ("location", "status")
columns_without_age = ("wh_id")
# Get the most-recent values (from the last row of the df)
row_count = df.count()
last_row = df.collect()[row_count-1]
# Create an empty "final row" dictionary
final_row = {}
# For each column for which we want to calculate an age value ...
for c in columns_with_age:
# Initialize loop values
target_value = last_row.__getitem__(c)
final_row[c] = target_value
timestamp_at_lookback = last_row.__getitem__("timestamp")
look_back = 1
different = False
while not different:
previous_row = df.collect()[row_count - 1 - look_back]
if previous_row.__getitem__(c) == target_value:
timestamp_at_lookback = previous_row.__getitem__("timestamp")
look_back += 1
else:
different = True
# At this point, a difference has been found, so calculate the age
final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days
Таким образом, данные бухгалтерской книги / DataFrame уменьшатся до (при условии, что расчет был проведен 14 апреля 2019 г.):
{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }
delayed
. Ознакомьтесь с этим asnwer. - person rpanai   schedule 16.04.2019client.map(eval_func, [list of HDFS filepaths])
. - person Dan   schedule 17.04.2019