Dask Distributed: Распараллеливание чтения и анализа большого количества отдельных файлов.

Вопрос

Как мне использовать Dask Distributed для распараллеливания чтения каталога файлов в отдельные DataFrames, которые я затем обрабатываю с помощью настраиваемой функции? Предположим, что n-файлов - это что-то вроде 100000

Фон

Я новичок в Dask и не совсем понимаю, как это задать (какие термины использовать и т. Д.), Поэтому вот картина того, что я пытаюсь достичь:

Обзор

У меня есть много небольших отдельных файлов .txt "бухгалтерской книги" (например, файлов с разделителями-строками с меткой времени и значениями атрибутов на момент метки времени), хранящихся в HDFS.

Параллельно хочу ...

  1. Прочтите каждый файл в DataFrame (примечание: я не пытаюсь объединить все файлы в один большой df!);

  2. К каждому фрейму данных примените пользовательскую функцию (см. Ниже); а потом

  3. Объедините каждый результат (возврат из пользовательской функции) в окончательный объект и сохраните его обратно в HDFS.

Кажется, что почти каждый ответ, который я нахожу (при поиске в Google связанных терминов), касается загрузки нескольких файлов в один фрейм данных.

То, что я обрабатываю, функцию, которую я использую

Каждый файл бухгалтерской книги / DataFrame:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

Функция анализа:

from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp

def analyze(df):

  columns_with_age = ("location", "status")
  columns_without_age = ("wh_id")

  # Get the most-recent values (from the last row of the df)
  row_count = df.count()
  last_row = df.collect()[row_count-1]

  # Create an empty "final row" dictionary
  final_row = {}

  # For each column for which we want to calculate an age value ...
  for c in columns_with_age:

      # Initialize loop values
      target_value = last_row.__getitem__(c)
      final_row[c] = target_value
      timestamp_at_lookback = last_row.__getitem__("timestamp")
      look_back = 1
      different = False

      while not different:
          previous_row = df.collect()[row_count - 1 - look_back]
          if previous_row.__getitem__(c) == target_value:
              timestamp_at_lookback = previous_row.__getitem__("timestamp")
              look_back += 1

          else:
              different = True

      # At this point, a difference has been found, so calculate the age
      final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days

Таким образом, данные бухгалтерской книги / DataFrame уменьшатся до (при условии, что расчет был проведен 14 апреля 2019 г.):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }

person Dan    schedule 15.04.2019    source источник
comment
Вы можете объединить чтение и оценку уровня файла в функцию, а затем использовать: stackoverflow.com/questions/9786102/ после обработки всех файлов вы можете объединить результаты в окончательный результат   -  person Lucas    schedule 16.04.2019
comment
Вы можете прочитать о delayed. Ознакомьтесь с этим asnwer.   -  person rpanai    schedule 16.04.2019
comment
Похоже, я мог бы захотеть сделать гибрид из двух вышеупомянутых комментариев, то есть объединить чтение / оценку файла в одну функцию, а затем использовать Dask Distributed client.map(eval_func, [list of HDFS filepaths]).   -  person Dan    schedule 17.04.2019


Ответы (1)


Параллельная запись из многих процессов в один выходной файл на самом деле невозможна, потому что вы заранее не знаете, как долго будет длиться каждый из результатов, поэтому вы не знаете, где в файле разместить другие результаты. кроме того, HDFS действительно любит получать большие блоки непрерывных данных (возможно, 64 МБ), чем инкрементные обновления.

Вы можете сделать следующее:

  • запишите все свои выходные данные в отдельные файлы, а затем запустите отдельное задание для их объединения; это совершенно нормально, если обработка фреймов данных велика по сравнению со временем чтения / записи
  • используйте распределенный client.submit API и as_completed для записи результатов в выходной файл из вашего основного процесса. Обратите внимание, что вы можете сделать это в соответствии с исходным порядком, если это важно, но это потребует дополнительной работы.
person mdurant    schedule 16.04.2019