Задайте вопрос, как определить настраиваемую (временную) функцию, которая работает параллельно и возвращает фрейм данных с другой формой

Я пытаюсь реализовать функцию временного сворачивания для «сопоставления» с различными разделами фрейма данных dask, который, в свою очередь, изменяет форму рассматриваемого фрейма данных (или, альтернативно, создает новый фрейм данных с измененной формой). Вот как далеко я зашел. Результат res, возвращаемый при вычислении, представляет собой список из 3 отложенных объектов. Когда я пытаюсь вычислить каждый из них в цикле (последние две строки кода), это приводит к появлению «TypeError: объект 'DataFrame' не вызывается». После прохождения примеры для map_partitions, я также попытался изменить входной DF ( inplace) в функции без возвращаемого значения, что вызывает аналогичную ошибку TypeError с NoneType. Что мне не хватает?

Кроме того, глядя на визуализацию (прилагается), я чувствую, что существует потребность в сведении индивидуально вычисляемых (свернутых) разделов в один DF. Как мне это сделать?

#! /usr/bin/env python

# Start dask scheduler and workers
# dask-scheduler &
# dask-worker --nthreads 1 --nprocs 6 --memory-limit 3GB localhost:8786 --local-directory /dev/shm &

from dask.distributed import Client
from dask.delayed import delayed
import pandas as pd
import numpy as np
import dask.dataframe as dd
import math

foldbucketsecs=30
periodicitysecs=15
secsinday=24 * 60 * 60
chunksizesecs=60 # 1 minute
numts = 5
start = 1525132800 # 01/05
end = 1525132800 + (3 * 60) # 3 minute

c = Client('127.0.0.1:8786')

def fold(df, start, bucket):
    return df

def reduce_folds(df):
    return df

def load(epoch):
    idx = []
    for ts in range(0, chunksizesecs, periodicitysecs):
        idx.append(epoch + ts)
    d = np.random.rand(chunksizesecs/periodicitysecs, numts)
    ts = []
    for i in range(0, numts):
        tsname = "ts_%s" % (i)
        ts.append(tsname)
        gts.append(tsname)
    res = pd.DataFrame(index=idx, data=d, columns=ts, dtype=np.float64)
    res.index = pd.to_datetime(arg=res.index, unit='s')
    return res

gts = []
load(start)
cols = len(gts)

idx1 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+periodicitysecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx1[:0], data=[], columns=gts, dtype=np.float64)
dfs = [delayed(load)(fn) for fn in range(start, end, chunksizesecs)]

from_delayed = dd.from_delayed(dfs, meta, 'sorted')

nfolds = int(math.ceil((end - start)/foldbucketsecs))
cprime = nfolds * cols

gtsnew = []

for i in range(0, cprime):
    gtsnew.append("ts_%s,fold=%s" % (i%cols, i/cols))

idx2 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+foldbucketsecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx2[:0], data=[], columns=gtsnew, dtype=np.float64)
folded_df = from_delayed.map_partitions(delayed(fold)(from_delayed, start, foldbucketsecs), meta=meta)
result = c.submit(reduce_folds, folded_df)

c.gather(result).visualize(filename='/usr/share/nginx/html/svg/df4.svg')

res = c.gather(result).compute()

for f in res:
    f.compute()

person PhaKuDi    schedule 28.06.2018    source источник
comment
Можете ли вы создать mcve? Думаю, было бы легче получить помощь.   -  person rpanai    schedule 28.06.2018
comment
Привет @ user32185, я привел в порядок свой основной пост и сохранил реализацию функции складывания. Однако даже при чрезмерном упрощении (вычеркните все, кроме return df в конце) функция сворачивания также не работает. Выдает ту же ошибку TypeError.   -  person PhaKuDi    schedule 29.06.2018
comment
Пожалуйста, выкопайте все, чтобы у нас осталось только самое необходимое, чтобы добраться до вашего исключения   -  person mdurant    schedule 29.06.2018
comment
@mdurant. Можете ли вы проверить сейчас?   -  person PhaKuDi    schedule 02.07.2018


Ответы (1)


Неважно! Это была моя вина, вместо того, чтобы обернуть мою функцию отложенной, я просто передал ее в вызов map_partitions, и это сработало.

folded_df = from_delayed.map_partitions(fold, start, foldbucketsecs, nfolds, meta=meta)

person PhaKuDi    schedule 05.07.2018