Почему ForkJoinPool :: invoke () блокирует основной поток?

Отказ от ответственности: я впервые использую платформу Java Fork-Join, поэтому я не на 100% уверен, что использую ее правильно. Java также не является моим основным языком программирования, так что это тоже может иметь значение.


Учитывая следующий SSCCE:

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class ForkCalculator extends RecursiveAction
{
    private final Integer[] delayTasks;

    public ForkCalculator(Integer[] delayTasks)
    {
        this.delayTasks = delayTasks;
    }

    @Override
    protected void compute()
    {
        if (this.delayTasks.length == 1) {
            this.computeDirectly();
            return;
        }

        Integer halfway = this.delayTasks.length / 2;

        ForkJoinTask.invokeAll(
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, 0, halfway)
            ),
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
            )
        );
    }

    private void computeDirectly()
    {
        Integer delayTask = this.delayTasks[0];

        try {
            Thread.sleep(delayTask);
        } catch (InterruptedException ex) {
            System.err.println(ex.getMessage());
            System.exit(2);
        }

        System.out.println("Finished computing task with delay " + delayTask);
    }
}

public final class ForkJoinBlocker
{
    public static void main(String[] args)
    {
        ForkCalculator calculator = new ForkCalculator(
            new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
        );

        ForkJoinPool pool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors()
        );

        pool.invoke(calculator);

        //make it a daemon thread
        Timer timer = new Timer(true);

        timer.scheduleAtFixedRate(
            new TimerTask() {
                @Override
                public void run()
                {
                    System.out.println(pool.toString());
                }
            },
            100,
            2000
        );
    }
}

Итак, я создаю ForkJoinPool, которому передаю некоторые задачи, которые выполняют некоторую обработку. Я заменил их на Thread.sleep() для целей этого примера, чтобы было проще.

В моей реальной программе это очень длинный список задач, поэтому я хочу периодически выводить текущий статус на стандартный вывод. Я пытаюсь сделать это в отдельном потоке, используя запланированный TimerTask.

Однако я заметил то, чего не ожидал: в моем примере результат выглядит примерно так:

Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......

Это означает, что «статус-задача» никогда не выполняется.

Однако, если я изменю свой код, чтобы переместить pool.invoke(calculator); в самом конце, он будет работать так, как ожидалось:

java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......

Единственный вывод, который я могу сделать, это то, что ForkJoinPool::invoke() блокирует основной поток (он возвращается только ПОСЛЕ завершения всех задач в пуле).

Я ожидал, что код в основном потоке продолжит выполняться, в то время как задачи в fork-join-pool обрабатываются асинхронно.

Мой вопрос: происходит ли это из-за того, что я неправильно использовал фреймворк? Есть ли что-то, что мне нужно исправить в моем коде?

Я заметил, что у одного из конструкторов ForkJoinPools есть параметр boolean asyncMode, но, насколько я могу судить по реализации, это просто выбор между режимами выполнения FIFO_QUEUE и LIFO_QUEUE (не совсем уверен, что это такое):

public ForkJoinPool(
    int parallelism,
    ForkJoinWorkerThreadFactory factory,
    UncaughtExceptionHandler handler,
    boolean asyncMode
) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

person Radu Murzea    schedule 01.10.2018    source источник
comment
Вы читали документацию для invoke?   -  person user695022    schedule 01.10.2018
comment
@ user695022 Я сделал ... какой-то конкретный намек, к которому вы склоняетесь?   -  person Eugene    schedule 01.10.2018
comment
Вы говорите, что я должен использовать вместо этого execute()? В собственных документах Oracle по этому поводу используется invoke() (docs.oracle.com/javase/ tutorial / essential / concurrency /), поэтому я не пытался его изменить.   -  person Radu Murzea    schedule 01.10.2018
comment
Первая часть: Выполняет поставленную задачу, возвращая результат по завершении. Здесь говорится, что он не возвращается, пока задачи не будут выполнены.   -  person user695022    schedule 01.10.2018
comment
для кого-то новичка в java, это очень хороший вопрос .. определенно есть мой голос за   -  person Eugene    schedule 01.10.2018
comment
вы, кстати, могли бы принять этот правильный ответ здесь ...   -  person Eugene    schedule 02.10.2018


Ответы (1)


Обычно invoke() будет ждать завершения всей задачи перед возвратом, так что да, основной поток блокируется. После этого Timer не успевает выполнить, потому что он работает в потоке демона.

Вы можете просто использовать execute() вместо invoke(), который запускает задачу асинхронно. Затем вы можете join() на ForkJoinTask дождаться результата, в течение которого Timer будет работать:

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);

    //make it a daemon thread
Timer timer = new Timer(true);

timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            System.out.println(pool.toString());
        }
    }, 100, 2000);

calculator.join(); // wait for computation
person M A    schedule 01.10.2018
comment
Хм. Итак invoke(); == execute(); join();? Я правильно это понимаю? - person Radu Murzea; 01.10.2018
comment
@RaduMurzea точно. externalSubmit(task);return task.join(); последние две строки из invoke - person Eugene; 01.10.2018
comment
вы также можете использовать pool.submit(task), который возвращает Future<T> подтип ForkJoinTask<T>, и вы можете вызвать get(), чтобы получить результат или заблокировать, если отправленная задача еще не завершена. - person Downhillski; 01.10.2018