Как с помощью GPars узнать, что все потоки завершены при возникновении исключения?

В случае, когда поток генерирует исключение, как я могу дождаться завершения всех потоков, которые не генерировали исключение (чтобы пользователь не запускался снова, пока все не остановилось)?

Я использую GPars несколькими способами, поэтому мне нужна стратегия для каждого (параллельные коллекции, асинхронные замыкания и вилка / соединение). Исключения не скрываются, они хорошо обрабатываются с помощью обещаний, getChildrenResults и т. Д., Так что это не проблема (благодаря ответам Вацлава Пеха). Мне просто нужно убедиться, что основной поток ждет, пока все, что все еще выполняется, не завершится или не будет остановлено иным образом.

Например, при использовании параллельных коллекций некоторые потоки продолжают выполняться, а некоторые никогда не запускаются после исключения. Так что нелегко сказать, сколько еще осталось ждать, или, возможно, их схватить.

Я предполагаю, что, возможно, есть способ работать с пулом потоков (в данном случае GParsPool). Какие-либо предложения?

Спасибо!


person user1373467    schedule 07.12.2012    source источник


Ответы (2)


Я считаю, что у меня есть решение проблемы, я реализовал его в приложении после тщательного тестирования, и оно работает.

Замыкание withPool передается в созданный пул (jsr166y.ForkJoinPool) в качестве первого аргумента. Я могу взять его и сохранить в переменной (currentPool), чтобы позже использовать его в основном потоке, например:

    GParsPool.withPool { pool ->
        currentPool = pool

Когда генерируется исключение и оно возвращается в основной поток для обработки, я могу заставить его ждать, пока все не будет завершено, примерно так:

    } catch (Exception exc) {
        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        println 'all done'
    }

IsQuiescent () кажется безопасным способом убедиться, что больше не нужно выполнять работу.

Обратите внимание, что во время тестирования я также обнаружил, что исключения, похоже, не прерывают выполнение цикла, как я первоначально думал. Если бы у меня был список из 500 и я выполнил каждыйParallel, все они работали бы независимо от того, была ли ошибка в первом проходе. Поэтому мне пришлось прервать цикл, используя currentPool.shutdownNow () внутри обработчика исключений параллельного цикла. См. Также: GPars - правильный способ преждевременного завершения параллельной коллекции < / а>

Вот полное упрощенное представление реального решения:

void example() {
    jsr166y.ForkJoinPool currentPool

    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    AtomicReference<Exception> realException = new AtomicReference<Exception>()

    try {
        GParsPool.withPool { pool ->
            currentPool = pool

            (1..500).eachParallel {
                try {
                    if (threadCounter.incrementAndGet() == 1) {
                        throw new RuntimeException('planet blew up!')
                    }

                    if (realException.get() != null) {
                        // We had an exception already in this eachParallel - quit early
                        return
                    }

                    // Do some long work
                    Integer counter=0
                    (1..1000000).each() {counter++}

                    // Flag if we went all the way through
                    threadCounterEnd.incrementAndGet()
                } catch (Exception exc) {
                    realException.compareAndSet(null, exc)

                    pool.shutdownNow()
                    throw realException
                }
            }
        }
    } catch (Exception exc) {
        // If we used pool.shutdownNow(), we need to look at the real exception.
        // This is needed because pool.shutdownNow() sometimes generates a CancellationException
        // which can cover up the real exception that caused us to do a shutdownNow().
        if (realException.get()) {
            exc = realException.get()
        }

        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        // Do further exception handling here...
        exc.printStackTrace()
    }
}

Возвращаясь к моему предыдущему примеру, если я генерировал исключение в первый раз на 4-ядерной машине, в очереди стояло около 5 потоков. Функция shutdownNow () отключает работу после прохождения примерно 20 потоков, поэтому наличие флажка «выйти раньше» в верхней части помогло этим 20 или около того завершить работу как можно скорее.

Просто разместите его здесь на случай, если он поможет кому-то другому, в обмен на всю помощь, которую я получил здесь. Спасибо!

person user1373467    schedule 11.12.2012

Я считаю, что вам нужно перехватить исключение, а затем вернуть что-то отличное от ожидаемого результата (например, String или null, если вы ожидаете число, например), т.е.

@Grab('org.codehaus.gpars:gpars:0.12')
import static groovyx.gpars.GParsPool.*

def results = withPool {
  [1,2,3].collectParallel {
    try {
      if( it % 2 == 0 ) {
        throw new RuntimeException( '2 fails' )
      }
      else {
        Thread.sleep( 2000 )
        it
      }
    }
    catch( e ) { e.class.name }
  }
}
person tim_yates    schedule 07.12.2012
comment
в моем примере результаты будут содержать целые числа для прошедших потоков и строки для тех, которые вызвали исключение. все потоки будут завершены ... - person tim_yates; 08.12.2012
comment
Хорошо, я вижу, где отключение. Допустим, у вас есть 1000 элементов в коллекции, которые нужно перебрать. Если генерируется исключение, обычно кажется, что группа прерывается - она ​​перестает запускать потоки, чего я хочу, и управление возвращается основному потоку. Так что не каждый поток запускается. Однако в основном потоке все еще есть потоки, запущенные до исключения, которые еще не завершили очистку. Имеет ли это смысл? - person user1373467; 10.12.2012
comment
Теперь ваше предложение дает мне одну идею - я мог бы подсчитать, сколько на самом деле уволили, и отследить, сколько завершено. Я просто надеялся на что-нибудь попроще. Но я ценю это, это было полезно +1 - person user1373467; 10.12.2012