Задание MapReduce для получения 10 лучших значений с использованием Python MRjob

Я хочу, чтобы эта карта уменьшила задание (код ниже), чтобы вывести 10 самых популярных продуктов. Он продолжает выдавать мне следующее сообщение об ошибке:

it = izip(iterable, count(0,-1)) # украсить TypeError: аргумент izip #1 должен поддерживать итерацию.

Я думаю, это связано с nlargest функцией, которую я пытаюсь применить.

Любые указатели?

Благодарю вас!

from mrjob.job import MRJob
from mrjob.step import MRStep
from heapq import nlargest


class MostRatedProduct(MRJob):

def steps(self):
    return [
        MRStep(mapper = self.mapper_get_ratings,
               reducer = self.reducer_count_ratings),
        MRStep(reducer = self.reducer_find_top10)
    ]


def mapper_get_ratings(self, _, line):
    (userID, itemID, rating, timestamp) = line.split(',')
    yield itemID, 1

def reducer_count_ratings(self, itemID, ratingCount):
    yield None, (sum(ratingCount), itemID)

def top_10(self, ratingPair):
    for ratingTotal, itemID in ratingPair:
        top_rated = nlargest(10, ratingTotal)
    for top_rated in ratingTotal:
        return (ratingTotal, itemID)

def reducer_find_top10(self, key, ratingPair):
    ratingTotal, itemID = self.top_10(ratingPair)
    yield ratingTotal, itemID


if __name__ == '__main__':
    MostRatedProduct.run()

person Ije    schedule 29.11.2016    source источник


Ответы (2)


Используя библиотеку mrjob, вы можете сделать то же самое в python:

#Write a Code to print the top 5 word - occurences

#Import Dependencies
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRWordCount(MRJob):

  def steps(self):
    return [MRStep(mapper=self.mapper,reducer=self.reducer),MRStep(reducer = self.secondreducer)]

  def mapper(self,_,lines):
    words = lines.split()
    for word in words:
      yield word.lower(),1

  def reducer(self,key,values):
    yield None,('%04d'%int(sum(values)),key)

  def secondreducer(self,key,values):
    self.alist = []
    for value in values:
      self.alist.append(value)
    self.blist = []
    for i in range(5):
      self.blist.append(max(self.alist))
      self.alist.remove(max(self.alist))
    for i in range(5):
      yield self.blist[i]

if __name__ == '__main__':
    MRWordCount.run()
person darshanc99    schedule 08.08.2019

Я не использовал mrjob, но раньше использовал MapReduce в кластере AWS для поиска лучших значений. Вот мой код, который не использует heapq. Надеюсь, вы сможете применить ту же концепцию к своему коду. Вот функция отображения

import sys, time

def Parser():
    for line in sys.stdin:
        line = line.strip('\n')
        yield line.split()


def mapper():
    counts = list(Parser())
    z = sorted(counts, key = lambda x: int(x[1]))[-10:]
    print '\n'.join(map(lambda x: '\t'.join(x), z))


if __name__=='__main__':
    mapper()

Вот код редуктора

import sys, operator, itertools

def Parser():
    for line in sys.stdin:
        yield tuple(line.strip('\n').split('\t'))

def reducer():
    for key, pairs in itertools.groupby(Parser(), operator.itemgetter(0)):
        counts = list(Parser())
        z = sorted(counts, key = lambda x: int(x[1]))[-10:]
        print '\n'.join(map(lambda x: '\t'.join(x), z))

if __name__=='__main__':
    reducer()

Я изменил его, чтобы вывести первые 10 слов. Имейте в виду, что это пример подсчета слов, в котором я анализировал текстовый документ. Я надеюсь, что это поможет в некотором роде!

person gold_cy    schedule 29.11.2016