Ожидание нескольких задач потока данных async gpars с тайм-аутом

Я изо всех сил пытаюсь достичь цели иметь несколько асинхронных задач с общим тайм-аутом. Хитрость в том, что мне нужно обработать все, что было получено в течение тайм-аута.

Например, приведенный ниже код получает значение обеих задач, когда значение времени ожидания превышает две секунды. Но как только время ожидания уменьшается (или, альтернативно, задачи выполняются дольше), генерируется только исключение TimeoutException, и результаты задачи не принимаются.

def timeout = 3  // value in seconds

def t1 = task {
    Thread.sleep(1000)
    println 't1 done'
    't1'
}

def t2 = task {
    Thread.sleep(2000)
    println 't2 done'
    't2'
}

def results = whenAllBound( [t1, t2] ) { List l ->
    println 'all done ' + l
    l.join(', ')
}.get( timeout, SECONDS )

println "results $results"

Вместо get() использование join() не вызывает TimeoutException, но опять же не возвращает окончательных результатов и продолжает обработку кода после истечения таймаута.

Либо я не полностью/достаточно/правильно понимаю структуры потока данных, либо пытаюсь использовать их неправильно, либо и то, и другое.

По сути, мне нужен блок синхронизации, который запускает несколько асинхронных заданий с общим тайм-аутом, возвращая любые ответы, которые были доступны, когда тайм-аут произошел. Время ожидания является скорее исключительным случаем, но время от времени происходит для каждой из задач и не должно влиять на общую обработку.


person kaskelotti    schedule 09.11.2015    source источник


Ответы (1)


Возможно, этот способ мог бы работать для вас:

whenAllBound( [t1, t2] ) { List l ->
    println 'all done ' + l
    l.join(', ')
}.join( timeout, java.util.concurrent.TimeUnit.SECONDS )

def results = [t1, t2].collect {it.bound ? it.get() : null}
println "results $results"
person Vaclav Pech    schedule 12.11.2015
comment
Да, похоже, это именно то, что мне нужно. У меня почти готово рабочее решение с помощью Select, DataflowQueue и AtomicInteger, но это намного чище. - person kaskelotti; 12.11.2015