Как я могу запустить простую операцию параллельного присваивания массива в Julia?

Мне приходится решать систему дифференциальных уравнений много раз, перебирая параметр. Для этого я запускаю цикл по списку параметров и сохраняю решение (оцененное в массиве значений времени) для каждого параметра. Итак, у меня есть 2D-массив, в котором я храню решения (каждая строка предназначена для значения параметра).

Теперь, поскольку одна итерация не имеет ничего общего с другой, я подумал сделать это параллельно.

Вот мой код:

using DifferentialEquations
using SharedArrays
using DelimitedFiles
using Distributed

function tf(x,w)
    return x*sin(w*x)
end

function sys!(dv,v,w,t)
    dv[1] = w*v[1]
    dv[2] = tf(v[1],w)
end

times = LinRange(0.1,2,25)

params = LinRange(0.1,1.2,100)

sols = SharedArray{Float64,2}((length(times),length(params)))

@distributed for i=1:length(params)
    println(i)
    init_val = [1.0,1.0]
    tspan = (0.0,2.0)
    prob = ODEProblem(sys!,init_val,tspan,params[i])
    sol = solve(prob)
    sols[:,i] .= sol(times)[2,:]
end

writedlm("output.txt",sols)

Теперь, когда я запускаю это без префикса @distributed к циклу, он работает отлично.

Однако, когда я запускаю этот код, оператор println не работает, и хотя файл output.txt сохраняется, он полон нулей.

Я запускаю этот код из командной строки таким образом

julia -p 4 trycode.jl

Это не показывает никаких результатов, просто работает в течение минуты и ничего не делает, хотя файл "output.txt" сохраняется. Как будто в цикл никогда не заходят.

Я был бы очень признателен за помощь в настройке этого простого параллельного цикла.


comment
Какую версию Юлии вы используете?   -  person Oscar Smith    schedule 24.04.2020
comment
@OscarSmith 1.3.1   -  person hg153    schedule 24.04.2020


Ответы (2)


Как говорит Билл, есть два основных способа думать о параллелизме в Julia: поточная модель, которая была представлена ​​в Julia 1.3 и выполняет параллелизм с разделяемой памятью с помощью макроса Threads.@threads, и распределенная обработка с использованием макроса Distributed.@distributed, который распараллеливает различные процессы Julia.

Threads определенно ближе к "автоматическому" ускорению параллелизма с минимальной перезаписью кода или без нее и часто является отличным вариантом, хотя нужно позаботиться о том, чтобы любая выполняемая операция была потокобезопасной, поэтому всегда проверяйте, что результаты такие же.

Поскольку изначально ваш вопрос касался @distributed параллелизма, позвольте мне ответить и на него. Если вы используете @distributed параллелизм, простейшая ментальная модель (я считаю) для размышления о том, что происходит, - это представить, что вы запускаете свой код в полностью отдельных репликах Julia.

Вот версия вашего кода, адаптированная к модели @distributed:

using Distributed
addprocs(2)

using SharedArrays
using DelimitedFiles

@everywhere begin 
    using DifferentialEquations

    tf(x,w) = x*sin(w*x)

    function sys!(dv,v,w,t)
        dv[1] = w*v[1]
        dv[2] = tf(v[1],w)
    end

    times = LinRange(0.1,2,25)
    params = LinRange(0.1,1.2,100)
end

sols = SharedArray{Float64,2}((length(times),length(params)))

@sync @distributed for i=1:length(params)
    println(i)
    init_val = [1.0,1.0]
    tspan = (0.0,2.0)
    prob = ODEProblem(sys!,init_val,tspan,params[i])
    sol = solve(prob)
    sols[:,i] .= sol(times)[2,:]
end

sols

Что изменилось?

  • Я добавил addprocs(2) в начале скрипта. В этом нет необходимости, если вы запускаете Julia с p -2 (или с любым другим количеством процессов, которое хотите), но мне часто легче рассуждать о коде, когда он явно устанавливает параллельную среду в коде напрямую. Обратите внимание, что в настоящее время это невозможно для потоков, т.е. вам нужно установить переменную среды JULIA_NUM_THREADS перед запуском Julia, и вы не можете изменить количество потоков после запуска.

  • Затем я переместил биты кода в блок @everywhere begin ... end. По сути, это запускает операции, заключенные в блоке, для всех процессов одновременно. Возвращаясь к ментальной модели запуска отдельных экземпляров Julia, вы должны посмотреть, что находится в вашем @distributed цикле, и убедиться, что все функции и переменные действительно определены для всех процессов. Так, например, чтобы убедиться, что каждый процесс знает, что такое ODEProblem, вам нужно сделать using DifferentialEquations для всех из них.

  • Наконец, я добавил @sync в распределенный цикл. На это есть ссылка в документации для @distributed. Запуск макроса @distributed с for циклом порождает дескриптор асинхронного зеленого потока (Task) для распределенного выполнения и переходит к следующей строке. Поскольку вы хотите дождаться завершения выполнения, требуется @sync синхронизация. Проблема с вашим исходным кодом заключается в том, что, не дожидаясь завершения зеленого потока (синхронизации), он проглатывает ошибки и сразу же возвращается, поэтому ваш массив sol пуст. Вы можете увидеть это, если запустите свой исходный код и добавите только @sync - тогда вы получите TaskFailedException: on worker 2 - UndefVarError: #sys! not defined, который сообщает вам, что ваши рабочие процессы не знают о функциях, которые вы определили в главном процессе. На практике вам почти всегда нужно @sync выполнение, если только вы не планируете запускать много таких распределенных циклов параллельно. Вам также не нужно ключевое слово @sync, если вы используете функцию агрегатора в распределенном цикле (форма @distributed (func) for i in 1:1000 цикла).

Какое здесь лучшее решение? Ответ: не знаю. @threads - отличный вариант для быстрого распараллеливания поточно-безопасных операций без переписывания кода, он все еще активно разрабатывается и совершенствуется, так что в будущем он, вероятно, станет еще лучше. В стандартной распределенной библиотеке также есть pmap, который дает у вас есть дополнительные варианты, но этот ответ и так достаточно длинный! По моему личному опыту, ничто не заменяет (1) размышление о вашей проблеме и (2) выполнение тестов. Вещи, о которых вы хотите подумать, - это время выполнения вашей проблемы (как общее, так и для каждой отдельной операции, которую вы хотите распространить) и требования к передаче сообщений / доступу к памяти.

Положительным моментом является то, что, хотя вам, возможно, придется потратить немного усилий на обдумывание вещей, у Джулии есть множество отличных вариантов, чтобы максимально использовать любую аппаратную ситуацию с паршивым старым ноутбуком с двумя ядрами (например, тот, который я печатаю. this from) в многоузловые сверхвысокопроизводительные кластеры (что сделало Julia одним из немногих языков программирования, использующим достичь производительности в петафлопе - хотя, честно говоря, это немного сложнее, чем мой ответ или ответ Билла :))

person Nils Gudat    schedule 24.04.2020
comment
Спасибо! Это сработало отлично. Ваш ответ действительно помог понять, что здесь происходит. У меня есть дополнительный вопрос: что касается моей реальной проблемы, я знаю, что последние 50 значений параметра имеют большое время вычисления, и именно они удерживают код. Есть ли способ элегантно указать, как именно процессы разделяют элементы цикла между собой? т.е. для 4 процессов процесс 1 должен получить i = 1,5,9 ... и так далее. Таким образом, нагрузка равномерно распределяется между 4 процессами. Есть ли способ сделать это программно? - person hg153; 25.04.2020
comment
Так что я не совсем уверен, но я думаю, что @sync @distributed уже делает это - т.е. любому процессору выделяется следующая задача всякий раз, когда они выполняются, и если одна занимает больше времени, чем другая, она должна получить несколько. Попробуйте следующее: using Distributed; addprocs(2); @sync @distributed for s in [1,2,3,4,10,1,1,1]; println("Waiting for $s seconds"); sleep(s); end - это должно дать вам представление. Также посмотрите pmap документы, они принимают batch_size аргумент, который дает вам больше контроля над разделением вычислений, я думаю. - person Nils Gudat; 25.04.2020
comment
К сожалению, @distributed распределяет все задачи с одинаковым количеством вычислений. Следовательно, если у вас несбалансированное время вычислений, это может быть не лучшим вариантом (если у вас нет большого количества рабочих мест и у вас есть хорошие шансы, что это уравновесится). - person Przemyslaw Szufel; 25.04.2020
comment
Что касается того, что лучше Thread или @distributed - на практике потоки Julia плохо масштабируются, когда их слишком много. Следовательно, если вы используете 2 или 4 ядра, у вас все в порядке с обоими потоками и @distributed, если это 64 ядра, вам определенно нужно @distributed - person Przemyslaw Szufel; 25.04.2020

Можете ли вы получить выгоду от использования потоков для @distributed for? Это работает (Юля 1.4):

using DifferentialEquations
using SharedArrays
using DelimitedFiles
using Distributed

function tf(x,w)
    return x*sin(w*x)
end

function sys!(dv,v,w,t)
    dv[1] = w*v[1]
    dv[2] = tf(v[1],w)
end

times = LinRange(0.1,2,25)

params = LinRange(0.1,1.2,100)

sols = SharedArray{Float64,2}((length(times),length(params)))

@Threads.threads for i=1:length(params)
    println(i)
    init_val = [1.0,1.0]
    tspan = (0.0,2.0)
    prob = ODEProblem(sys!,init_val,tspan,params[i])
    sol = solve(prob)
    sols[:,i] .= sol(times)[2,:]
end

writedlm("output.txt",sols)
person Bill    schedule 24.04.2020
comment
Спасибо! Это хорошо работает. Думаю, у меня есть дополнительный вопрос. Есть ли способ сообщить потокам, как распределять цикл? Насколько я понимаю, это разбивает цикл на равные части, но в моей реальной проблеме большие значения параметров (все хранящиеся в конце массива) занимают больше времени, поэтому я бы хотел, чтобы распределение было разделено равномерно, то есть с 4 процессами, первый поток должен получить элементы {1,5,9} и так далее. Есть ли способ сделать это? - person hg153; 24.04.2020
comment
Кроме того, когда я запускаю это из командной строки, он использует только один поток, несмотря на то, что я добавляю addprocs (4) в код и использую julia -p 4 во время выполнения. - person hg153; 24.04.2020
comment
Возможно, вам потребуется установить количество потоков для ЦП с помощью параметра JULIA_NUM_THREADS в среде перед запуском Julia. - person Bill; 24.04.2020
comment
@ hg153 То, что вы просите, обычно называется балансировкой нагрузки, и, как вы заметили, Threads.@threads этого не делает. Вместо Threads.@threads вы можете использовать Threads.@spawn индивидуально в цикле, чтобы запускать вычисления в разных потоках. Это уравновесит нагрузку. - person crstnbr; 24.04.2020