Я собираюсь еще раз попробовать это на примере реализации pthread_barrier_wait()
, которая использует функции мьютекса и условной переменной, которые могут быть обеспечены реализацией pthreads. Обратите внимание, что в этом примере не рассматриваются соображения производительности (в частности, когда ожидающие потоки разблокированы, все они повторно сериализуются при выходе из ожидания). Я думаю, что использование чего-то вроде объектов Linux Futex могло бы решить проблемы с производительностью, но Futexs все еще почти не используется в моем опыте.
Кроме того, я сомневаюсь, что этот пример правильно обрабатывает сигналы или ошибки (если вообще обрабатывает сигналы). Но я думаю, что надлежащая поддержка этих вещей может быть добавлена в качестве упражнения для читателя.
Я больше всего опасаюсь, что в примере может быть состояние гонки или взаимоблокировка (обработка мьютекса сложнее, чем мне нравится). Также обратите внимание, что это пример, который даже не был скомпилирован. Относитесь к этому как к псевдокоду. Также имейте в виду, что мой опыт в основном связан с Windows — я рассматриваю это больше как образовательную возможность, чем что-либо еще. Так что качество псевдокода вполне может быть довольно низким.
Однако, помимо заявлений об отказе от ответственности, я думаю, что это может дать представление о том, как проблема, заданная в вопросе, может быть решена (т. Е. Как функция pthread_barrier_wait()
может разрешить объект pthread_barrier_t
, который она использует, быть уничтоженным любым из выпущенных потоков без опасности использование объекта-барьера одним или несколькими потоками на выходе).
Вот оно:
/*
* Since this is a part of the implementation of the pthread API, it uses
* reserved names that start with "__" for internal structures and functions
*
* Functions such as __mutex_lock() and __cond_wait() perform the same function
* as the corresponding pthread API.
*/
// struct __barrier_wait data is intended to hold all the data
// that `pthread_barrier_wait()` will need after releasing
// waiting threads. This will allow the function to avoid
// touching the passed in pthread_barrier_t object after
// the wait is satisfied (since any of the released threads
// can destroy it)
struct __barrier_waitdata {
struct __mutex cond_mutex;
struct __cond cond;
unsigned waiter_count;
int wait_complete;
};
struct __barrier {
unsigned count;
struct __mutex waitdata_mutex;
struct __barrier_waitdata* pwaitdata;
};
typedef struct __barrier pthread_barrier_t;
int __barrier_waitdata_init( struct __barrier_waitdata* pwaitdata)
{
waitdata.waiter_count = 0;
waitdata.wait_complete = 0;
rc = __mutex_init( &waitdata.cond_mutex, NULL);
if (!rc) {
return rc;
}
rc = __cond_init( &waitdata.cond, NULL);
if (!rc) {
__mutex_destroy( &pwaitdata->waitdata_mutex);
return rc;
}
return 0;
}
int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
{
int rc;
result = __mutex_init( &barrier->waitdata_mutex, NULL);
if (!rc) return result;
barrier->pwaitdata = NULL;
barrier->count = count;
//TODO: deal with attr
}
int pthread_barrier_wait(pthread_barrier_t *barrier)
{
int rc;
struct __barrier_waitdata* pwaitdata;
unsigned target_count;
// potential waitdata block (only one thread's will actually be used)
struct __barrier_waitdata waitdata;
// nothing to do if we only need to wait for one thread...
if (barrier->count == 1) return PTHREAD_BARRIER_SERIAL_THREAD;
rc = __mutex_lock( &barrier->waitdata_mutex);
if (!rc) return rc;
if (!barrier->pwaitdata) {
// no other thread has claimed the waitdata block yet -
// we'll use this thread's
rc = __barrier_waitdata_init( &waitdata);
if (!rc) {
__mutex_unlock( &barrier->waitdata_mutex);
return rc;
}
barrier->pwaitdata = &waitdata;
}
pwaitdata = barrier->pwaitdata;
target_count = barrier->count;
// all data necessary for handling the return from a wait is pointed to
// by `pwaitdata`, and `pwaitdata` points to a block of data on the stack of
// one of the waiting threads. We have to make sure that the thread that owns
// that block waits until all others have finished with the information
// pointed to by `pwaitdata` before it returns. However, after the 'big' wait
// is completed, the `pthread_barrier_t` object that's passed into this
// function isn't used. The last operation done to `*barrier` is to set
// `barrier->pwaitdata = NULL` to satisfy the requirement that this function
// leaves `*barrier` in a state as if `pthread_barrier_init()` had been called - and
// that operation is done by the thread that signals the wait condition
// completion before the completion is signaled.
// note: we're still holding `barrier->waitdata_mutex`;
rc = __mutex_lock( &pwaitdata->cond_mutex);
pwaitdata->waiter_count += 1;
if (pwaitdata->waiter_count < target_count) {
// need to wait for other threads
__mutex_unlock( &barrier->waitdata_mutex);
do {
// TODO: handle the return code from `__cond_wait()` to break out of this
// if a signal makes that necessary
__cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
} while (!pwaitdata->wait_complete);
}
else {
// this thread satisfies the wait - unblock all the other waiters
pwaitdata->wait_complete = 1;
// 'release' our use of the passed in pthread_barrier_t object
barrier->pwaitdata = NULL;
// unlock the barrier's waitdata_mutex - the barrier is
// ready for use by another set of threads
__mutex_unlock( barrier->waitdata_mutex);
// finally, unblock the waiting threads
__cond_broadcast( &pwaitdata->cond);
}
// at this point, barrier->waitdata_mutex is unlocked, the
// barrier->pwaitdata pointer has been cleared, and no further
// use of `*barrier` is permitted...
// however, each thread still has a valid `pwaitdata` pointer - the
// thread that owns that block needs to wait until all others have
// dropped the pwaitdata->waiter_count
// also, at this point the `pwaitdata->cond_mutex` is locked, so
// we're in a critical section
rc = 0;
pwaitdata->waiter_count--;
if (pwaitdata == &waitdata) {
// this thread owns the waitdata block - it needs to hang around until
// all other threads are done
// as a convenience, this thread will be the one that returns
// PTHREAD_BARRIER_SERIAL_THREAD
rc = PTHREAD_BARRIER_SERIAL_THREAD;
while (pwaitdata->waiter_count!= 0) {
__cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
};
__mutex_unlock( &pwaitdata->cond_mutex);
__cond_destroy( &pwaitdata->cond);
__mutex_destroy( &pwaitdata_cond_mutex);
}
else if (pwaitdata->waiter_count == 0) {
__cond_signal( &pwaitdata->cond);
__mutex_unlock( &pwaitdata->cond_mutex);
}
return rc;
}
17 июля 2011 г.: обновление в ответ на комментарий/вопрос о барьерах, связанных с общими процессами.
Я совсем забыл о ситуации с барьерами, которые разделяют процессы. И, как вы упомянули, в этом случае идея, которую я изложил, ужасно провалится. На самом деле у меня нет опыта использования разделяемой памяти POSIX, поэтому любые мои предложения должны быть умерены скептицизмом.
Подводя итог (для моей пользы, если ни для кого другого):
Когда какой-либо из потоков получает управление после возврата pthread_barrier_wait()
, объект барьера должен находиться в состоянии 'init' (однако его установил самый последний pthread_barrier_init()
на этом объекте). API также подразумевает, что после возврата любого из потоков может произойти одно или несколько из следующих событий:
- еще один вызов
pthread_barrier_wait()
для запуска нового раунда синхронизации потоков
pthread_barrier_destroy()
на шлагбауме
- память, выделенная для объекта барьера, может быть освобождена или разделена, если она находится в области разделяемой памяти.
Это означает, что прежде чем вызов pthread_barrier_wait()
позволит вернуться любому потоку, ему в значительной степени необходимо убедиться, что все ожидающие потоки больше не используют объект барьера в контексте этого вызова. Мой первый ответ касался этого путем создания «локального» набора объектов синхронизации (мьютекс и связанная переменная условия) за пределами объекта-барьера, который блокировал бы все потоки. Эти локальные объекты синхронизации были размещены в стеке потока, который первым вызвал pthread_barrier_wait()
.
Я думаю, что что-то подобное нужно было бы сделать для барьеров, которые являются общими для процессов. Однако в этом случае простого размещения этих объектов синхронизации в стеке потока недостаточно (поскольку другие процессы не будут иметь к ним доступа). Для барьера, разделяемого процессом, эти объекты должны быть размещены в общей памяти процесса. Я думаю, что техника, которую я перечислил выше, может быть применена аналогичным образом:
waitdata_mutex
, который управляет «распределением» локальных переменных синхронизации (блок данных ожидания), уже будет находиться в общей памяти процесса в силу того, что он находится в структуре барьера. Конечно, если для барьера установлено значение THEAD_PROCESS_SHARED
, этот атрибут также необходимо применить к waitdata_mutex
.
- когда
__barrier_waitdata_init()
вызывается для инициализации локального мьютекса и переменной условия, ему придется размещать эти объекты в разделяемой памяти, а не просто использовать переменную waitdata
на основе стека.
- когда поток «очистки» уничтожает мьютекс и условную переменную в блоке
waitdata
, ему также необходимо очистить выделение общей памяти процесса для блока.
- в случае, когда используется разделяемая память, должен быть какой-то механизм, гарантирующий, что объект разделяемой памяти открывается по крайней мере один раз в каждом процессе и закрывается правильное количество раз в каждом процессе (но не закрывается полностью перед каждым потоком в процесс завершен с его использованием). Я не думал, как именно это будет сделано...
Я думаю, что эти изменения позволят схеме работать с барьерами, разделяемыми процессами. последний пункт списка выше является ключевым моментом, который нужно выяснить. Другой способ — создать имя для объекта общей памяти, который будет содержать «локальный» общий процесс waitdata
. Есть определенные атрибуты, которые вам нужны для этого имени:
- вы хотите, чтобы хранилище для имени находилось в структуре
struct pthread_barrier_t
, чтобы все процессы имели к нему доступ; что означает известный предел длины имени
- вы хотели бы, чтобы имя было уникальным для каждого «экземпляра» набора вызовов
pthread_barrier_wait()
, потому что может быть возможно, чтобы второй раунд ожидания начался до того, как все потоки полностью вышли из первого раунда ожидания (поэтому блок памяти общего процесса, установленный для waitdata
, возможно, еще не был освобожден). Таким образом, имя, вероятно, должно быть основано на таких вещах, как идентификатор процесса, идентификатор потока, адрес объекта-барьера и атомарный счетчик.
- Я не знаю, есть ли последствия для безопасности, если имя будет «угадываемым». если это так, необходимо добавить некоторую рандомизацию - не знаю, насколько. Возможно, вам также потребуется хешировать данные, упомянутые выше, вместе со случайными битами. Как я уже сказал, я действительно понятия не имею, важно это или нет.
person
Michael Burr
schedule
05.05.2011