Действительно ли Boost_lockFree_SPSC_queue имеет переменный размер и не теряет данных?

Привет, я планировал использовать очередь Boost SPSC. У меня должен быть прослушиватель, который всегда прослушивает входящие сообщения, как только он получит, он поместит сообщение в очередь и вернется, чтобы снова дождаться новых сообщений. И есть потребитель, который выталкивает сообщения из очереди и обрабатывает их.

Поскольку я не хочу, чтобы мой слушатель ждал, пока блокировка запишется в очередь, я предпочел boost_lockfree_spsc_queue. И я полагал, что размер очереди изменится, если я установлю fixed_sized<false> при инициализации очереди. Но кажется, что это не так.

Ниже приведен мой код:

boost::lockfree::spsc_queue<int, boost::lockfree::fixed_sized<false>> lockFreeQ{10};

void sharedQueue::lockFreeProduce()
{

    for(int i = 0; i < 100; i++)
    {
       lockFreeQ.push(i);        
       cout<<"--"<<i<<endl;
    }

}
void sharedQueue::lockFreeConsume(){

for(int i = 0;  i <100; i++){

   /* Implement a blocking pop to the queue in order to waste cpu time  by busy waiting*/

        while(!(lockFreeQ.read_available() > 0))
           {
               //std::this_thread::yield();
                 std::this_thread::sleep_for(std::chrono::nanoseconds(10));
           }

       cout<<"   ***"<<lockFreeQ.front()<<endl;
       lockFreeQ.pop();

   }

}

void sharedQueue:: TestLockFreeQueue()
{

   std::thread t1([this]() { this->lockFreeProduce(); });
   std::thread t2([this]() { this->lockFreeConsume(); });   
   t1.join();
   t2.join();
}

Вывод показывает, что потеря данных. Ниже приводится частичный результат. но, например, он пропускает номер 87 ,92.

**85
***86
***88
***90
***91
***93
***95
***96
***98

Если бы он был переменного размера, он должен был бы увеличить размер очереди и не потерять данные. Кажется, что он перезаписывает данные, как объяснено Следствием циклического буфера является то, что когда он заполнен и последующий запись выполняется, затем она начинает перезаписывать самые старые данные.. В таком случае, как с этим справиться без потери данных?

Спасибо


person Kid    schedule 07.12.2016    source источник
comment
Ваша очередь по-прежнему представляет собой циклический буфер с max_size, равным 10. Единственное, что изменилось в политике, это то, что вместо массива стека будет использоваться динамически выделяемый массив.   -  person Arunmu    schedule 07.12.2016
comment
Что вы подразумеваете под «динамически выделяемым массивом»? Означает ли это, что размер массива будет увеличиваться, если очередь заполнена?   -  person Kid    schedule 08.12.2016
comment
Я имел в виду, что массив будет выделен с помощью распределителя. Размер динамически выделяемого массива будет равен размеру, который вы передали в конструкторе. Размер массива не будет увеличиваться при заполнении очереди, вместо этого он перезапишет данные в первом индексе, потому что используемая структура данных всегда будет циклическим буфером размера, который вы передали в конструкторе.   -  person Arunmu    schedule 08.12.2016
comment
Итак, в заключение, вы должны выбрать достаточно высокое значение при построении очереди, которое в лучшем случае может гарантировать, что данные не будут перезаписаны. По сути, вы не можете гарантировать такое поведение, используя циклическую очередь, потому что вы никогда не знаете, когда потребитель выйдет из строя или начнет медленно обрабатывать.   -  person Arunmu    schedule 08.12.2016
comment
Итак, размер массива в любом случае не будет больше 10. Поэтому, когда массив заполнен и есть данные, которые необходимо заполнить, просто замените старые данные. следовательно, потеря данных неизбежна. Есть ли способ избежать потери данных?   -  person Kid    schedule 08.12.2016
comment
Смотрите мой комментарий выше. ИЛИ используйте список бесплатных блокировок (1024cores.com). Если вы в конечном итоге используете список, убедитесь, что вы привязали его к какому-то высокому значению. Вы не хотите, чтобы ваш производитель бесконечно заполнял список, пока потребитель не работает или ничего не делает.   -  person Arunmu    schedule 08.12.2016
comment
Я перенесу эти комментарии, чтобы ответить позже, так как я не вижу подобных вопросов, заданных ранее.   -  person Arunmu    schedule 08.12.2016
comment
Да, я видел предыдущий комментарий. Указанный вами сайт не работает.   -  person Kid    schedule 08.12.2016
comment
1024cores.net   -  person Arunmu    schedule 08.12.2016
comment
Проголосовал за ссылку. Спасибо Арун.   -  person Kid    schedule 08.12.2016


Ответы (1)


Перевод комментариев к ответу.

Политика работает для spsc_queue следующим образом:

typedef typename mpl::if_c<runtime_sized,                                  
                           runtime_sized_ringbuffer<T, allocator>,
                           compile_time_sized_ringbuffer<T, capacity>
                          >::type ringbuffer_type;

В вашем случае он выбирает буфер runtime_sized_ringbuffer вместо compile_time_sized_ringbuffer.

Разница между ними заключается в том, что в случае runtime_sized_ringbuffer массив размера (переданный в качестве аргумента конструктору) выделяется с использованием allocator, переданного в качестве параметра шаблона, и в случае compile_time_sized_ringbuffer выделяется массив в стеке.

Таким образом, если размер вашей очереди велик, вы должны использовать runtime_sized_ringbuffer вместо compile_time_sized_ringbuffer.

Это единственное различие: в обоих случаях базовая структура данных представляет собой circular_buffer. то есть даже в случае runtime_sized_ringbuffer самый старый элемент будет перезаписан после того, как вы закончите запись во все местоположения/индекс в буфере. Это довольно прямолинейно из кода ниже

static size_t next_index(size_t arg, size_t max_size)
{
  size_t ret = arg + 1;
  while (unlikely(ret >= max_size))
    ret -= max_size;
  return ret;
}

Если вам действительно нужна структура данных списка блокировки, вам следует взглянуть на известную структуру данных списка блокировки блокировки Дмитрия Вьюкова

person Arunmu    schedule 08.12.2016