Эквивалент std::deque в контейнере tbb::concurrent_queue?

Я использую функцию std::deque at для доступа к элементам без извлечения из очереди, поскольку я использую одну и ту же очередь в разных итерациях. Мое решение основано на крупнозернистой многопоточности. Теперь я хотел сделать это мелкозернистым многопоточным решением. Для этого я использую tbb::concurrent_queue. Но мне нужна эквивалентная функция операции std::deque at в tbb::concurrent_queue?

EDIT Вот как я реализую с помощью std::deque (грубая многопоточность). Имейте в виду, что dq является статической очередью (т.е. используется много раз в разных итерациях)

vertext_found = true;
std::deque<T> dq;
while ( i < dq->size())
{
    EnterCriticalSection(&h);
    if( i < dq.size() ) 
    {
          v = dq.at(i);      // accessing element of queue without popping
          i++;
          vertext_found = true;
    }
    LeaveCriticalSection(&h);
    if (vertext_found && (i < dq.size()) && v != NULL)
    {
          **operation on 'v'
          vertext_found = false;
    }
}

Я хочу добиться того же результата с помощью tbb::concurrent_queue?


person Sumanth    schedule 05.03.2013    source источник


Ответы (2)


Если ваш алгоритм имеет отдельные проходы, которые заполняют очередь или потребляют ее, рассмотрите возможность использования tbb::concurrent_vector. У него есть метод push_back, который можно использовать для прохода заполнения, и метод at() для проходов потребления. Если потоки соревнуются за извлечение элементов в проходе потребления, рассмотрите возможность использования счетчика tbb::atomic для создания индексов для at().

Если бы не было такого четкого разделения наполнения и потребления, использование at(), вероятно, создало бы больше проблем, чем решило бы, даже если бы оно существовало, потому что это было бы гонкой против потребителя.

Если проходу потребления просто нужно параллельно пройти через concurrent_vector, рассмотрите возможность использования tbb::parallel_for для цикла. В tbb::concurrent_vector есть метод range(), поддерживающий эту идиому.

void consume( tbb::concurrent_vector<T>& vec ) {
    tbb::parallel_for( vec.range(), [&]( const tbb::concurrent_vector<T>::range_type& r ) {
        for( auto i=r.begin(); i!=r.end(); ++i ) {
            T value = *i;
            ...process value...;
        }
    });
}

Если проход потребления не может использовать tbb:parallel_for, рассмотрите возможность использования атомарного счетчика TBB для создания индексов. Инициализируйте счетчик нулем и используйте ++, чтобы увеличить его. Вот пример:

tbb::atomic<size_t> head;
tbb::concurrent_vector<T> vec;

bool pop_one( T& result ) { // Try to grab next item from vec
    size_t i = head++;      // Fetch-and-increment must be single atomic operation
    if( i<vec.size() ) {
        result = vec[i];
        return true;
    } else {
        return false;       // Failed
    }
}

В целом это решение будет менее масштабируемым, чем использование tbb::parallel_for, потому что счетчик "голова" создает конфликтную ситуацию в системе памяти.

person Arch D. Robison    schedule 05.03.2013
comment
У него нет отдельных фаз. Заполнение очереди - это однократная операция. Затем эта очередь будет потребляться 8 потоками. Но к одной и той же очереди обращаются много раз (итерации). Итак, если я выталкиваю элементы, то я должен снова заполнить очередь, которая будет накладной. Для этого я обращаюсь к элементам с помощью операции at, которая недоступна в tbb::concurrent_queue. Я отредактировал свой пост и включил код для std::deque. Пожалуйста, проверьте и предложите мне. - person Sumanth; 06.03.2013

Согласно документам Doxygen на сайте TBB (документы TBB Doxy), в очереди нет операции at. Вы можете push и try_pop элементы с tbb::strict_ppl::concurrent_queue.

Если вы используете tbb::deprecated::concurrent_queue (более старые версии TBB), доступны операции push_if_not_full и pop_if_present.

В обеих очередях «несколько потоков могут отправлять и извлекать одновременно», как указано в кратком разделе.

person Adri C.S.    schedule 05.03.2013
comment
Есть еще tbb::concurrent_bounded_queue, но у него нет и метода at. Единственный контейнер TBB, который, кажется, имеет at, это tbb::concurrent_vector. - person Adri C.S.; 05.03.2013
comment
Да, я знаю, что push и try_pop доступны. Но это изменит очередь и, следовательно, производительность снизится. Я не могу получить доступ к элементам concurrent_queue без операции pop? - person Sumanth; 05.03.2013