У меня есть ряд задач, которые принимают данные, а затем по завершении обработки перемещают их в другую задачу.
Каждая задача имеет свою собственную службу Executor, инициализированную как
int workerSize = Runtime.getRuntime().availableProcessors() * 2;
executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),
new SongKongThreadFactory(threadGroup));
Задачи работают с файлами, каждый файл помещается в очередь задачи A, затем при обработке задачей A он может быть помещен в очередь задачи B или прямо в очередь задачи C.
Одна из целей этого конвейерного подхода заключается в том, что сколько бы файлов у вас ни было, приложение не использует слишком много памяти, потому что в любой момент времени обрабатываются только некоторые файлы, однако каждая задача имеет разные уровни пропускной способности, поэтому я обнаружил, что очередь задачи B растет, потому что файлы обрабатываются быстрее задачей A, чем задачей B. И эти очереди используют память
Поэтому я изменил executor servide defn, чтобы указать размер очереди amx следующим образом:
executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(100),
new SongKongThreadFactory(threadGroup));
Моя идея заключалась в том, что если задача A попытается отправить элемент 101, она просто подождет, пока не появится место, поэтому обработка задачи A временно приостановится, а использование памяти уменьшится. Но вместо этого задача просто отклоняется с RejectedExecutionException .Ive, поскольку проверены все доступные политики ThreadExecutor, и ни одна из них не позволяет вызывающему процессу ждать.
Поэтому я думаю, что я должен подходить к этому неправильно, как мне это сделать?
В настоящее время я использую Java 7 и ожидаю перехода на Java 8 после выпуска.