Принудительная локализация на подмножествах фреймов данных Dask

Я пытаюсь распределить большой фрейм данных Dask на нескольких машинах для (позже) распределенных вычислений в фрейме данных. Для этого я использую dask-distribution.

Все примеры / документы с распределением dask, которые я вижу, заполняют начальную загрузку данных из сетевого ресурса (hdfs, s3 и т. необходимое зло и просто съедает начальную стоимость.) Это подчеркивается в ответе на другой вопрос: Обменивается ли Dask данными с HDFS для оптимизации локальности данных?

Однако я вижу случаи, когда мы этого хотели бы. Например, если у нас есть сегментированная база данных + рабочие процессы, совместно расположенные на узлах этой БД, мы бы хотели принудительно загружать записи только из локального сегмента в локальные рабочие процессы. Из документации / примеров пересечение сети кажется неизбежно предполагаемой стоимостью. Можно ли принудительно получить части одного фрейма данных от определенных исполнителей?

Альтернатива, которую я пробовал, - это попытаться заставить каждого рабочего запускать функцию (итеративно отправляемую каждому рабочему), где функция загружает только данные, локальные для этой машины / осколка. Это работает, и у меня есть несколько оптимально локальных фреймов данных с одинаковой схемой столбцов, однако теперь у меня нет ни одного фрейма данных, а есть n фреймов данных.


person CoderOfTheNight    schedule 16.05.2019    source источник


Ответы (3)


Вы можете создавать «коллекции» dask, такие как dataframe, из фьючерсов и отложенных объектов, которые хорошо взаимодействуют друг с другом.

Для каждого раздела, если вы знаете, какая машина должна его загрузить, вы можете создать будущее следующим образом:

f = c.submit(make_part_function, args, workers={'my.worker.ip'})

где c - это клиент dask, а адрес - это машина, на которой вы хотите, чтобы это произошло. Вы также можете указать allow_other_workers=True, если это предпочтение, а не требование.

Чтобы создать фрейм данных из списка таких фьючерсов, вы могли бы сделать

df = dd.from_delayed([dask.delayed(f) for f in futures])

и в идеале предоставить meta=, дающее описание ожидаемого фрейма данных. Теперь дальнейшие операции с данным разделом предпочтительнее планировать на том же работнике, который уже хранит данные.

person mdurant    schedule 16.05.2019

Меня также интересует возможность ограничить вычисления определенным узлом (и данными, локализованными на этом узле). Я попытался реализовать описанное выше с помощью простого скрипта (см. Ниже), но, глядя на полученный фрейм данных, выдает ошибку (из dask / dataframe / utils.py :: check_meta ()):

ValueError: Metadata mismatch found in `from_delayed`.

Expected partition of type `DataFrame` but got `DataFrame`

Пример:

from dask.distributed import Client
import dask.dataframe as dd
import dask

client = Client(address='<scheduler_ip>:8786')
client.restart()

filename_1 = 'http://samplecsvs.s3.amazonaws.com/Sacramentorealestatetransactions.csv'
filename_2 = 'http://samplecsvs.s3.amazonaws.com/SalesJan2009.csv'

future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
future_2 = client.submit(dd.read_csv, filename_2, workers='w2')

client.has_what()
# Returns: {'tcp://<w1_ip>:41942': ('read_csv-c08b231bb22718946756cf46b2e0f5a1',),
#           'tcp://<w2_ip>:41942': ('read_csv-e27881faa0f641e3550a8d28f8d0e11d',)}

df = dd.from_delayed([dask.delayed(f) for f in [future_1, future_2]])

type(df)
# Returns: dask.dataframe.core.DataFrame

df.head()
# Returns:
#      ValueError: Metadata mismatch found in `from_delayed`.
#      Expected partition of type `DataFrame` but got `DataFrame`

Примечание. В среде dask есть два рабочих узла (с псевдонимом w1 и w2), узел планировщика, и сценарий выполняется на внешнем хосте. dask == 1.2.2, распределенный == 1.28.1

person jpjenk    schedule 17.05.2019

Странно вызывать несколько функций dask dataframe параллельно. Возможно, вы хотели вместо этого вызвать несколько вызовов Pandas read_csv параллельно?

# future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
# future_2 = client.submit(dd.read_csv, filename_2, workers='w2')
future_1 = client.submit(pandas.read_csv, filename_1, workers='w1')
future_2 = client.submit(pandas.read_csv, filename_2, workers='w2')

См. https://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections для получения дополнительной информации.

person MRocklin    schedule 18.05.2019