Java: синхронизация потока ExecutorService с CountDownLatch вызывает мертвую блокировку?

Я написал игру жизни для практики программирования. Есть 3 различных реализации генератора. Первый: один основной поток + N вспомогательных потоков, Второй: SwingWorker + N вспомогательных потоков, Третий: SwingWorker + ExecutorService. N - количество доступных процессоров или определено пользователем. Первые две реализации работают нормально, с одним или несколькими потоками. Реализация ExecutorServise отлично работает с одним потоком, но блокируется с более чем одним. Я все перепробовал, но не могу найти решение.

Вот код реализации тонкой работы (второй):

    package example.generator;

    import javax.swing.SwingWorker;

    /**
     * AbstractGenerator implementation 2: SwingWorker + sub threads.
     * 
     * @author Dima
     */
    public final class WorldGenerator2 extends AbstractGenerator {


        /**
         * Constructor.
         * @param gamePanel The game panel
         */
        public WorldGenerator2() {
            super();
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startGenerationProcess()
         */
        @Override
        protected void startGenerationProcess() {
            final SwingWorker<Void, Void> worker = this.createWorker();
            worker.execute();
        }




        /**
         * Creates a swing worker for the generation process.
         * @return The swing worker
         */
        private SwingWorker<Void, Void> createWorker() {
            return new SwingWorker<Void, Void>() {

                @Override
                protected Void doInBackground() throws InterruptedException {
                    WorldGenerator2.this.generationProcessing();
                    return null;
                }
            };
        }





        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startFirstStep()
         */
        @Override
        public void startFirstStep() throws InterruptedException {
            this.getQueue().addAll(this.getLivingCells());
            for (int i = 0; i < this.getCoresToUse(); i++) {
                final Thread thread = new Thread() {
                    @Override
                    public void run() {
                        WorldGenerator2.this.fistStepProcessing();
                    }
                };
                thread.start();
                thread.join();
            }
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startSecondStep()
         */
        @Override
        protected void startSecondStep() throws InterruptedException {
            this.getQueue().addAll(this.getCellsToCheck());
            for (int i = 0; i < this.getCoresToUse(); i++) {
                final Thread thread = new Thread() {

                    @Override
                    public void run() {
                        WorldGenerator2.this.secondStepProcessing();
                    }
                };
                thread.start();
                thread.join();
            }
        }

    }

Вот код неработающей реализации со службой исполнителя:

    package example.generator;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import javax.swing.SwingWorker;

    /**
     * AbstractGenerator implementation 3: SwingWorker + ExecutorService.
     * 
     * @author Dima
     */
    public final class WorldGenerator3 extends AbstractGenerator {


        private CountDownLatch  countDownLatch;
        private ExecutorService executor;


        /**
         * Constructor.
         * @param gamePanel The game panel
         */
        public WorldGenerator3() {
            super();
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startGenerationProcess()
         */
        @Override
        protected void startGenerationProcess() {
            this.executor = Executors.newFixedThreadPool(this.getCoresToUse());
            final SwingWorker<Void, Void> worker = this.createWorker();
            worker.execute();
        }




        /**
         * Creates a swing worker for the generation process.
         * @return The swing worker
         */
        private SwingWorker<Void, Void> createWorker() {
            return new SwingWorker<Void, Void>() {

                @Override
                protected Void doInBackground() throws InterruptedException {
                    WorldGenerator3.this.generationProcessing();
                    return null;
                }
            };
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startFirstStep()
         */
        @Override
        public void startFirstStep() throws InterruptedException {
            this.getQueue().addAll(this.getLivingCells());
            this.countDownLatch = new CountDownLatch(this.getCoresToUse());
            for (int i = 0; i < this.getCoresToUse(); i++) {    
                this.executor.execute(new Runnable() {  
                    @Override
                    public void run() {
                        WorldGenerator3.this.fistStepProcessing();
                        WorldGenerator3.this.countDownLatch.countDown();
                    }
                });
            }
            this.countDownLatch.await();

        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startSecondStep()
         */
        @Override
        protected void startSecondStep() throws InterruptedException {
            this.getQueue().addAll(this.getCellsToCheck());
            this.countDownLatch = new CountDownLatch(this.getCoresToUse());
            for (int i = 0; i < this.getCoresToUse(); i++) {
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        WorldGenerator3.this.secondStepProcessing();
                        WorldGenerator3.this.countDownLatch.countDown();
                    }
                });
            }
            this.countDownLatch.await();

        }
    }

Здесь вы можете скачать образец моего приложения с небольшой пусковой установкой. он выводит на консоль только результат итерации: Link


Теперь мой код выглядит так:

/* (non-Javadoc)
 * @see main.generator.AbstractGenerator#startFirstStep()
 */
@Override
public void startFirstStep() throws InterruptedException {

    this.getQueue().addAll(this.getLivingCells());      

    final ArrayList<Callable<Void>> list = new ArrayList<Callable<Void>>(this.getCoresToUse());

    for (int i = 0; i < this.getCoresToUse(); i++) {

        list.add(new Callable<Void>() {

                @Override
                public Void call() throws Exception {
                    WorldGenerator3.this.fistStepProcessing();
                    return null;
                }
            }
        );          
    }

    this.executor.invokeAll(list);
}

Но здесь снова та же проблема. Если я запустил его с одним ядром (потоком), проблем не возникнет. Если я установил количество ядер больше одного, он блокируется. В моем первом вопросе есть ссылка на пример, который вы можете запустить (в eclipse). Возможно, я что-то упустил из предыдущего кода.


person Dima82    schedule 23.04.2012    source источник


Ответы (2)


Я нахожу ваше использование возможностей Executors немного странным ...

Т.е. идея состоит в том, чтобы иметь Executor с пулом потоков, размер которого обычно зависит от количества ядер, поддерживаемых вашим процессором. Затем вы отправляете любое количество параллельных задач исполнителю, позволяя ему решать, что выполнять, когда и в каком доступном потоке из его пула.

Что касается CountDownLatch ... Почему бы не использовать ExecutorService.invokeAll? Этот метод будет блокироваться до тех пор, пока все отправленные задачи не будут выполнены или не истечет время ожидания. Таким образом, он будет подсчитывать оставшуюся работу от вашего имени. Или CompletionService, который " отделяет производство новых асинхронных задач от потребления результатов выполненных задач, «если вы хотите использовать результат задачи, как только он станет доступным, то есть не ждать, пока все задачи будут завершены первыми.

Что-то вроде

    private static final int WORKER_THREAD_COUNT_DEFAULT = Runtime.getRuntime().availableProcessors() * 2;

    ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREAD_COUNT);

    // your tasks may or may not return result so consuming invokeAll return value may not be necessary in your case 
    List<Future<T>> futuresResult = executor.invokeAll(tasksToRunInParallel, EXECUTE_TIMEOUT,
                TimeUnit.SECONDS);
person Svilen    schedule 24.04.2012
comment
В моем первом примере кода (вторая реализация) есть потоки, которые я запускаю и к которым присоединяюсь. Это прекрасно работает. Но это очень мало обрабатывает каждый поток, поэтому много завершающихся и умирающих потоков, поэтому сборщику мусора приходится много работать. Мне этого не хотелось, поэтому я решил использовать ExecutorService, потому что здесь повторно используются потоки из пула. И я получаю чистый результат от Будущего. Я не занимаюсь нетто-фьючерсами (это снова много объектов для сборщика мусора). Как присоединиться к потокам без Future-объектов? - person Dima82; 27.04.2012
comment
Вы сделали какие-то измерения производительности, которые показывают длинные паузы вашего приложения, связанные с сборкой мусора? Я бы не стал слишком беспокоиться о сборке фьючерсов, если вы все равно не будете на них ссылаться (что может привести к OutOfMemoryError). В наши дни JVM работает очень быстро, если вам не нужна производительность, близкая к скорости реального времени, когда паузы сборки мусора действительно имеют значение, я не думаю, что у вас возникнет проблема с этим. GC также довольно настраиваемый, поэтому вы можете изменить его поведение в соответствии с потребностями вашего приложения, если паузы GC являются проблемой. Но всегда сначала измеряйте, прежде чем проводить какие-либо оптимизации. - person Svilen; 27.04.2012

Во всех вариантах вы выполняете потоки последовательно, а не параллельно, потому что вы join и await внутри цикла for. Это означает, что цикл for не может перейти к следующей итерации, пока только что запущенный поток не будет завершен. Это означает, что в любой момент времени работает только один поток - либо основной поток, либо один поток, созданный в текущей итерации цикла. Если вы хотите присоединиться к нескольким потокам, вы должны собрать ссылки на них, а затем, вне цикла, в котором вы их все начали, войти в другой цикл, в котором вы присоединяетесь к каждому из них.

Что касается использования CountDownLatch в варианте Executors, то, что было сказано для потоков, здесь касается защелки: не используйте экземпляр var; используйте локальный список, который собирает все защелки и ожидает их в отдельном цикле.

Но на самом деле вам не следует использовать CountDownLatch в первую очередь: вы должны поместить все ваши параллельные задачи в список Callable и вызвать ExecutorService.invokeAll с ним. Он будет автоматически блокироваться, пока все задачи не будут выполнены.

person Marko Topolnik    schedule 23.04.2012
comment
У меня есть основной цикл, который повторяется, пока я не остановлю этот процесс. В этом основном цикле нужно выполнить два шага. Оба шага представляют собой работу над множеством однотипных задач. Так что я могу поделиться этой работой (много потоков). И только если последний поток первого шага завершен, нужно начинать второй шаг. Второй шаг - это также работа с множеством однотипных задач, поэтому его также можно использовать совместно. Если последний поток второго шага завершился, можно запустить следующую итерацию, снова выполнив первый шаг. - person Dima82; 27.04.2012
comment
Да вот как я понял из вашего вопроса. Мой совет применим именно к этому случаю. См. Обновленный ответ, чтобы узнать, как это сделать еще проще. - person Marko Topolnik; 27.04.2012