Я не совсем уверен, что это правильно, но я решил выбросить это для комментариев. Во-первых, позвольте мне представить алгоритм непересекающихся множеств без блокировки, который станет важной частью моего предлагаемого алгоритма.
Алгоритм непересекающихся множеств без блокировки
Я предполагаю наличие операции сравнения и замены размером с два указателя на выбранную вами архитектуру ЦП. Это доступно как минимум на архитектурах x86 и x64.
Алгоритм в основном такой же, как описан на странице Википедии для однопоточного случая, с некоторые модификации для безопасной безблокировочной работы. Во-первых, мы требуем, чтобы элементы rank и parent имели размер указателя и были выровнены по 2*sizeof(pointer) в памяти для последующего атомарного CAS.
Find() не нужно менять; в худшем случае оптимизация сжатия пути не даст полного эффекта при одновременной записи.
Однако Union() должен измениться:
function Union(x, y)
redo:
x = Find(x)
y = Find(y)
if x == y
return
xSnap = AtomicRead(x) -- read both rank and pointer atomically
ySnap = AtomicRead(y) -- this operation may be done using a CAS
if (xSnap.parent != x || ySnap.parent != y)
goto redo
-- Ensure x has lower rank (meaning y will be the new root)
if (xSnap.rank > ySnap.rank)
swap(xSnap, ySnap)
swap(x, y)
-- if same rank, use pointer value as a fallback sort
else if (xSnap.rank == ySnap.rank && x > y)
swap(xSnap, ySnap)
swap(x, y)
yNew = ySnap
yNew.rank = max(yNew.rank, xSnap.rank + 1)
xNew = xSnap
xNew.parent = y
if (!CAS(y, ySnap, yNew))
goto redo
if (!CAS(x, xSnap, xNew))
goto redo
return
Это должно быть безопасно в том смысле, что никогда не будет образовываться циклов и всегда будет приводить к правильному объединению. Мы можем подтвердить это, заметив, что:
- Во-первых, перед завершением один из двух корней всегда будет заканчиваться родителем, указывающим на другой. Следовательно, пока нет цикла, слияние выполняется успешно.
- Во-вторых, ранг всегда повышается. После сравнения порядка x и y мы знаем, что x имеет более низкий ранг, чем y во время снимка. Чтобы образовалась петля, другой поток должен сначала увеличить ранг x, а затем объединить x и y. Однако в CAS, который записывает родительский указатель x, мы проверяем, что ранг не изменился; следовательно, ранг y должен оставаться больше, чем x.
В случае одновременной мутации возможно ранг y может быть повышен, а затем вернуться к повторению из-за конфликта. Однако это означает, что либо y больше не является корнем (в этом случае ранг не имеет значения), либо ранг y был увеличен другим процессом (в этом случае второй обход не будет иметь никакого эффекта, и y будет иметь правильный ранг).
Следовательно, не должно быть никаких шансов образования петель, и этот неблокирующий алгоритм непересекающихся множеств должен быть безопасным.
А теперь к применению к вашей проблеме...
Предположения
Я делаю предположение, что сегменты хребта могут пересекаться только в своих конечных точках. Если это не так, вам нужно каким-то образом изменить фазу 1.
Я также делаю предположение, что сосуществование одного целочисленного пикселя достаточно для того, чтобы сегменты хребта могли быть соединены. Если нет, вам нужно будет изменить массив на этапе 1, чтобы он содержал несколько пар сегментов-кандидатов + непересекающийся набор, и отфильтровать, чтобы найти те, которые действительно связаны.
Структуры непересекающихся множеств, используемые в этом алгоритме, должны иметь в своих структурах ссылку на линейный сегмент. В случае слияния мы произвольно выбираем один из двух записанных сегментов для представления набора.
Фаза 1: Идентификация местной линии
Начнем с разделения карты на секторы, каждый из которых будет обрабатываться как отдельное задание. Несколько заданий могут обрабатываться в разных потоках, но каждое задание будет обрабатываться только одним потоком. Если сегмент гребня пересекает сектор, он разделяется на два сегмента, по одному на каждый сектор.
Для каждого сектора устанавливается массив, отображающий позицию пикселя в структуру непересекающегося множества. Большая часть этого массива будет позже отброшена, поэтому его требования к памяти не должны быть слишком обременительны.
Затем мы переходим к каждому сегменту линии в секторе. Сначала мы выбираем непересекающееся множество, представляющее всю прямую, частью которой является отрезок. Сначала мы просматриваем каждую конечную точку в массиве позиций пикселей, чтобы увидеть, не была ли уже назначена непересекающаяся структура множества. Если одна из конечных точек уже находится в этом массиве, мы используем назначенный непересекающийся набор. Если оба находятся в массиве, мы выполняем слияние непересекающихся наборов и используем новый корень в качестве нашего набора. В противном случае мы создаем новое непересекающееся множество и связываем со структурой непересекающегося множества ссылку на текущий отрезок прямой. Затем мы записываем обратно в массив позиций пикселей наш новый корень непересекающегося множества для каждой из наших конечных точек.
Этот процесс повторяется для каждого сегмента линии в секторе; к концу мы полностью отождествим все линии внутри сектора с помощью непересекающегося множества.
Обратите внимание, что, поскольку непересекающиеся наборы еще не распределяются между потоками, пока нет необходимости использовать операции сравнения и замены; просто используйте обычный однопоточный алгоритм объединения-слияния. Поскольку мы не освобождаем ни одну из структур непересекающихся наборов, пока алгоритм не завершится, выделение также может быть выполнено с помощью распределителя выпуклости для каждого потока, что делает выделение памяти (фактически) без блокировки и O (1).
Как только сектор полностью обработан, все данные в массиве позиций пикселей отбрасываются; однако данные, соответствующие пикселям на краю сектора, копируются в новый массив и сохраняются для следующего этапа.
Поскольку итерация по всему изображению — это O(x*y), а непересекающееся слияние эффективно — O(1), эта операция — O(x*y) и требует рабочей памяти O(m+2*x*y/k+ k^2) = O(x*y/k+k^2), где t — количество секторов, k — ширина сектора, а m — количество неполных отрезков в секторе (в зависимости от того, как часто линии пересекают границы, m может значительно варьироваться, но никогда не будет превышать количество отрезков линии). Память, переносимая на следующую операцию, равна O(m + 2*x*y/k) = O(x*y/k)
Фаза 2: Межсекторальные слияния
После обработки всех секторов мы переходим к объединению линий, пересекающих сектора. Для каждой границы между секторами мы выполняем операции слияния без блокировки на линиях, пересекающих границу (т. е. там, где соседние пиксели с каждой стороны границы назначены наборам линий).
Эта операция имеет время выполнения O(x+y) и потребляет O(1) памяти (однако мы должны сохранить память из фазы 1). По завершении массивы ребер могут быть отброшены.
Фаза 3: Сбор линий
Теперь мы выполняем многопоточную операцию сопоставления всех выделенных объектов структуры непересекающихся множеств. Сначала мы пропускаем любой объект, который не является корнем (т.е. где obj.parent != obj). Затем, начиная с репрезентативного сегмента линии, мы удаляемся оттуда и собираем и записываем любую информацию, необходимую для рассматриваемой линии. Мы уверены, что только один поток просматривает любую заданную строку в каждый момент времени, поскольку пересекающиеся строки оказались бы в одной и той же структуре непересекающихся множеств.
Это имеет время выполнения O(m) и использование памяти в зависимости от того, какую информацию вам нужно собрать об этих сегментах линии.
Резюме
В целом, этот алгоритм должен иметь время работы O(x*y) и использование памяти O(x*y/k + k^2). Корректировка k дает компромисс между временным использованием памяти в процессах фазы 1 и долговременным использованием памяти для массивов смежности и структур непересекающихся множеств, переносимых в фазу 2.
Обратите внимание, что я фактически не проверял производительность этого алгоритма в реальном мире; также возможно, что я упустил из виду проблемы параллелизма в приведенном выше алгоритме объединения-слияния непересекающихся множеств без блокировки. Комментарии приветствуются :)
person
bdonlan
schedule
30.01.2011