Java ExecutorService — масштабирование

Я пытаюсь написать программу на Java, используя ExecutorService и ее функцию invokeAll. Мой вопрос: решает ли функция invokeAll задачи одновременно? В смысле, если у меня два процессора, будет два рабочих одновременно? Потому что я не могу правильно его масштабировать. Если я поставлю newFixedThreadPool(2) или 1, решение задачи займет столько же времени.

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
    tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);

Map — это класс, который реализует Callable, а wp — это вектор частичных решений, класс, который содержит некоторую информацию в разное время.

Почему не масштабируется? В чем может быть проблема?

Это код для PartialSolution:

import java.util.HashMap;
import java.util.Vector;

public class PartialSolution 
{
    public String fileName;//the name of a file
    public int b, e;//the index of begin and end of the fragment from the file
    public String info;//the fragment
    public HashMap<String, Word> hm;//here i retain the informations
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce

    public PartialSolution(String name, int b, int e, String i, boolean ok)
    {
        this.fileName = name;
        this.b = b;
        this.e = e;
        this.info = i;
        hm = new HashMap<String, Word>();
        if(ok == true)
        {
            hmt = new HashMap<String, Vector<Word>>();
        }
        else
        {
             hmt = null;
        }    
    }
}

Это код для карты:

public class Map implements Callable<PartialSolution>
{
    private PartialSolution ps;
    private Vector<String> keyWords;

    public Map(PartialSolution p, Vector<String> kw)
    {
        this.ps = p;
        this.keyWords = kw;
    }

    @Override
    public PartialSolution call() throws Exception 
    {
        String[] st = this.ps.info.split("\\n");
        for(int j = 0 ; j < st.length ; j++)
        {
            for(int i = 0 ; i < keyWords.size() ; i++)
            {
                if(keyWords.elementAt(i).charAt(0) != '\'')
                {
                    int k = 0;
                    int index = 0;
                    int count = 0;

                    while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
                    {
                        k = index + keyWords.elementAt(i).length();
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                } 
                else
                {
                    String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
                    StringBuffer sb = new StringBuffer(regex);
                    regex = sb.toString();
                    Pattern pt = Pattern.compile(regex);
                    Matcher m = pt.matcher(st[j]);
                    int count = 0;
                    while(m.find())
                    {
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                }
            }
        }
        this.ps.info = null;
        return this.ps;
    }
}

Итак, в Map я беру каждую строку из фрагмента и ищу для каждого выражения количество появлений, а также сохраняю количество строк. После обработки всего фрагмента в том же PartialSolution я сохраняю информацию в хэш-карте и возвращаю новое PartialSolution. На следующем шаге я объединяю PartialSolutions с одним и тем же именем файла и добавляю их в вызываемый класс Reduce, который аналогичен map, разница в том, что он выполняет другие операции, но также возвращает PartialSolution.

Это код для запуска задач карты:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
   tasks.add(new Map(ps, keyWords));
}    
list = executor.invokeAll(tasks);

В задаче я создаю задачу типа Map и в списке получаю их. Я не знаю, как читать дамп потока JVM. Я надеюсь, что это достаточно хорошо, что информация, которую я дал вам. Я работаю в NetBeans 7.0.1, если это поможет.

Спасибо, Алекс.


person Stanciu Alexandru-Marian    schedule 29.11.2011    source источник
comment
Сколько у вас задач? И что они делают? Много ли операций ввода-вывода?   -  person Thilo    schedule 29.11.2011
comment
Мои задачи - это те вызываемые классы, которые используют PartialSolution, которые имеют некоторый текст и подсчитывают, сколько раз слово появляется в этом тексте и строках. PartialSolution на самом деле является частью текста, и я хочу получить эту информацию для каждой части, а затем объединить их с другим вызываемым классом, называемым Reduce. Я хочу обрабатывать эти части одновременно. в зависимости от количества процессоров у меня есть. Ввод/вывод будет в конце, когда я объединю все задачи и, скажем, из 10 частей, и останется только одна со всей информацией об этом документе. Google использует MapReduce.   -  person Stanciu Alexandru-Marian    schedule 29.11.2011
comment
Что я хочу знать, так это то, будет ли метод invokeAll, если я создал ExcutorService с 10 потоками, решать 10 задач одновременно или будет решать одну за раз? В Map у меня есть конструктор, и я реализую функцию call(), которая возвращает другое PartialSolution, но на этот раз с правильной информацией. И еще вопрос, если я скажу list.get(i).get(), это вернет PartialSolution после того, как оно было решено правильно? Я действительно не понимаю, почему время не улучшается, если я использую 2 потока вместо 1. Почему он не масштабируется?   -  person Stanciu Alexandru-Marian    schedule 29.11.2011
comment
Вы могли бы использовать тег homework. (а также надеюсь, что никто не скопирует ваш код)   -  person Iulius Curt    schedule 14.12.2011


Ответы (3)


Что я хочу знать, так это то, будет ли метод invokeAll, если я создал ExcutorService с 10 потоками, решать 10 задач одновременно или будет решать одну за раз?

Если вы отправляете десять задач в ExecutorService с десятью потоками, все они будут выполняться одновременно. Могут ли они действовать полностью параллельно и независимо друг от друга, зависит от того, что они делают. Но у каждого из них будет своя тема.

И еще вопрос, если я скажу list.get(i).get(), это вернет PartialSolution после того, как оно будет решено?

Да, он будет блокироваться до тех пор, пока не будет выполнено вычисление (если оно еще не выполнено) и не вернет его результат.

Я действительно не понимаю, почему время не улучшается, если я использую 2 потока вместо 1.

Нам нужно увидеть больше кода. Они синхронизируются с некоторыми общими данными? Сколько времени занимают эти задачи? Если они очень короткие, вы можете не заметить никакой разницы. Если они занимают больше времени, просмотрите дамп потока JVM, чтобы убедиться, что все они выполняются.

person Thilo    schedule 29.11.2011
comment
+1. Однако есть одна ошибка: invokeAll возвращает список завершенных фьючерсов. Другими словами: он возвращается только после завершения всех задач. - person JB Nizet; 29.11.2011

Если вы создадите пул потоков с двумя потоками, то две задачи будут выполняться одновременно.

Я вижу две вещи, из-за которых два потока могут занимать столько же времени, сколько один поток.

Если только одна из ваших задач Map занимает большую часть вашего времени, то дополнительные потоки не заставят эту задачу работать быстрее. Он не может закончиться быстрее, чем самая медленная работа.

Другая возможность заключается в том, что ваша задача карты часто читает из общего вектора. Это может вызвать достаточный конфликт, чтобы свести на нет выигрыш от наличия двух потоков.

Вы должны поднять это в jvisualvm, чтобы увидеть, что делает каждый поток.

person Michael Krussel    schedule 29.11.2011
comment
Я установил VisualVM, но я не знаю, как его использовать, я имею в виду, что я не знаю, что смотреть, как читать данные. Некоторая помощь, пожалуйста. - person Stanciu Alexandru-Marian; 30.11.2011
comment
Я сделал следующие шаги: Profiler -> CPU -> щелкните правой кнопкой мыши, а затем Thread Dump... но я ничего не понимаю. - person Stanciu Alexandru-Marian; 30.11.2011
comment
@StanciuAlexandru-Marian Я бы порекомендовал назвать ваши потоки чем-то осмысленным, используя ThreadFactory. Затем найдите темы в списке тем. Затем проверьте, как изменяется состояние каждого потока во время выполнения кода. Это даст вам представление о том, сколько работы выполняет каждый поток. Если один поток ожидает, вы можете сделать дамп потока, чтобы увидеть, что он ожидает. - person Michael Krussel; 30.11.2011
comment
Я решил свою проблему. Виноват был мой компьютер. Хотя у меня Intel 2 Duo Core похоже работает очень плохо. Я не знаю, почему, когда это произошло. Я протестировал кластер своего факультета, и он работает очень быстро и масштабируется. Спасибо за всю вашу помощь, и я надеюсь, что я не причинил вам слишком много хлопот. - person Stanciu Alexandru-Marian; 30.11.2011

Java 8 представила еще один API в Executors - newWorkStealingPool для создания пула кражи работ. Вам не нужно создавать RecursiveTask и RecursiveAction, но вы можете использовать ForkJoinPool.

public static ExecutorService newWorkStealingPool()

Создает пул потоков для перехвата работы, используя все доступные процессоры в качестве целевого уровня параллелизма.

По умолчанию в качестве параметра параллелизма принимается количество ядер ЦП. Если у вас есть основные процессоры, вы можете иметь 8 потоков для обработки очереди рабочих задач.

Work stealing of idle worker threads from busy worker threads improves overall performance. Since task queue is unbounded in nature, this ForkJoinPool is recommended for the tasks executing in short time intervals.

Либо ExecutorService, либо ForkJoinPool или ThreadPoolExecutor производительность будет хорошей, если у вас нет общих данных и совместно используемая блокировка (синхронизация) и межпотоковое взаимодействие. Если бы все задачи были независимы друг от друга в очереди задач, производительность повысилась бы.

ThreadPoolExecutor конструктор для настройки и управления рабочим процессом задач:

 ThreadPoolExecutor(int corePoolSize, 
                       int maximumPoolSize, 
                       long keepAliveTime, 
                       TimeUnit unit, 
                       BlockingQueue<Runnable> workQueue, 
                       ThreadFactory threadFactory,
                       RejectedExecutionHandler handler)

Посмотрите на связанные вопросы SE:

Как правильно использовать Java Executor?

Java Fork/Join vs ExecutorService - когда использовать какой?

person Ravindra babu    schedule 30.01.2016