Отказ от ответственности: я впервые использую платформу Java Fork-Join, поэтому я не на 100% уверен, что использую ее правильно. Java также не является моим основным языком программирования, так что это тоже может иметь значение.
Учитывая следующий SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
System.exit(2);
}
System.out.println("Finished computing task with delay " + delayTask);
}
}
public final class ForkJoinBlocker
{
public static void main(String[] args)
{
ForkCalculator calculator = new ForkCalculator(
new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
);
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);
pool.invoke(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run()
{
System.out.println(pool.toString());
}
},
100,
2000
);
}
}
Итак, я создаю ForkJoinPool
, которому передаю некоторые задачи, которые выполняют некоторую обработку. Я заменил их на Thread.sleep()
для целей этого примера, чтобы было проще.
В моей реальной программе это очень длинный список задач, поэтому я хочу периодически выводить текущий статус на стандартный вывод. Я пытаюсь сделать это в отдельном потоке, используя запланированный TimerTask
.
Однако я заметил то, чего не ожидал: в моем примере результат выглядит примерно так:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
Это означает, что «статус-задача» никогда не выполняется.
Однако, если я изменю свой код, чтобы переместить pool.invoke(calculator);
в самом конце, он будет работать так, как ожидалось:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
Единственный вывод, который я могу сделать, это то, что ForkJoinPool::invoke()
блокирует основной поток (он возвращается только ПОСЛЕ завершения всех задач в пуле).
Я ожидал, что код в основном потоке продолжит выполняться, в то время как задачи в fork-join-pool обрабатываются асинхронно.
Мой вопрос: происходит ли это из-за того, что я неправильно использовал фреймворк? Есть ли что-то, что мне нужно исправить в моем коде?
Я заметил, что у одного из конструкторов ForkJoinPool
s есть параметр boolean asyncMode
, но, насколько я могу судить по реализации, это просто выбор между режимами выполнения FIFO_QUEUE
и LIFO_QUEUE
(не совсем уверен, что это такое):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
invoke
? - person user695022   schedule 01.10.2018execute()
? В собственных документах Oracle по этому поводу используетсяinvoke()
(docs.oracle.com/javase/ tutorial / essential / concurrency /), поэтому я не пытался его изменить. - person Radu Murzea   schedule 01.10.2018