R parLapply не параллельно

В настоящее время я разрабатываю пакет R, который будет использовать параллельные вычисления для решения некоторых задач с помощью «параллельного» пакета.

Я получаю действительно неудобное поведение при использовании кластеров, определенных внутри функций моего пакета, где функция parLapply назначает задание работнику и ожидает его завершения, чтобы назначить задание следующему исполнителю. Или, по крайней мере, это то, что кажется происходящим при просмотре файла журнала «cluster.log» и списка запущенных процессов в оболочке unix.

Ниже представлен макет исходной функции, объявленной внутри моего пакета:

.parSolver <- function( varMatrix, var1 ) {

    no_cores <- detectCores()

    #Rows in varMatrix
    rows <- 1:nrow(varMatrix[,])

    # Split rows in n parts
    n <- no_cores
    parts <- split(rows, cut(rows, n))

    # Initiate cluster
    cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
    clusterEvalQ(cl, library(raster))
    clusterExport(cl, "varMatrix", envir=environment())
    clusterExport(cl, "var1", envir=environment())


    rParts <- parLapply(cl = cl, X = 1:n, fun = function(x){
        part <- rasterize(varMatrix[parts[[x]],], raster(var1), .....)
        print(x)
        return(part)
        })

    do.call(merge, rParts)
}

ПРИМЕЧАНИЯ:

  • Я использую makePSOCKcluster, потому что хочу, чтобы код запускался как в системах Windows, так и в системах Unix, хотя эта конкретная проблема проявляется только в системе unix.
  • Функции растеризации и растра определены в библиотеке (растре), экспортируемой в кластер.

Для меня странно то, что если я выполняю один и тот же код функции parSolver в глобальной среде, все работает плавно, все рабочие берут на себя одну работу одновременно, и задача выполняется в кратчайшие сроки. Однако, если я сделаю что-то вроде:

library(myPackage)

varMatrix <- (...)
var1 <- (...)
result <- parSolver(varMatrix, var1)

появляется описанная проблема.

Это, похоже, проблема с балансировкой нагрузки, однако это не объясняет, почему она работает нормально в одной ситуации, а не в другой.

Я что-то упустил? Заранее спасибо.


person 1cgmr    schedule 07.04.2017    source источник
comment
Обычно, когда я использую параллельный кластер в функции, я использую его для объявления кластера в глобальной среде. И после этого у меня никогда не было проблем с использованием parLapply. Не могу сказать, что это решит вашу проблему, но вы можете попробовать. Инициируйте свой кластер вне функции и добавьте параметр cl в свою parSolver(varMatrix, var1, cl) функцию и используйте n <- length(cl) внутри своей функции.   -  person Sébastien Rochette    schedule 07.04.2017
comment
Я уже пробовал этот подход, но, к сожалению, проблема остается ...   -  person 1cgmr    schedule 07.04.2017


Ответы (1)


Я не думаю, что parLapply выполняется последовательно. Скорее всего, он просто работает неэффективно, из-за чего кажется, что он работает последовательно.

У меня есть несколько предложений по его улучшению:

  • Не определяйте рабочую функцию внутри parSolver
  • Не экспортируйте все varMatrix каждому рабочему
  • Создайте кластер вне parSolver

Первый момент важен, потому что в вашем примере все переменные, определенные в parSolver, будут сериализованы вместе с анонимной функцией worker и отправлены parLapply рабочим. Определив рабочую функцию вне какой-либо функции, сериализация не будет захватывать нежелательные переменные.

Второй пункт позволяет избежать ненужного ввода-вывода сокетов и использовать меньше памяти, что делает код более масштабируемым.

Вот фальшивый, но самодостаточный пример, похожий на ваш, демонстрирующий мои предложения:

# Define worker function outside of any function to avoid
# serialization problems (such as unexpected variable capture)
workerfn <- function(mat, var1) {
    library(raster)
    mat * var1
}

parSolver <- function(cl, varMatrix, var1) {
    parts <- splitIndices(nrow(varMatrix), length(cl))
    varMatrixParts <- lapply(parts, function(i) varMatrix[i,,drop=FALSE])
    rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
    do.call(rbind, rParts)
}

library(parallel)
cl <- makePSOCKcluster(3)
r <- parSolver(cl, matrix(1:20, 10, 2), 2)
print(r)

Обратите внимание, что при этом используется функция clusterApply для итерации по списку блоков строк из varMatrix, так что всю матрицу не нужно отправлять всем. Он также избегает вызовов clusterEvalQ и clusterExport, упрощая код и делая его немного более эффективным.

person Steve Weston    schedule 01.08.2017