Обеспечение того, чтобы два сообщения исходили от одной и той же задачи MPI.

Я пишу программирование MPI, используя python (mpi4py). Многие процессы вычисляют частичные результаты и отправляют как индекс, так и обновление главной задаче. Мой код, который собирает все данные, задается как

if rank == 0:
    cb = dict((v,0) for v in graph)
    #print "initial is",cb
    while True: 
        neww = comm.recv(source=ANY_SOURCE, tag=1) 
        newdeltaw = comm.recv(source=ANY_SOURCE, tag=2)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb

Но здесь есть состояние гонки, которое влияет на мои результаты для большого количества процессоров - у меня может быть ситуация, в которой cb[neww]=cb[neww]+newdeltaw данные для news и newdeltaw поступают из разных процессов. Как предотвратить это?


person user567879    schedule 14.05.2014    source источник
comment
Непонятно, что вы здесь спрашиваете. Что касается больших входных данных, как ваша программа не работает? Что заставляет вас думать, что вам нужно поставить замок на что-то?   -  person Jonathan Dursi    schedule 14.05.2014
comment
Есть ли у меня ситуация, в которой cb[neww]=cb[neww]+newdeltaw данные для neww и newdeltaw поступают из другого процесса. Код для send находится в основной программе, а не в if conditional, как данный код   -  person user567879    schedule 14.05.2014
comment
Я до сих пор не понимаю, зачем нужен замок. Ранг 0 получит neww от какого-то процессора, а затем newdeltaw от какого-то (возможно, другого) процессора и соответствующим образом обновит cb[neww]. Здесь ничего не нужно запирать. Чего вы пытаетесь достичь, и что не работает? Пожалуйста, предоставьте простой фрагмент кода, который воспроизводит проблему, с которой вы столкнулись.   -  person Jonathan Dursi    schedule 14.05.2014
comment
Что происходит, когда 'neww' получено от одного процесса, а newdelta получено от другого процесса? Может ли случиться такой случай?   -  person user567879    schedule 14.05.2014
comment
Да, это может случиться. Если вы не хотите, чтобы это произошло, передайте объект статуса для comm.recv, используйте Get_source, чтобы узнать источник, из которого вы только что получили, и получите newdeltaw из этого источника вместо ANY_SOURCE. В качестве альтернативы просто упакуйте neww и newdeltaw в одно сообщение. Ни один из них не включает блокировку.   -  person Jonathan Dursi    schedule 14.05.2014
comment
Попробовал второе предложение. Работал отлично. Спас мой день. :)   -  person user567879    schedule 15.05.2014


Ответы (1)


В то время как MPI имеет гарантию порядка в том смысле, что два сообщения с рангом 1 до ранга 0 будут получены рангом 0 в том порядке, в котором они будут отправлены, — одно сообщение не может догнать другое — MPI ничего не говорит и не может ничего сказать о как они будут чередоваться с другими сообщениями от других процессоров. Таким образом, вы можете легко получить такие ситуации, как:

  rank 1 messages to rank 0: [src 1, msg A, tag 1], [src 1, msg B, tag 2]  
  rank 2 messages to rank 0: [src 2, msg C, tag 1], [src 2, msg D, tag 2]

  rank 0 message queue: [src 1, msg A, tag 1], [src 2, msg C, tag 1], [src 2, msg D, tag 2], [src 1, msg B, tag 2] 

Таким образом, ранг 0, извлекающий сообщение с тегом 1, получит сообщение A ранга 1, но затем с тегом 2 получит сообщение D ранга 2. (Обратите внимание, что приведенная выше очередь сообщений удовлетворяет приведенной выше гарантии порядка, но не помогает нам здесь. ).

Есть несколько способов обойти это. Один из них — фильтровать сообщения, полученные для newdeltaw, не только по тегу, но и по источнику, чтобы убедиться, что это та же задача, которая отправила neww:

if rank == 0:
    cb = numpy.zeros(size)
    rstat = MPI.Status()
    for i in range((size-1)*3):
        neww = comm.recv(source=MPI.ANY_SOURCE, tag=1, status=rstat)
        src = rstat.Get_source()
        newdeltaw = comm.recv(source=src, tag=2)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb
else:
    data = rank
    for i in range(3):
        comm.send(rank,dest=0,tag=1)
        comm.send(data,dest=0,tag=2)

Таким образом, будет получено только сообщение newdeltaw тега 2 из соответствующего источника, что позволит избежать несогласованности.

Другой подход заключается в том, чтобы вообще не разделять сообщения, помещая обе части данных в одно и то же сообщение:

if rank == 0:
    cb = numpy.zeros(size)
    rstat = MPI.Status()
    for i in range((size-1)*3):
        (neww,newdeltaw) = comm.recv(source=MPI.ANY_SOURCE, tag=1)
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb

else:
    data = rank
    for i in range(3):
        comm.send((rank,data),dest=0,tag=1)

Это объединяет обе части данных в одно сообщение, поэтому их нельзя разделить. (Обратите внимание, что как только это заработает, вы можете использовать более эффективные подпрограммы mpi4py более низкого уровня, чтобы избежать сериализации кортежей:

if rank == 0:
    cb = numpy.zeros(size)
    rstat = MPI.Status()
    for i in range((size-1)*3):
        dataarr = numpy.zeros(2,dtype='i')
        comm.Recv([dataarr,MPI.INT],source=MPI.ANY_SOURCE, tag=1)
        newdeltaw = dataarr[0]
        neww = dataarr[1]
        print "newdelw is",newdeltaw,"neww is",neww
        cb[neww]=cb[neww]+newdeltaw
        print "cb=",cb

else:
    data = rank
    for i in range(3):
        senddata = numpy.array([rank,data],dtype='i')
        comm.Send([senddata, MPI.INT],dest=0,tag=1)

Наконец, вы можете полностью избежать подхода «главный/подчиненный» и заставить все процессоры работать над своими частичными результатами в задаче, а затем объединить все результаты в конце с помощью операции сокращения:

cb = numpy.zeros(size,dtype='i')
totals = numpy.zeros(size,dtype='i')

data = rank
for i in range(3):
    cb[rank] = cb[rank] + data

comm.Reduce([cb,MPI.INT], [totals,MPI.INT], op=MPI.SUM, root=0)

if rank == 0:
    print "result is", totals
person Jonathan Dursi    schedule 15.05.2014