Завершить существующий пул, когда вся работа будет выполнена

Хорошо, новичок в gpars, поэтому, пожалуйста, простите меня, если у этого есть очевидный ответ.

Вот мой сценарий. В настоящее время у нас есть часть нашего кода, заключенная в блок Thread.start {}. Это делается для того, чтобы отправлять сообщения в очередь сообщений в фоновом режиме и не блокировать запрос пользователя. Проблема, с которой мы недавно столкнулись, заключается в том, что для больших блоков работы пользователи могут выполнить другое действие, которое приведет к повторному выполнению этого блока. Поскольку он многопоточный, второй пакет сообщений может быть отправлен раньше, чем первый, что приведет к повреждению данных.

Я хотел бы изменить этот процесс, чтобы он работал как поток очереди с gpars. Я видел примеры создания пулов, таких как

def pool = GParsPool.createPool()

or

def pool = new ForkJoinPool()

а затем использовать пул как

GParsPool.withExistingPool(pool) {
    ...
}

Похоже, что это будет учитывать случай, когда, если пользователь снова выполнит действие, я смогу повторно использовать созданный пул, и действия не будут выполняться не по порядку, если у меня есть размер пула, равный единице.

Мой вопрос: это лучший способ сделать это с gpars? И кроме того, как я узнаю, что пул закончил всю свою работу? Заканчивается ли он, когда вся работа закончена? Если да, то есть ли метод, который можно использовать для проверки того, закончился ли/завершен пул, чтобы узнать, что мне нужен новый?

Любая помощь будет оценена по достоинству.


person Taplar    schedule 11.05.2015    source источник


Ответы (2)


Нет, явно созданные пулы не завершаются сами по себе. Вы должны вызвать для них shutdown() явно.

Однако использование команды withPool() {} гарантирует, что пул будет уничтожен после завершения блока кода.

person Vaclav Pech    schedule 12.05.2015
comment
Я запутался. Похоже, что выполнение блоков withPool и withExistingPool. Я сделал несколько тестов на спок и логику после закрытия, выполненного после завершения пула. Это не то, чего я хочу. Я не хочу, чтобы пул блокировал выполнение. - person Taplar; 12.05.2015
comment
Тогда, возможно, вам может понадобиться другая абстракция. В общем, задачи и обещания потока данных дают вам больше гибкости. - person Vaclav Pech; 13.05.2015
comment
Я считаю, что у нас есть рабочее решение с использованием актеров. Я опубликую наше окончательное решение, как только мы будем уверены, что оно нас устраивает :) - person Taplar; 13.05.2015

Вот текущее решение, которое у нас есть для нашей проблемы. Следует отметить, что мы пошли по этому пути из-за наших требований

  • Работа сгруппирована по некоторому контексту
  • Работа в заданном контексте упорядочена
  • Работа в заданном контексте синхронна
  • Дополнительная работа для контекста должна выполняться после предыдущей работы.
  • Работа не должна блокировать запрос пользователя
  • Контексты асинхронны между собой
  • Как только работа над контекстом завершена, контекст должен очиститься после себя.

Учитывая вышесказанное, мы реализовали следующее:

class AsyncService {
    def queueContexts


    def AsyncService() {
        queueContexts = new QueueContexts()
    }

    def queue(contextString, closure) {
        queueContexts.retrieveContextWithWork(contextString, true).send(closure)
    }

    class QueueContexts {
        def contextMap = [:]

        def synchronized retrieveContextWithWork(contextString, incrementWork) {
            def context = contextMap[contextString]

            if (context) {
                if (!context.hasWork(incrementWork)) {
                    contextMap.remove(contextString)
                    context.terminate()
                }
            } else {
                def queueContexts = this
                contextMap[contextString] = new QueueContext({->
                    queueContexts.retrieveContextWithWork(contextString, false)
                })
            }

            contextMap[contextString]
        }

        class QueueContext {
            def workCount
            def actor

            def QueueContext(finishClosure) {
                workCount = 1
                actor = Actors.actor {
                    loop {
                        react { closure ->
                            try {
                                closure()
                            } catch (Throwable th) {
                                log.error("Uncaught exception in async queue context", th)
                            }

                            finishClosure()
                        }
                    }
                }
            }

            def send(closure) {
                actor.send(closure)
            }

            def terminate(){
                actor.terminate()
            }

            def hasWork(incrementWork) {
                workCount += (incrementWork ? 1 : -1)
                workCount > 0
            }
        }
    }
}
person Taplar    schedule 14.05.2015