Передача параметров редюсеру в MRjob

Я использую MRjob для запуска заданий Hadoop Streaming через наш экземпляр HBase. На всю жизнь я не могу понять, как передать параметр моему редуктору. У меня есть два параметра, которые я хочу передать своему редюсеру при запуске задания: startDate и endDate. Вот как выглядит мой текущий редуктор:

def reducer(self, groupId, meterList):
    """
    Print bucket.
    """
    sys.stderr.write("Working on group = " + str(groupId) + "\n")
    #print "Opening connection..."
    conn = open_connection(hostname)
    #print "Getting table..."
    table = get_table(conn, tableName)

    compositeDf = DataFrame()

    for meterId in meterList:
        sys.stderr.write("Querying: " + str(meterId) + "\n")
        df = extract_meter_data(table, meterId, startDate, endDate)

Кажется, я не могу передать startDate и endDate в качестве параметров для моего редуктора. Единственный способ, которым я могу получить задание для получения параметров, — через глобальную переменную в верхней части класса.

startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)

class MRDataQuality(MRJob):
    """
    MapReduce job that does a data quality check on the meter data in HBase.
    """

Но это грязно. Я хочу передать это, позвонив на работу. Я пробовал много методов. Установка его как переменной экземпляра, установка его как статической переменной класса, создание перегруженного конструктора для MRDataQualityJob... Кажется, ничего не работает. Я программно вызываю его из своего скрипта верхнего уровня следующим образом:

if args.hadoop:
    mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
    mrdq_job = MRDataQuality(args=[meterFile])

with mrdq_job.make_runner() as runner:
    runner.run()

Независимо от того, что я делаю с экземпляром mrdq_job, кажется, что runner.run() использует новый новый экземпляр класса, в котором не определены экземпляр или статические переменные. Как я могу передать свои параметры редуктору???? Я могу сделать это в обычной потоковой передаче Hadoop, передав строку: «—reducer reducer.py arg1 arg2». Есть ли аналог для MRjob?


person Sourav Dey    schedule 01.08.2013    source источник


Ответы (2)


Как насчет того, чтобы передать ваши параметры в конфигурацию задания, а затем прочитать их с помощью get_jobconf_value?

Что-то вроде этого:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

  def reducer(self, groupId, meterList):
    ...
    startDate = get_jobconf_value("my.job.settings.startdate")
    endDate = get_jobconf_value("my.job.settings.enddate")

    for meterId in meterList:
      sys.stderr.write("Querying: " + str(meterId) + "\n")
      df = extract_meter_data(table, meterId, startDate, endDate)    

А затем установите параметры в коде, как вы сделали выше

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile])
person T .    schedule 23.08.2013
comment
обратите внимание, что get_jobconf_value был обесценен. pythonhosted.org/mrjob/ - person dranxo; 04.08.2016

Как насчет того, чтобы передать ваши параметры в конфигурацию задания, а затем прочитать их с помощью get_jobconf_value внутри reducer_init? Таким образом, вам нужно прочитать параметры только один раз.

Что-то вроде этого:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

  def reducer_init(self):
    ...
    self.startDate = get_jobconf_value("my.job.settings.startdate")
    self.endDate = get_jobconf_value("my.job.settings.enddate")

  def reducer(self, groupId, meterList):
    for meterId in meterList:
      sys.stderr.write("Querying: " + str(meterId) + "\n")
      df = extract_meter_data(table, meterId, self.startDate, self.endDate)    

А затем установите параметры в коде, как вы сделали выше

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile])
person JnBrymn    schedule 22.10.2013