Задание MapReduce (написанное на Python) медленно работает на EMR

Я пытаюсь написать задание MapReduce, используя пакет MRJob python. Задание обрабатывает ~ 36 000 файлов, хранящихся в S3. Размер каждого файла составляет ~ 2 МБ. Когда я запускаю задание локально (загружая ведро S3 на свой компьютер), его выполнение занимает примерно 1 час. Однако когда я пытаюсь запустить его на EMR, это занимает намного больше времени (я остановил его на 8 часах, и он был завершен на 10% в картографе). Я прикрепил код для моих mapper_init и mapper ниже. Кто-нибудь знает, что может вызвать такую ​​проблему? Кто-нибудь знает как исправить? Я также должен отметить, что когда я ограничиваю ввод выборкой из 100 файлов, он работает нормально.

def mapper_init(self):
    """
    Set class variables that will be useful to our mapper:
        filename: the path and filename to the current recipe file
        previous_line: The line previously parsed. We need this because the
          ingredient name is in the line after the tag
    """

    #self.filename = os.environ["map_input_file"]  # Not currently used
    self.previous_line = "None yet"
    # Determining if an item is in a list is O(n) while determining if an
    #  item is in a set is O(1)
    self.stopwords = set(stopwords.words('english'))
    self.stopwords = set(self.stopwords_list)


def mapper(self, _, line):
    """
    Takes a line from an html file and yields ingredient words from it

    Given a line of input from an html file, we check to see if it
    contains the identifier that it is an ingredient. Due to the
    formatting of our html files from allrecipes.com, the ingredient name
    is actually found on the following line. Therefore, we save the
    current line so that it can be referenced in the next pass of the
    function to determine if we are on an ingredient line.

    :param line: a line of text from the html file as a str
    :yield: a tuple containing each word in the ingredient as well as a
        counter for each word. The counter is not currently being used,
        but is left in for future development. e.g. "chicken breast" would
        yield "chicken" and "breast"
    """

    # TODO is there a better way to get the tag?
    if re.search(r'span class="ingredient-name" id="lblIngName"',
                 self.previous_line):
        self.previous_line = line
        line = self.process_text(line)
        line_list = set(line.split())
        for word in line_list:
            if word not in self.stopwords:
                yield (word, 1)
    else:
        self.previous_line = line
    yield ('', 0)

person DickJ    schedule 22.02.2015    source источник


Ответы (1)


Проблема в том, что у вас больше маленьких файлов. Добавьте шаг начальной загрузки с помощью s3distcp для копирования файлов в EMR. при использовании s3distcp попробуйте объединить небольшие файлы в файл размером ~ 128 МБ.

Hadoop не годится для работы с большим количеством маленьких файлов.

Поскольку вы вручную загружаете файлы на свой компьютер и запускаете, значит, он работает быстрее.

После копирования файла в EMR с помощью S3distCP используйте файл из HDFS.

person Sandesh Deshmane    schedule 23.02.2015
comment
как я могу использовать hdfs вместо s3 в amazon emr? - person member555; 22.08.2015