Я считаю, что у меня есть решение проблемы, я реализовал его в приложении после тщательного тестирования, и оно работает.
Замыкание withPool передается в созданный пул (jsr166y.ForkJoinPool) в качестве первого аргумента. Я могу взять его и сохранить в переменной (currentPool), чтобы позже использовать его в основном потоке, например:
GParsPool.withPool { pool ->
currentPool = pool
Когда генерируется исключение и оно возвращается в основной поток для обработки, я могу заставить его ждать, пока все не будет завершено, примерно так:
} catch (Exception exc) {
if (currentPool) {
while (!currentPool.isQuiescent()) {
Thread.sleep(100)
println 'waiting for threads to finish'
}
}
println 'all done'
}
IsQuiescent () кажется безопасным способом убедиться, что больше не нужно выполнять работу.
Обратите внимание, что во время тестирования я также обнаружил, что исключения, похоже, не прерывают выполнение цикла, как я первоначально думал. Если бы у меня был список из 500 и я выполнил каждыйParallel, все они работали бы независимо от того, была ли ошибка в первом проходе. Поэтому мне пришлось прервать цикл, используя currentPool.shutdownNow () внутри обработчика исключений параллельного цикла. См. Также: GPars - правильный способ преждевременного завершения параллельной коллекции < / а>
Вот полное упрощенное представление реального решения:
void example() {
jsr166y.ForkJoinPool currentPool
AtomicInteger threadCounter = new AtomicInteger(0)
AtomicInteger threadCounterEnd = new AtomicInteger(0)
AtomicReference<Exception> realException = new AtomicReference<Exception>()
try {
GParsPool.withPool { pool ->
currentPool = pool
(1..500).eachParallel {
try {
if (threadCounter.incrementAndGet() == 1) {
throw new RuntimeException('planet blew up!')
}
if (realException.get() != null) {
// We had an exception already in this eachParallel - quit early
return
}
// Do some long work
Integer counter=0
(1..1000000).each() {counter++}
// Flag if we went all the way through
threadCounterEnd.incrementAndGet()
} catch (Exception exc) {
realException.compareAndSet(null, exc)
pool.shutdownNow()
throw realException
}
}
}
} catch (Exception exc) {
// If we used pool.shutdownNow(), we need to look at the real exception.
// This is needed because pool.shutdownNow() sometimes generates a CancellationException
// which can cover up the real exception that caused us to do a shutdownNow().
if (realException.get()) {
exc = realException.get()
}
if (currentPool) {
while (!currentPool.isQuiescent()) {
Thread.sleep(100)
println 'waiting for threads to finish'
}
}
// Do further exception handling here...
exc.printStackTrace()
}
}
Возвращаясь к моему предыдущему примеру, если я генерировал исключение в первый раз на 4-ядерной машине, в очереди стояло около 5 потоков. Функция shutdownNow () отключает работу после прохождения примерно 20 потоков, поэтому наличие флажка «выйти раньше» в верхней части помогло этим 20 или около того завершить работу как можно скорее.
Просто разместите его здесь на случай, если он поможет кому-то другому, в обмен на всю помощь, которую я получил здесь. Спасибо!
person
user1373467
schedule
11.12.2012