Я использую gpars для параллельной обработки 250-мегапиксельной таблицы базы данных MySQL. Я создаю 8 потоков gpars, 8 независимых подключений к базе данных и разделяю данные таким образом, чтобы каждый поток работал независимо с разными диапазонами строк... что-то вроде дешевой концепции MapReduce. В основе логика такая:
withExistingPool(pool)
{
connection_array.collectParallel()
{
// Figure out which connection this thread can use.
// We use the index into the array to figure out
// which thread we are, and this tells us where to
// read data.
int i
for (i = 0; i < connection_array.size(); i++)
if (it == connection_array[i])
break
// Each thread runs the same query, with LIMIT controlling
// the position of rows it will read...if we have 8 threads
// reading 40000 rows per call to this routine, each thread
// reads 5000 rows (thread-0 reads rows 0-4999, thread-1 reads
// 5000-9999 and so forth).
def startrow = lastrow + (i * MAX_ROWS)
def rows = it.rows( "SELECT * ... LIMIT ($startrow, $MAX_ROWS)")
// Add our rows to the result set we will return to the caller
// (needs to be serialized since many threads can be here)
lock.lock()
if (!result)
result = rows
else
result += rows
lock.unlock()
}
}
Сначала код работает отлично, давая мне более 10 000 строк в секунду при запуске. Но после нескольких миллионов строк он начинает тормозить. К тому времени, когда мы получаем 25 миллионов строк, вместо 10 000 строк в секунду мы получаем только 1000 строк в секунду. Если мы завершим приложение и перезапустим его с того места, где мы остановились, оно снова вернется к 10 000 строк в секунду на некоторое время, но оно всегда замедляется по мере продолжения обработки.
Доступной вычислительной мощности достаточно — это 8-процессорная система, а база данных находится в сети, так что время ожидания остается приличным, несмотря ни на что. Во время работы процессоры обычно загружены не более чем на 25-30%. Утечек памяти также не наблюдается — мы отслеживаем статистику памяти и не видим никаких изменений во время обработки. Сервер MySQL, похоже, не испытывает стресса (первоначально он загружен примерно на 30%, уменьшаясь по мере замедления работы приложения).
Есть ли какие-нибудь хитрости, которые помогут таким вещам работать более последовательно с большим количеством итераций?
(0..<connection_array.size()).collectParallel
вместо вашего массива, тогда вам не нужно будет искать текущий индекс - person tim_yates   schedule 07.04.2016results = connection_array.collectParallel()
и избавьтесь от своего последнего блока if... Или попробуйте сохранить его и изменить наeachParallel
- person tim_yates   schedule 07.04.2016