асинхронный сбор результатов от параллельного исполнителя gpars

У нас есть код на Java с использованием ThreadPoolExecutor и CompletionService. Задачи отправляются в пул большими партиями; результаты отправляются в службу завершения, где мы собираем завершенные задачи, когда они доступны, не дожидаясь завершения всего пакета:

 ThreadPoolExecutor _executorService =
            new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, new LinkedBlockingQueue(20));
 CompletionService _completionService =
            new ExecutorCompletionService<Callable>(_executorService)

//submit tasks
_completionService.submit( some task);

//get results
while(...){
   Future result = _completionService.poll(timeout);
   if(result)
      //process result
}

Общее количество рабочих в пуле: MAX_NUMBER_OF_WORKERS; задачи, отправленные без доступного работника, ставятся в очередь; до 20 задач могут быть поставлены в очередь, после чего задачи отклоняются.

Каков аналог этого подхода в Gpars?

Прочитав документацию по параллелизму gpars, я нашел множество возможных вариантов: collectManyParallel(), anyParallel(), fork/join и т. д., и я не уверен, какие из них даже проверить. Я надеялся найти упоминание о «завершении» или «службе завершения» в качестве сравнения в документах, но ничего не нашел. Я ищу некоторые направления/указатели о том, с чего начать, от тех, кто имеет опыт работы с gpars.


person raffian    schedule 13.01.2014    source источник


Ответы (1)


Оперативный сбор результатов, регулирование производителей — для этого требуется решение для потока данных. Пожалуйста, найдите пример работающей демонстрации ниже:

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

int MAX_NUMBER_OF_WORKERS = 10

ThreadPoolExecutor _executorService =
        new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(200));

final group = new DefaultPGroup(new DefaultPool(_executorService))
final results = new DataflowQueue()

//submit tasks
30.times {value ->
    group.task(new Runnable() {
        @Override
        void run() {
            println 'Starting ' + Thread.currentThread()
            sleep 5000
            println 'Finished ' + Thread.currentThread()
            results.bind(value)
        }
    });
}
group.task {
    results << -1  //stop the consumer eventually
}

//get results
while (true) {
    def result = results.val
    println result
    if (result == -1) break
    //process result
}

group.shutdown()
person Vaclav Pech    schedule 14.01.2014
comment
выглядит очень интересно, я проверю это, ty - person raffian; 14.01.2014