Как барьеры могут разрушаться, как только возвращается pthread_barrier_wait?

Этот вопрос основан на:

Когда безопасно разрушать барьер pthread?

и недавний отчет об ошибке glibc:

http://sourceware.org/bugzilla/show_bug.cgi?id=12674

Я не уверен в проблеме с семафорами, о которой сообщается в glibc, но, по-видимому, предполагается, что она может уничтожить барьер, как только pthread_barrier_wait вернется, согласно приведенному выше связанному вопросу. (Обычно поток, получивший PTHREAD_BARRIER_SERIAL_THREAD, или «специальный» поток, который уже считает себя «ответственным» за объект-барьер, будет тем, кто его уничтожит.) Основной вариант использования, о котором я могу думать, — это когда используется барьер синхронизировать использование данных новым потоком в стеке создающего потока, предотвращая возврат создающего потока до тех пор, пока новый поток не начнет использовать данные; другие барьеры, вероятно, имеют время жизни, равное сроку жизни всей программы, или управляются каким-либо другим объектом синхронизации.

В любом случае, как реализация может гарантировать, что разрушение барьера (и, возможно, даже удаление памяти, в которой он находится) будет безопасным, как только pthread_barrier_wait вернется в любом потоке? Похоже, что другие потоки, которые еще не вернулись, должны будут проверить хотя бы часть объекта барьера, чтобы завершить свою работу и вернуться, во многом подобно тому, как в приведенном выше отчете об ошибке glibc sem_post должен проверить количество официантов после того, как скорректировано значение семафора.


person R.. GitHub STOP HELPING ICE    schedule 04.05.2011    source источник


Ответы (2)


Я собираюсь еще раз попробовать это на примере реализации pthread_barrier_wait(), которая использует функции мьютекса и условной переменной, которые могут быть обеспечены реализацией pthreads. Обратите внимание, что в этом примере не рассматриваются соображения производительности (в частности, когда ожидающие потоки разблокированы, все они повторно сериализуются при выходе из ожидания). Я думаю, что использование чего-то вроде объектов Linux Futex могло бы решить проблемы с производительностью, но Futexs все еще почти не используется в моем опыте.

Кроме того, я сомневаюсь, что этот пример правильно обрабатывает сигналы или ошибки (если вообще обрабатывает сигналы). Но я думаю, что надлежащая поддержка этих вещей может быть добавлена ​​в качестве упражнения для читателя.

Я больше всего опасаюсь, что в примере может быть состояние гонки или взаимоблокировка (обработка мьютекса сложнее, чем мне нравится). Также обратите внимание, что это пример, который даже не был скомпилирован. Относитесь к этому как к псевдокоду. Также имейте в виду, что мой опыт в основном связан с Windows — я рассматриваю это больше как образовательную возможность, чем что-либо еще. Так что качество псевдокода вполне может быть довольно низким.

Однако, помимо заявлений об отказе от ответственности, я думаю, что это может дать представление о том, как проблема, заданная в вопросе, может быть решена (т. Е. Как функция pthread_barrier_wait() может разрешить объект pthread_barrier_t, который она использует, быть уничтоженным любым из выпущенных потоков без опасности использование объекта-барьера одним или несколькими потоками на выходе).

Вот оно:

/* 
 *  Since this is a part of the implementation of the pthread API, it uses
 *  reserved names that start with "__" for internal structures and functions
 *
 *  Functions such as __mutex_lock() and __cond_wait() perform the same function
 *  as the corresponding pthread API.
 */

// struct __barrier_wait data is intended to hold all the data
//  that `pthread_barrier_wait()` will need after releasing
//  waiting threads.  This will allow the function to avoid
//  touching the passed in pthread_barrier_t object after 
//  the wait is satisfied (since any of the released threads
//   can destroy it)

struct __barrier_waitdata {
    struct __mutex cond_mutex;
    struct __cond cond;

    unsigned waiter_count;
    int wait_complete;
};

struct __barrier {
    unsigned count;

    struct __mutex waitdata_mutex;
    struct __barrier_waitdata* pwaitdata;
};

typedef struct __barrier pthread_barrier_t;



int __barrier_waitdata_init( struct __barrier_waitdata* pwaitdata)
{
    waitdata.waiter_count = 0;
    waitdata.wait_complete = 0;

    rc = __mutex_init( &waitdata.cond_mutex, NULL);
    if (!rc) {
        return rc;
    }

    rc = __cond_init( &waitdata.cond, NULL);
    if (!rc) {
        __mutex_destroy( &pwaitdata->waitdata_mutex);
        return rc;
    }

    return 0;
}




int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
{
    int rc;

    result = __mutex_init( &barrier->waitdata_mutex, NULL);
    if (!rc) return result;

    barrier->pwaitdata = NULL;
    barrier->count = count;

    //TODO: deal with attr
}



int pthread_barrier_wait(pthread_barrier_t *barrier)
{
    int rc;
    struct __barrier_waitdata* pwaitdata;
    unsigned target_count;

    // potential waitdata block (only one thread's will actually be used)
    struct __barrier_waitdata waitdata; 

    // nothing to do if we only need to wait for one thread...
    if (barrier->count == 1) return PTHREAD_BARRIER_SERIAL_THREAD;

    rc = __mutex_lock( &barrier->waitdata_mutex);
    if (!rc) return rc;

    if (!barrier->pwaitdata) {
        // no other thread has claimed the waitdata block yet - 
        //  we'll use this thread's

        rc = __barrier_waitdata_init( &waitdata);
        if (!rc) {
            __mutex_unlock( &barrier->waitdata_mutex);
            return rc;
        }

        barrier->pwaitdata = &waitdata;
    }

    pwaitdata = barrier->pwaitdata;
    target_count = barrier->count;

    //  all data necessary for handling the return from a wait is pointed to
    //  by `pwaitdata`, and `pwaitdata` points to a block of data on the stack of
    //  one of the waiting threads.  We have to make sure that the thread that owns
    //  that block waits until all others have finished with the information
    //  pointed to by `pwaitdata` before it returns.  However, after the 'big' wait
    //  is completed, the `pthread_barrier_t` object that's passed into this 
    //  function isn't used. The last operation done to `*barrier` is to set 
    //  `barrier->pwaitdata = NULL` to satisfy the requirement that this function
    //  leaves `*barrier` in a state as if `pthread_barrier_init()` had been called - and
    //  that operation is done by the thread that signals the wait condition 
    //  completion before the completion is signaled.

    // note: we're still holding  `barrier->waitdata_mutex`;

    rc = __mutex_lock( &pwaitdata->cond_mutex);
    pwaitdata->waiter_count += 1;

    if (pwaitdata->waiter_count < target_count) {
        // need to wait for other threads

        __mutex_unlock( &barrier->waitdata_mutex);
        do {
            // TODO:  handle the return code from `__cond_wait()` to break out of this
            //          if a signal makes that necessary
            __cond_wait( &pwaitdata->cond,  &pwaitdata->cond_mutex);
        } while (!pwaitdata->wait_complete);
    }
    else {
        // this thread satisfies the wait - unblock all the other waiters
        pwaitdata->wait_complete = 1;

        // 'release' our use of the passed in pthread_barrier_t object
        barrier->pwaitdata = NULL;

        // unlock the barrier's waitdata_mutex - the barrier is  
        //  ready for use by another set of threads
        __mutex_unlock( barrier->waitdata_mutex);

        // finally, unblock the waiting threads
        __cond_broadcast( &pwaitdata->cond);
    }

    // at this point, barrier->waitdata_mutex is unlocked, the 
    //  barrier->pwaitdata pointer has been cleared, and no further 
    //  use of `*barrier` is permitted...

    // however, each thread still has a valid `pwaitdata` pointer - the 
    // thread that owns that block needs to wait until all others have 
    // dropped the pwaitdata->waiter_count

    // also, at this point the `pwaitdata->cond_mutex` is locked, so
    //  we're in a critical section

    rc = 0;
    pwaitdata->waiter_count--;

    if (pwaitdata == &waitdata) {
        // this thread owns the waitdata block - it needs to hang around until 
        //  all other threads are done

        // as a convenience, this thread will be the one that returns 
        //  PTHREAD_BARRIER_SERIAL_THREAD
        rc = PTHREAD_BARRIER_SERIAL_THREAD;

        while (pwaitdata->waiter_count!= 0) {
            __cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
        };

        __mutex_unlock( &pwaitdata->cond_mutex);
        __cond_destroy( &pwaitdata->cond);
        __mutex_destroy( &pwaitdata_cond_mutex);
    }
    else if (pwaitdata->waiter_count == 0) {
        __cond_signal( &pwaitdata->cond);
        __mutex_unlock( &pwaitdata->cond_mutex);
    }

    return rc;
}

17 июля 2011 г.: обновление в ответ на комментарий/вопрос о барьерах, связанных с общими процессами.

Я совсем забыл о ситуации с барьерами, которые разделяют процессы. И, как вы упомянули, в этом случае идея, которую я изложил, ужасно провалится. На самом деле у меня нет опыта использования разделяемой памяти POSIX, поэтому любые мои предложения должны быть умерены скептицизмом.

Подводя итог (для моей пользы, если ни для кого другого):

Когда какой-либо из потоков получает управление после возврата pthread_barrier_wait(), объект барьера должен находиться в состоянии 'init' (однако его установил самый последний pthread_barrier_init() на этом объекте). API также подразумевает, что после возврата любого из потоков может произойти одно или несколько из следующих событий:

  • еще один вызов pthread_barrier_wait() для запуска нового раунда синхронизации потоков
  • pthread_barrier_destroy() на шлагбауме
  • память, выделенная для объекта барьера, может быть освобождена или разделена, если она находится в области разделяемой памяти.

Это означает, что прежде чем вызов pthread_barrier_wait() позволит вернуться любому потоку, ему в значительной степени необходимо убедиться, что все ожидающие потоки больше не используют объект барьера в контексте этого вызова. Мой первый ответ касался этого путем создания «локального» набора объектов синхронизации (мьютекс и связанная переменная условия) за пределами объекта-барьера, который блокировал бы все потоки. Эти локальные объекты синхронизации были размещены в стеке потока, который первым вызвал pthread_barrier_wait().

Я думаю, что что-то подобное нужно было бы сделать для барьеров, которые являются общими для процессов. Однако в этом случае простого размещения этих объектов синхронизации в стеке потока недостаточно (поскольку другие процессы не будут иметь к ним доступа). Для барьера, разделяемого процессом, эти объекты должны быть размещены в общей памяти процесса. Я думаю, что техника, которую я перечислил выше, может быть применена аналогичным образом:

  • waitdata_mutex, который управляет «распределением» локальных переменных синхронизации (блок данных ожидания), уже будет находиться в общей памяти процесса в силу того, что он находится в структуре барьера. Конечно, если для барьера установлено значение THEAD_PROCESS_SHARED, этот атрибут также необходимо применить к waitdata_mutex.
  • когда __barrier_waitdata_init() вызывается для инициализации локального мьютекса и переменной условия, ему придется размещать эти объекты в разделяемой памяти, а не просто использовать переменную waitdata на основе стека.
  • когда поток «очистки» уничтожает мьютекс и условную переменную в блоке waitdata, ему также необходимо очистить выделение общей памяти процесса для блока.
  • в случае, когда используется разделяемая память, должен быть какой-то механизм, гарантирующий, что объект разделяемой памяти открывается по крайней мере один раз в каждом процессе и закрывается правильное количество раз в каждом процессе (но не закрывается полностью перед каждым потоком в процесс завершен с его использованием). Я не думал, как именно это будет сделано...

Я думаю, что эти изменения позволят схеме работать с барьерами, разделяемыми процессами. последний пункт списка выше является ключевым моментом, который нужно выяснить. Другой способ — создать имя для объекта общей памяти, который будет содержать «локальный» общий процесс waitdata. Есть определенные атрибуты, которые вам нужны для этого имени:

  • вы хотите, чтобы хранилище для имени находилось в структуре struct pthread_barrier_t, чтобы все процессы имели к нему доступ; что означает известный предел длины имени
  • вы хотели бы, чтобы имя было уникальным для каждого «экземпляра» набора вызовов pthread_barrier_wait(), потому что может быть возможно, чтобы второй раунд ожидания начался до того, как все потоки полностью вышли из первого раунда ожидания (поэтому блок памяти общего процесса, установленный для waitdata, возможно, еще не был освобожден). Таким образом, имя, вероятно, должно быть основано на таких вещах, как идентификатор процесса, идентификатор потока, адрес объекта-барьера и атомарный счетчик.
  • Я не знаю, есть ли последствия для безопасности, если имя будет «угадываемым». если это так, необходимо добавить некоторую рандомизацию - не знаю, насколько. Возможно, вам также потребуется хешировать данные, упомянутые выше, вместе со случайными битами. Как я уже сказал, я действительно понятия не имею, важно это или нет.
person Michael Burr    schedule 05.05.2011
comment
Мне нужно время, чтобы прочитать это подробно, но мне очень нравится идея хранить состояние в одном из стеков официантов, а не в самом объекте барьера. Можно даже использовать этот подход таким образом, чтобы новый набор потоков мог начать ожидать на барьере еще до того, как все первый набор завершит возврат. - person R.. GitHub STOP HELPING ICE; 05.05.2011
comment
@R.: Я считаю, что пример поддерживает то, что вы упомянули (новые потоки могут начать ждать до того, как первый набор полностью выйдет из своего ожидания), потому что barrier->pwaitdata устанавливается в NULL, удерживая barrier->waitdata_mutex. По сути, поток, который собирается освободить ожидающих, подготавливает объект барьера для повторного использования, прежде чем освобождать этот мьютекс. - person Michael Burr; 06.05.2011
comment
Действительно, основываясь на вашей идее, я разработал реализацию на основе фьютексов, которая избегает доступа к исходному объекту барьера после того, как любой поток разблокирован, и даже вообще избегает системных вызовов, если он может обойтись несколькими вращениями (это часто может покрыть важный счет = 2 case для синхронизации новых аргументов запуска потока). - person R.. GitHub STOP HELPING ICE; 06.05.2011
comment
Примечание. Потребовалось немного нетрадиционное использование фьютексов, чтобы разблокировать исходный поток со структурой барьера-экземпляра; В итоге я использовал тот же int, что и флаг ожидания, и все остальные потоки получили доступ к флагу экземпляра. - person R.. GitHub STOP HELPING ICE; 06.05.2011
comment
Я столкнулся с проблемой - этот подход не учитывает барьеры, разделяемые процессами, и на самом деле, если вы наивно примените его к барьерам, разделяемым процессами, это приведет к повреждению памяти. Есть ли у вас умные идеи для решения этой проблемы? - person R.. GitHub STOP HELPING ICE; 17.07.2011
comment
@R.: Я добавил некоторые мысли, но я не думаю, что они поднимаются до уровня ума. На самом деле, они, скорее всего, имеют серьезные недостатки. У меня нет опыта работы с объектами разделяемой памяти пользовательского режима POSIX/Linux. Мне придется проработать кое-какой материал по этому вопросу. Но я надеюсь, что в посте может быть что-то, что больше поможет, чем помешает. - person Michael Burr; 18.07.2011
comment
Проблема, которую я все еще вижу, это разрешения. pthread_barrier_wait не может определить разрешения, необходимые для доступа к разделяемой памяти, в которой находится объект барьера (на самом деле это может быть анонимный mmap, унаследованный от fork и setuid), поэтому он не может создать новый объект разделяемой памяти, ограниченный те же потенциальные пользователи... - person R.. GitHub STOP HELPING ICE; 18.07.2011
comment
Вот единственное решение, которое я могу придумать: первый вызывающий создает именованный канал с разрешениями 622 и сохраняет имя в структуре барьера. Последующие вызывающие объекты увеличивают значение счетчика в барьерной структуре и открывают канал для записи, блокируя таким образом до тех пор, пока не появится читатель. Когда счетчик достигает предела, первый вызывающий объект просыпается (из пробуждения фьютекса на счетчике), сбрасывает барьер в состояние инициализации и открывает, а затем закрывает канал для чтения, разблокируя все остальные ожидающие. - person R.. GitHub STOP HELPING ICE; 18.07.2011
comment
Я думаю, что у меня есть решение для случая pshared; см. мой дополнительный вопрос и ответ на него: stackoverflow.com/questions/6935769/ - person R.. GitHub STOP HELPING ICE; 27.09.2011

Насколько я понимаю, нет необходимости в том, чтобы pthread_barrier_destroy выполнялась немедленная операция. Вы можете подождать, пока все потоки, которые все еще находятся в фазе пробуждения, не будут разбужены.

Например, у вас может быть атомарный счетчик awakening, который изначально установлен на количество пробужденных потоков. Затем оно будет уменьшено как последнее действие перед возвратом pthread_barrier_wait. pthread_barrier_destroy тогда просто может вращаться, пока этот счетчик не упадет до 0.

person Jens Gustedt    schedule 04.05.2011
comment
Требуется ли вызывать pthread_barrier_destroy до того, как барьер выйдет из области действия или объект будет освобожден? Я немного не понимаю этого. Если это так, я считаю, что этот ответ может быть правильным. - person R.. GitHub STOP HELPING ICE; 04.05.2011
comment
К сожалению, вращение будет здесь довольно плохим поведением (например, оно может даже привести к взаимоблокировке, когда один из пробуждающихся потоков имеет более низкий приоритет, чем вращающийся поток). Я думаю, что использование подсчета официантов и фьютекса (или эквивалента) также сработает. - person R.. GitHub STOP HELPING ICE; 04.05.2011
comment
Р.., правильно, я не подумал о разных приоритетах. Так что, вероятно, время от времени понадобится yield. И, конечно же, в приличной системе, где у вас есть что-то подобное :), использование переменной атомарного счетчика в качестве фьютекса, безусловно, было бы довольно эффективной реализацией. - person Jens Gustedt; 05.05.2011
comment
Использование фьютекса и счетчика ожидания — это то, что делает glibc. Реализация, по-видимому, синхронизируется при пробуждении потоков после преодоления барьера и гарантирует, что последним возвращаемым потоком будет тот, который возвращает PTHREAD_BARRIER_SERIAL_THREAD, поэтому к тому времени (он защищен блокировкой) все остальные потоки продвинутся вперед и не t осмотрите шлагбаум. - person nos; 05.05.2011
comment
@nos: я не понимаю, как это может работать, если вы не объедините это с ответом Майкла об использовании вторичной структуры в стеке потока, который намеревается вернуться последним. Если какой-либо поток уже вернулся из pthread_barrier_wait, я не вижу безопасного способа проверить количество официантов. Вы можете сделать безусловный вызов пробуждения фьютекса, но это кажется очень расточительным в барьерах с числом = 2 и очень короткими интервалами времени, которые они охватывают, где в противном случае они, вероятно, могли бы обойтись несколькими вращениями. - person R.. GitHub STOP HELPING ICE; 06.05.2011