Введение

В этом подробном руководстве мы рассмотрим MapReduce, мощную парадигму программирования для обработки больших данных. Я предоставлю пошаговое руководство по реализации игрушечной программы MapReduce на Java, включая настройку, кодирование и выполнение.

Что такое уменьшение карты?

MapReduce — это модель программирования и парадигма обработки, разработанная для эффективной обработки больших объемов данных. Он состоит из трех основных этапов: этапа сопоставления, (менее известного) этапа перемешивания и этапа сокращения.

На этапе карты данные делятся на более мелкие фрагменты и обрабатываются параллельно в распределенной вычислительной среде. Например, взять файл журнала и сопоставить его с хранилищем ключей и значений типа журнала для подсчета.

На этапе Shuffle сопоставленные данные реорганизуются таким образом, чтобы одинаковые термины были вместе. Например, взять тип журнала для подсчета карт и сгруппировать все одинаковые типы журналов в одном разделе.

Затем этап сокращения агрегирует и объединяет результаты этапа перемешивания для получения окончательного результата. Например, суммирование всех подсчетов для данного типа журнала.

Этот подход предлагает значительные преимущества для обработки больших данных, такие как масштабируемость, отказоустойчивость и параллелизм. Распределяя рабочую нагрузку между несколькими компьютерами, MapReduce обеспечивает более быструю обработку больших наборов данных по сравнению с обработкой на одном компьютере, что делает его полезным для таких задач, как анализ данных, обработка журналов и машинное обучение.

Учебник по программированию

Теперь давайте прыгнем и начнем кодировать! Во-первых, я покажу вам скрипт для создания фиктивных файлов журнала, которые мы будем использовать в нашем примере. Затем я покажу вам Map Agents и Reduce Agents, которые будут соответствовать оранжевым и желтым прямоугольникам выше соответственно. Наконец, мы объединим все это вместе, используя пул потоков для эмуляции распределенной системы.

Скрипт генерации журнала

import random

if __name__ == '__main__':
    log_levels = ["ERROR", "INFO", "WARN"]

    for i in range(1, 11):
        file_name = f"log_{i}.txt"

        with open(file_name, "w") as file:
            for _ in range(100):
                log_level = random.choice(log_levels)
                file.write(log_level + "\n")

        print(f"Generated log file: {file_name}")

Этот скрипт создает 10 файлов (от log_1.txt до log_10.txt), каждый файл будет иметь 100 строк, которые являются ERROR, INFO или WARN. Ниже приведен пример первых 10 строк одного файла.

INFO
WARN
WARN
ERROR
ERROR
WARN
INFO
ERROR
INFO
ERROR

Агент карты

package org.example;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class MapCallable implements Callable<Map<String, Integer>> {
    private final String filePath;
    private final Map<String, Integer> logMap;

    public MapCallable(String filePath) {
        this.filePath = filePath;
        this.logMap = new HashMap<>();
    }

    @Override
    public Map<String, Integer> call() throws Exception {
        BufferedReader reader = new BufferedReader(new FileReader(this.filePath));
        String line;
        while ((line = reader.readLine()) != null) {
            String key = line.trim();
            int valueToPut = this.logMap.getOrDefault(key, 0) + 1;
            this.logMap.put(key, valueToPut);
        }
        return this.logMap;
    }
}

Этот Map Agent будет использовать один из этих файлов журнала во время своего создания. Когда он вызывается, он читает каждую строку и поддерживает карту, сколько раз он видит ERROR, WARN или INFO, и возвращает свою карту типа журнала для возникновения, например {ERROR=25, INFO=35, WARN=40}. У нас будет 10 таких ребят, так как есть 10 лог-файлов.

Уменьшить агент

package org.example;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

public class ReduceCallable implements Callable<Map.Entry<String, Integer>> {
    private final String metricName;
    private final List<Integer> metricCounts;

    public ReduceCallable(String metricName, List<Integer> metricCounts) {
        this.metricName = metricName;
        this.metricCounts = metricCounts;
    }

    @Override
    public Map.Entry<String, Integer> call() throws Exception {
        int total = 0;
        for (Integer metricCount : metricCounts) {
            total += metricCount;
        }

        return Map.entry(this.metricName, total);
    }
}

Наш агент сокращения будет получать метрики журнала от агента сопоставления (после того, как мы перемешаем все одинаковые ключи вместе). У нас их будет три, по одному для каждого типа журнала. Например, входными данными для редуктора ERROR будут тип журнала ERROR для metricName и список размером 10 строк журнала ERROR из каждого из 10 файлов журнала для metricCounts. Затем он объединяет этот список в одно значение и отправляет обратно запись карты, например {ERROR=352}.

Все вместе

package org.example;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // MAP
        String[] filePaths = {
                "src/main/resources/logs/log_1.txt",
                "src/main/resources/logs/log_2.txt",
                "src/main/resources/logs/log_3.txt",
                "src/main/resources/logs/log_4.txt",
                "src/main/resources/logs/log_5.txt",
                "src/main/resources/logs/log_6.txt",
                "src/main/resources/logs/log_7.txt",
                "src/main/resources/logs/log_8.txt",
                "src/main/resources/logs/log_9.txt",
                "src/main/resources/logs/log_10.txt",
        };

        ExecutorService mapExecutor = Executors.newFixedThreadPool(10);
        List<Future<Map<String, Integer>>> futureList = new ArrayList<>();
        for (String filePath : filePaths) {
            MapCallable mapCallable = new MapCallable(filePath);
            Future<Map<String, Integer>> future = mapExecutor.submit(mapCallable);
            futureList.add(future);
        }
        mapExecutor.shutdown();
        mapExecutor.awaitTermination(1, TimeUnit.SECONDS);

        // SHUFFLE
        Map<String, List<Integer>> shuffledMetrics = new HashMap<>();
        for (Future<Map<String, Integer>> mapFuture : futureList) {
            Map<String, Integer> mappedMetrics = mapFuture.get();
            mappedMetrics.forEach((key, value) -> {
                List<Integer> metricCountList = shuffledMetrics.getOrDefault(key, new ArrayList<>());
                metricCountList.add(value);
                shuffledMetrics.put(key, metricCountList);
            });
        }

        // REDUCE
        ExecutorService reduceExecutor = Executors.newFixedThreadPool(3);
        List<Future<Map.Entry<String, Integer>>> futureReducerList = new ArrayList<>();
        shuffledMetrics.forEach((key, value) -> {
            ReduceCallable reduceCallable = new ReduceCallable(key, value);
            Future<Map.Entry<String, Integer>> futureReducer = reduceExecutor.submit(reduceCallable);
            futureReducerList.add(futureReducer);
        });
        reduceExecutor.shutdown();
        reduceExecutor.awaitTermination(1, TimeUnit.SECONDS);

        // GET RESULTS
        Map<String, Integer> resultMap = new HashMap<>();
        for (Future<Map.Entry<String, Integer>> futureEntry : futureReducerList) {
            Map.Entry<String, Integer> entry = futureEntry.get();
            resultMap.put(entry.getKey(), entry.getValue());
        }

        System.out.println(resultMap);
    }
}

Во-первых, у нас есть этап Map, где мы создадим наш Executor с 10 потоками — по 1 для каждого файла журнала. Для каждого файла журнала мы создаем и вызываем Map Agent для создания карт количества метрик из нашего файла журнала.

Затем мы получаем все наши карты подсчета метрик с первого этапа и перемешиваем метрическую статистику так, чтобы у нас была единая карта — shuffledMetrics — с 3 ключами: ERROR, INFO и WARN, а значения этих записей представляют собой список чисел. появления этих типов журналов в файлах.

После этого мы выделяем еще 3 потока для наших агентов Reducer для каждого типа журнала. Они будут принимать тип журнала и список вхождений в каждом из файлов и объединять список для отправки нам.

Наконец, мы объединяем записи карты Reducers в одну карту и распечатываем ее. {ОШИБКА=307, ИНФОРМАЦИЯ=339, ПРЕДУПРЕЖДЕНИЕ=354}.

Заключение

В этом руководстве был дан обзор того, как работает MapReduce, и некоторых полезных приложений для него. MapReduce используется во многих повседневных приложениях и является частым вопросом, который возникает на собеседованиях по проектированию систем. Теперь вы сможете понимать и внедрять системы MapReduce на протяжении всей своей карьеры.