Как я могу временно приостановить вызов задачи, когда очередь исполнителя заполнена

У меня есть ряд задач, которые принимают данные, а затем по завершении обработки перемещают их в другую задачу.

Каждая задача имеет свою собственную службу Executor, инициализированную как

 int workerSize = Runtime.getRuntime().availableProcessors() * 2;
 executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),
                                          new SongKongThreadFactory(threadGroup));

Задачи работают с файлами, каждый файл помещается в очередь задачи A, затем при обработке задачей A он может быть помещен в очередь задачи B или прямо в очередь задачи C.

Одна из целей этого конвейерного подхода заключается в том, что сколько бы файлов у вас ни было, приложение не использует слишком много памяти, потому что в любой момент времени обрабатываются только некоторые файлы, однако каждая задача имеет разные уровни пропускной способности, поэтому я обнаружил, что очередь задачи B растет, потому что файлы обрабатываются быстрее задачей A, чем задачей B. И эти очереди используют память

Поэтому я изменил executor servide defn, чтобы указать размер очереди amx следующим образом:

executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(100),
                                              new SongKongThreadFactory(threadGroup));

Моя идея заключалась в том, что если задача A попытается отправить элемент 101, она просто подождет, пока не появится место, поэтому обработка задачи A временно приостановится, а использование памяти уменьшится. Но вместо этого задача просто отклоняется с RejectedExecutionException .Ive, поскольку проверены все доступные политики ThreadExecutor, и ни одна из них не позволяет вызывающему процессу ждать.

Поэтому я думаю, что я должен подходить к этому неправильно, как мне это сделать?

В настоящее время я использую Java 7 и ожидаю перехода на Java 8 после выпуска.


person Paul Taylor    schedule 22.02.2014    source источник
comment
Помогает ли это: stackoverflow.com/questions/2001086/   -  person Harald    schedule 22.02.2014
comment
@ Гарольд, да, спасибо, я думаю, что с помощью CallerRunsPolicy, поэтому вызывающий абонент ничего не может сделать, потому что занят обработкой неудачной отправки, сам должен работать, не понял аргументацию CallerRunsPolicy, когда я смотрел на это раньше. Единственное, что меня беспокоит, если задача, запускаемая вызывающим абонентом, по какой-то причине зависает и предотвращает отправку дополнительных задач.   -  person Paul Taylor    schedule 22.02.2014


Ответы (1)


Чтобы временно приостановить звонящего... просто не возвращайтесь к звонящему. Подождите локально, пока очередь не станет доступной, и вы сможете выполнить задачу. На самом деле, самый эффективный способ сделать это — вызвать функцию wait(). См., среди прочего, Разницу между wait() и sleep()

person keshlam    schedule 22.02.2014
comment
но я не знаю, когда очередь снова будет доступна, чего я жду - person Paul Taylor; 22.02.2014
comment
Измените очередь, чтобы она уведомляла потоки ожидания(), когда становится доступной. Или вы можете заснуть и повторить попытку, но тогда вы будете ждать дольше, чем нужно, и/или потратите несколько циклов на перепроверку того, освободилось ли место. ИЛИ... используйте одну из потокобезопасных реализаций Queue, уже предоставленных Java, которые инкапсулируют поведение ожидания до освобождения места. - person keshlam; 22.02.2014
comment
Правильно, я просто использую CallerRunsPolicy, несколько проще. - person Paul Taylor; 22.02.2014