Итак, у меня установлен Hadoop 2.7.1 на кластере из 3 машин. Я пытаюсь запустить задание mapreduce с инвертированным индексом, используя MRJob и Hadoop Streaming.
Вот моя конфигурация:
MRJob.SORT_VALUES = True
def steps(self):
JOBCONF_STEP1 = {
"mapred.map.tasks":20,
"mapred.reduce.tasks":10
}
return [MRStep(jobconf=JOBCONF_STEP1,
mapper=self.mapper,
reducer=self.reducer)
]
Однако в своем выводе я заметил, что один и тот же ключ часто используется для двух разных редюсеров. Это приводит к выводу, который выглядит следующим образом:
Key | Output
Z | 2
X | 1,2
X | 3
Z | 1
Это означает, что один редьюсер получает ключ X и значения 1 и 2, в то время как другой редуктор также получает ключ X и значение 3. Но я хочу, чтобы только один редьюсер получил ключ X и все связанные значения.
Итак, желаемый результат:
Key | Output
X | 1,2,3
Z | 1,2
Как устранить эту проблему?
Вот мой код MRJob
%%writefile invertedIndex.py
import json
import mrjob
from mrjob.job import MRJob
from mrjob.step import MRStep
class MRinvertedIndex(MRJob):
MRJob.SORT_VALUES = True
def steps(self):
JOBCONF_STEP1 = {
"mapred.map.tasks":20,
"mapred.reduce.tasks":10
}
return [MRStep(jobconf=JOBCONF_STEP1,
mapper=self.mapper,
reducer=self.reducer)
]
def mapper(self,_,line):
key, stripe = line.split("\t")
stripe = json.loads(stripe)
for w in stripe:
yield w, key
def reducer(self,key,values):
d = [v for v in values]
yield key,d
if __name__ == '__main__':
MRinvertedIndex.run() enter code here