Какой по совместительству баг есть в этой программе?

У меня есть одна программа со странным одновременным багом.

Что делает эта программа:

  1. Выполнять цикл событий каждый EVENT_LOOP_PAUSE_DURATION_IN_MS.
  2. Для каждой заданной задачи выполняется процессор TaskProcessor
  3. Каждый 500 ms печатает размер очереди моего исполнителя.

Я хочу иметь не более одной задачи в очереди на taskId. Итак, когда я добавляю задачу в очередь, я проверяю, существовали ли уже задачи или нет. Если задачи нет, добавляю. По окончании обработки задачи удаляю задачу с activeTasks карты.

Если вы запустите программу, то увидите следующий вывод:

ERROR: 50
ERROR: 70
ERROR: 80
ERROR: 90
ERROR: 110
ERROR: 120
ERROR: 120
ERROR: 140

Итак, есть ошибка. Я не знаю почему, но размер очереди пула потоков бесконечно увеличивается.

Вы можете видеть, что я удаляю активные задачи в 2 пунктах программы:

  1. В finally блоке TaskProcessor, когда задача обработана.
  2. Я удаляю устаревшие задачи в цикле событий.

Итак, если я уберу код, удаляющий задачи в точке (2), то баг исчезнет. Я не понимаю такого поведения.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Program {

    private static final int NUMBER_OF_TASKS = 40;
    private static final int NUMBER_OF_THREADS = 10;
    private static final long EVENT_LOOP_PAUSE_DURATION_IN_MS = 40L;

    class QueueSizePrinter extends Thread {

        private final LinkedBlockingQueue<Runnable> workQueue;

        public QueueSizePrinter(LinkedBlockingQueue<Runnable> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (true) {
                int qSize = workQueue.size();
                if (qSize > NUMBER_OF_TASKS) {
                    System.out.println("ERROR: " + qSize);
                }

                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class TaskProcessor implements Runnable {
        private final String currentTaskId;
        private final ConcurrentHashMap<String, Long> activeTasks;

        public TaskProcessor(String currentTaskId, ConcurrentHashMap<String, Long> activeTasks) {
            this.currentTaskId = currentTaskId;
            this.activeTasks = activeTasks;
        }

        @Override
        public void run() {
            try {
                // emulate of useful work
                Thread.sleep(300L);
            } catch (Exception e) {
                System.out.println("error: " + e.toString());
            } finally {
                activeTasks.remove(currentTaskId); // (1)
            }
        }
    }

    public void program() {

        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ExecutorService executor = new ThreadPoolExecutor(NUMBER_OF_THREADS, NUMBER_OF_THREADS, 0L, TimeUnit.MILLISECONDS, workQueue);

        Set<String> initialTasks = ConcurrentHashMap.newKeySet();
        for (int currentTaskIndex = 0; currentTaskIndex < NUMBER_OF_TASKS; currentTaskIndex++) {
            initialTasks.add(String.valueOf(currentTaskIndex));
        }

        new QueueSizePrinter(workQueue).start();

        ConcurrentHashMap<String, Long> activeTasks = new ConcurrentHashMap<>();

        while (true) {

            initialTasks.forEach((currentTaskId) -> {
                if (!activeTasks.containsKey(currentTaskId)) {
                    activeTasks.put(currentTaskId, System.currentTimeMillis());

                    executor.submit(new TaskProcessor(currentTaskId, activeTasks));
                }
            });

            // (2)
            activeTasks.entrySet().removeIf(entry -> {
                boolean hasDelete = System.currentTimeMillis() - entry.getValue() > 1000;
                if (hasDelete) {
                    //System.out.println("DELETE id=" + entry.getKey());
                }
                return hasDelete;
            });

            try {
                Thread.sleep(EVENT_LOOP_PAUSE_DURATION_IN_MS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Program main = new Program();
        main.program();
    }
}



Ответы (1)


Проблема в точке (2). Вы удаляете устаревшие задачи с карты activeTasks. Но они по-прежнему отправляются в ExecutorService. Поскольку Вы удалили его с карты, когда цикл while выполняет другой цикл, та же задача будет повторно отправлена ​​​​в ExecutorService. Это приводит к увеличению числа задач.

person miskender    schedule 24.02.2019
comment
Но если я удаляю код в точке (2), очередь не растет. Я не могу понять, почему когда у меня два разных удаления, это баг. Но когда у меня есть одно удаление, нет ошибки. - person Max; 24.02.2019
comment
в (1) вы удаляете задачу, которая выполняется в Executorservice, чтобы ее можно было добавить снова. at (2) Вы удаляете задачу, которая все еще находится в ExecutorQueue, поэтому ее не следует добавлять снова. Но в цикле while вы добавляете его снова. - person miskender; 24.02.2019