Сообщения об ошибках зависимости при запуске конвейера luigi

Я пытаюсь создать конвейер, который начинается с класса, который разделяет один файл на несколько CSV в зависимости от состояния, в котором находится пользователь, затем просматривает созданные файлы, представляющие разные состояния, и пытается определить, перешел ли пользователь из одного состояния в другое. другой, возвращающий 1, если пользователь сделал это, и 0, если он / она не сделал, соответствует этим «вероятностям» с использованием гауссовского kde, сохраняет это как рассол, а затем получает образцы из рассола и сохраняет их как csvs.

Я использую luigi для создания этого конвейера, но все время получаю сообщения об ошибках при попытке запустить свой код. Кажется, что конвейер не работает при запуске класса state_to_state.

Вот код, который я написал:

отдельный_csv.py:

import luigi
import pandas as pd
import numpy as np
import os
import state_to_state_transitions2 as sst
class data_filter(luigi.Task):
    file = pd.read_csv('/Users/emmanuels/Desktop/Attribution/finalcleanattributiondata.csv')
    actions = file.state.unique()
    def run(self):
        for current in self.actions:
            filter_file = self.file.loc[self.file.state.str.contains(current,na=False)]
            filter_file.to_csv('/Users/emmanuels/Documents/AttributionData/Data/'+str(current)+'.csv')
    def requires(self):
        return []
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/'+str(self.actions)+'.csv')

state_to_state_transitions2;

import luigi
import pandas as pd
import separate_csv
class state_to_state(luigi.Task):
    first_file = luigi.Parameter()
    second_file = luigi.Parameter()
    def run(self):
        #iterate through states and find probability of anonymous id existing in next state
        first = pd.read_csv(self.first_file)
        second = pd.read_csv(self.second_file)
        first['probability'] = first.anonymous_id.isin(second.anonymous_id).astype(int)
        #save anonymous id along with probability (1,0) of whether or not it exists in the next state
        first[['anonymous_id','probability']].to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.first_file.split('/')[6][:-4]+'to'+self.second_file.split('/')[7][:-4]+'.csv'))
    def requires(self):
        return separate_csv.data_filter()
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.first_file.split('/')[6][:-4]+'to'+self.second_file.split('/')[6][:-4]+'.csv'))

gaussian_kdefit;

import pandas as pd
import pickle
from scipy import stats
import luigi
import state_to_state_transitions2 as sst
class save_distributions(luigi.Task):
    file_tag = luigi.Parameter()
    path = '/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'
    def run(self):
        data = pd.read_csv(path+self.file_tag)
        kernel = stats.gaussian_kde(data['probability'])
        #we fit the distribution and save as a pickle
        pickle.dump(kernel,open('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck','wb'))
    def requires(self):
        files = ['Session.csv','lead.csv','opportunity.csv','complete.csv']
        task_list = []
        for i in range(1,len(files)):
            one = self.path+str(files[i-1])
            two = self.path+str(files[i])
            task_list.append(sst.state_to_state(first_file=one,second_file=two))
        return task_list
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck')

get_samples:

import pandas as pd
import luigi
import gaussian_kdefit as gkde
#takes n samples and saves sample in csv
class sample_output(luigi.Task):
    file_tag = luigi.Parameter()
    size = luigi.Parameter()
    def run(self):
        kernel = pd.read_pickle('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'probabs'+'.pck')
        kernel = kernel.resample(int(self.size))
        pd.DataFrame(kernel).transpose().to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'+sampleprobabs'+'.csv')
    def requires(self):
        files = ['Sessiontolead.csv', 'leadtoopportunity.csv', 'opportunitytocomplete.csv']
        return [gkde.save_distributions(file_tag=file) for file in files]
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+'+sampleprobabs'+'.csv')

и мой класс-оболочка:

import get_samples as getsamps
import pandas as pd
import luigi
class wrapper(luigi.WrapperTask):
    def requires(self):
        file_tag = ['Sessiontolead', 'leadtoopportunity', 'opportunitytocomplete']
        task_list = []
        size = 10
        for i in range(0,len(file_tag)):
            for k in range(1,size):
                task_list.append(getsamps.sample_output(file_tag=file_tag[i],size=size))
        return task_list
    def run(self):
        print('Wrapper ran')
        pd.DataFrame().to_csv('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

Вот образец окончательного чистого файла атрибуции:

{'Unnamed: 0': {0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
 'uniques': {0: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/businessDetails/',
  1: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/businessDetails/',
  2: '2019-06-18 09:00:000000a6a0-00bc-475f-a9e5-9dcbb4309e78https://signup.yoc.com/signup/v1/https://signup.yoc.com/signup/v1/step/userDetails/',
  3: '2018-05-17 20:00:000000c924-5959-4e2d-8757-0d10f96ca462http://m.facebook.com/https://www.yoc.com/signup/',
  4: '2019-02-24 16:00:000002269a-1e39-4cdf-a43e-cecf0a277c1chttps://signup.yoc.com/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
 'anonymous_id': {0: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
  1: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
  2: '0000a6a0-00bc-475f-a9e5-9dcbb4309e78',
  3: '0000c924-5959-4e2d-8757-0d10f96ca462',
  4: '0002269a-1e39-4cdf-a43e-cecf0a277c1c'},
 'user_id': {0: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
  1: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
  2: '1560849071242-a19cdf50-ceec-41a0-ab51-ba9a45c8cda9',
  3: nan,
  4: nan},
 'ts': {0: '2019-06-18 09:11:14.409000',
  1: '2019-06-18 09:11:15.028000',
  2: '2019-06-18 09:12:03.118000',
  3: '2018-05-17 20:31:32.203000',
  4: '2019-02-24 16:08:32.661000'},
 'url': {0: 'https://signup.yoc.com/signup/v1/step/businessDetails/',
  1: 'https://signup.yoc.com/signup/v1/step/businessDetails/',
  2: 'https://signup.yoc.com/signup/v1/step/userDetails/',
  3: 'https://www.yoc.com/signup/',
  4: 'https://signup.yoc.com/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
 'path': {0: '/za/signup/v1/step/businessDetails/',
  1: '/za/signup/v1/step/businessDetails/',
  2: '/za/signup/v1/step/userDetails/',
  3: '/za/signup/',
  4: '/continue/1551024250465-dfd0e1d5-b76a-4bfa-bc29-9fcf5ef6b91c'},
 'referrer_domain': {0: 'signup.yoc.com',
  1: 'signup.yoc.com',
  2: 'signup.yoc.com',
  3: 'm.facebook.com',
  4: nan},
 'utm_campaign': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
 'utm_content': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
 'utm_medium': {0: nan, 1: nan, 2: nan, 3: nan, 4: nan},
 'utm_source': {0: nan, 1: nan, 2: nan, 3: 'facebook', 4: nan},
 'user_agent': {0: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
  1: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
  2: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
  3: 'Mozilla/5.0 (Linux; Android 8.0.0; SM-G965F Build/R16NW; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/66.0.3359.158 Mobile Safari/537.36 [FB_IAB/FB4A;FBAV/172.0.0.66.93;]',
  4: 'Opera/9.80 (Android; Opera Mini/38.1.2254/131.123; U; en) Presto/2.12.423 Version/12.16'},
 'rank': {0: 1, 1: 2, 2: 3, 3: 1, 4: 1},
 'state': {0: 'lead',
  1: 'lead',
  2: 'opportunity',
  3: 'Session',
  4: 'opportunity'}}

Вот результат трассировки, который я получаю:

Scheduled 9 tasks of which:
* 1 ran successfully:
    - 1 data_filter(file=/Users/emmanuels/Desktop/Attribution/finalcleanattributiondata.csv)
* 1 failed:
    - 1 state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
* 7 were left pending, among these:
    * 7 had failed dependencies:
        - 3 sample_output(file_tag=Sessiontolead, size=10) ...
        - 3 save_distributions(file_tag=Sessiontolead.csv,leadtoopportunity.csv,opportunitytocomplete.csv)
        - 1 wrapper()

....

INFO: [pid 45306] Worker Worker(salt=271561701, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=45306) running   state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
WARNING: Using wildcards in path /Users/emmanuels/Documents/AttributionData/Data/['lead' 'opportunity' 'Session' 'complete'].csv might lead to processing of an incomplete dataset; override exists() to suppress the warning.
ERROR: [pid 45306] Worker Worker(salt=271561701, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=45306) failed    state_to_state(first_file=/Users/emmanuels/Documents/AttributionData/Data/Session.csv, second_file=/Users/emmanuels/Documents/AttributionData/Data/lead.csv)
Traceback (most recent call last):
  File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 175, in run
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: data_filter__Users_emmanuels_c87d333278
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   state_to_state__Users_emmanuels__Users_emmanuels_c95a12621e   has status   FAILED

person Emm    schedule 29.09.2019    source источник
comment
Можете ли вы предоставить фактические ошибки, которые вы получаете? Или их лучшее описание?   -  person iHowell    schedule 30.09.2019
comment
@iHowell Я попытался добавить более подробную информацию о получаемом сообщении об ошибке   -  person Emm    schedule 30.09.2019


Ответы (1)


Изучив ваши обновления, я заметил, что ваша первая задача на самом деле не имеет параметров. У вас есть всего пара предметов. Вы не должны запускать pd.read_csv внутри объявления переменной. Вместо этого вы должны иметь его в методе run (если вам не нужно требовать что-то, основанное на данных, тогда также прочитайте его в методе requires). Вместо этого измените file на luigi.Parameter со значением default. Кроме того, трудно сказать, что такое self.actions. Параметры (и собственные переменные) должны быть примитивами или сериализуемыми в примитивы.

Кроме того, у вас есть рекурсивный импорт, который может все испортить. В вашем separate_csv вы импортируете state_to_state_transition2 и наоборот.

Честно говоря, здесь столько всего, может быть много всего. Я бы поработал над сокращением вашего рабочего процесса и работал над компонентами по одному. Кроме того, вы можете использовать методы ввода / вывода luigi для лучшей передачи данных по конвейеру.

Предыдущая проблема, которая все еще могла создавать проблемы:

Ваша проблема может заключаться в том, что вы не используете внутреннюю атомарную файловую систему Луиджи. Вместо открытия файла в state_to_state_transision2:

class state_to_state(luigi.Task):
    def run(self):
        ...
        first[['anonymous_id','probability']].to_csv(...)

Вместо того, чтобы писать в него самому, вы должны открыть файл с помощью команды вывода luigi:

class state_to_state(luigi.Task):
    def run(self):
        ...
        with self.output().open('w') as out_csv:
            out_csv.write(first[['anonymous_id','probability']].to_csv())

Если не использовать атомарную файловую систему, файл будет создан, даже если вы просто напишете в него ошибку. Это сигнализирует luigi, что задача завершена из-за существования файла.

person iHowell    schedule 30.09.2019