Как надежно связать поток с экземпляром объекта в concurrency::parallel_for?

У меня есть вектор из M изображений, которые должны обрабатываться параллельно до N потоков, где N — параметр, заданный пользователем.

У меня также есть вектор из N экземпляров Detector, которые занимаются обработкой, но каждый экземпляр должен выполняться в своем собственном потоке (т. е. если два потока вызывают detect() в одном и том же экземпляре до завершения предыдущего вызова, произойдет что-то плохое).

Detector — это автономный класс (который я могу изменить при необходимости) с одним вызываемым мной методом void Detector::detect(cv::Mat image), который изменяет внутреннее состояние детектора на время (длительного) процесса обнаружения (отсюда необходимость предотвращения параллельных вызовов detect() из разные ветки).

Первоначально я реализовал это с OpenMP как:

#pragma omp parallel for num_threads(N)
for(int i=0; i<M; i++)
{
    detectors[omp_get_thread_num()].detect(images[i]);
}

Однако, поскольку обнаружение может генерировать исключения, я подумал о том, чтобы вместо этого использовать PPL parallel_for, который поставляется с перехватом исключений, созданных потоком, в основном потоке.

Проблема в том, что я не могу найти эквивалент omp_get_thread_num, который можно использовать для сопоставления Detector с конкретным потоком:

concurrency::CurrentScheduler::Create( concurrency::SchedulerPolicy( 2, 
concurrency::MinConcurrency, 1, concurrency::MaxConcurrency, N ) );
concurrency::parallel_for(0, M, [&](int i)
{
    detectors[?????].detect(images[i]);
});
concurrency::CurrentScheduler::Detach(); // clear scheduler

Как я могу гарантировать, что один поток всегда использует один и тот же экземпляр из пула детекторов? Или, если это неправильный подход, как я могу сопоставить выполнение detect() с уже имеющимся у меня пулом детекторов?


person GPhilo    schedule 05.12.2018    source источник
comment
Из любопытства, если детектор — это простой класс, почему бы не создать его для каждого изображения? Это может быть не оптимально, но если это просто, а процесс обнаружения занимает гораздо больше времени, то влияние будет минимальным. Если, конечно, нет причин, по которым вы не можете этого сделать.   -  person Qubit    schedule 05.12.2018
comment
Вы можете использовать какую-то потокобезопасную очередь (PPL имеет параллельную очередь), а затем в вашей лямбда-функции pop первый детектор, вызовите detect, а затем поместите его в очередь, когда закончите. Это позволяет потоку использовать любой доступный детектор.   -  person NathanOliver    schedule 05.12.2018
comment
@Qubit Я определенно неправильно использовал слово «простой». Нет, детектор довольно массивный, просто у него простой интерфейс. Основная проблема здесь — память GPU. У меня есть N детекторов, каждому из которых выделено около N/4 памяти GPU. Количество изображений может стать слишком большим, чтобы я мог сохранить такое же количество детекторов и изображений, имея при этом достаточно памяти графического процессора для запуска обнаружения.   -  person GPhilo    schedule 05.12.2018
comment
@NathanOliver, это хороший подход, я попробую   -  person GPhilo    schedule 05.12.2018
comment
@GPhilo Круто, дай мне знать, как дела. Вы, вероятно, захотите сохранить интеллектуальный указатель на детектор в очереди, чтобы не копировать детектор.   -  person NathanOliver    schedule 05.12.2018
comment
Да, я уже работаю с умными указателями в векторе, так что я бы все равно пошел по этому пути.   -  person GPhilo    schedule 05.12.2018


Ответы (1)


По предложению @NathanOliver я решил использовать concurrent_queue для решения проблемы:

using namespace concurrency;
CurrentScheduler::Create( SchedulerPolicy( 2, 
MinConcurrency, 1, MaxConcurrency, N ) );
concurrent_queue<std::shared_ptr<Detector>> detectors_queue;
for(auto& det : obj->instances)
{
    detectors_queue.push(det);
}
parallel_for(0, M, [&](int i)
{
    std::shared_ptr<Detector> thread_det = nullptr;
    while(!detectors_queue.try_pop(thread_det))
    {
        wait(100);
    }
    thread_det->detect(images[i]);
    detectors_queue.push(thread_det);
});
CurrentScheduler::Detach(); // clear scheduler
person GPhilo    schedule 05.12.2018