В то время как MPI имеет гарантию порядка в том смысле, что два сообщения с рангом 1 до ранга 0 будут получены рангом 0 в том порядке, в котором они будут отправлены, — одно сообщение не может догнать другое — MPI ничего не говорит и не может ничего сказать о как они будут чередоваться с другими сообщениями от других процессоров. Таким образом, вы можете легко получить такие ситуации, как:
rank 1 messages to rank 0: [src 1, msg A, tag 1], [src 1, msg B, tag 2]
rank 2 messages to rank 0: [src 2, msg C, tag 1], [src 2, msg D, tag 2]
rank 0 message queue: [src 1, msg A, tag 1], [src 2, msg C, tag 1], [src 2, msg D, tag 2], [src 1, msg B, tag 2]
Таким образом, ранг 0, извлекающий сообщение с тегом 1, получит сообщение A ранга 1, но затем с тегом 2 получит сообщение D ранга 2. (Обратите внимание, что приведенная выше очередь сообщений удовлетворяет приведенной выше гарантии порядка, но не помогает нам здесь. ).
Есть несколько способов обойти это. Один из них — фильтровать сообщения, полученные для newdeltaw
, не только по тегу, но и по источнику, чтобы убедиться, что это та же задача, которая отправила neww
:
if rank == 0:
cb = numpy.zeros(size)
rstat = MPI.Status()
for i in range((size-1)*3):
neww = comm.recv(source=MPI.ANY_SOURCE, tag=1, status=rstat)
src = rstat.Get_source()
newdeltaw = comm.recv(source=src, tag=2)
print "newdelw is",newdeltaw,"neww is",neww
cb[neww]=cb[neww]+newdeltaw
print "cb=",cb
else:
data = rank
for i in range(3):
comm.send(rank,dest=0,tag=1)
comm.send(data,dest=0,tag=2)
Таким образом, будет получено только сообщение newdeltaw тега 2 из соответствующего источника, что позволит избежать несогласованности.
Другой подход заключается в том, чтобы вообще не разделять сообщения, помещая обе части данных в одно и то же сообщение:
if rank == 0:
cb = numpy.zeros(size)
rstat = MPI.Status()
for i in range((size-1)*3):
(neww,newdeltaw) = comm.recv(source=MPI.ANY_SOURCE, tag=1)
print "newdelw is",newdeltaw,"neww is",neww
cb[neww]=cb[neww]+newdeltaw
print "cb=",cb
else:
data = rank
for i in range(3):
comm.send((rank,data),dest=0,tag=1)
Это объединяет обе части данных в одно сообщение, поэтому их нельзя разделить. (Обратите внимание, что как только это заработает, вы можете использовать более эффективные подпрограммы mpi4py более низкого уровня, чтобы избежать сериализации кортежей:
if rank == 0:
cb = numpy.zeros(size)
rstat = MPI.Status()
for i in range((size-1)*3):
dataarr = numpy.zeros(2,dtype='i')
comm.Recv([dataarr,MPI.INT],source=MPI.ANY_SOURCE, tag=1)
newdeltaw = dataarr[0]
neww = dataarr[1]
print "newdelw is",newdeltaw,"neww is",neww
cb[neww]=cb[neww]+newdeltaw
print "cb=",cb
else:
data = rank
for i in range(3):
senddata = numpy.array([rank,data],dtype='i')
comm.Send([senddata, MPI.INT],dest=0,tag=1)
Наконец, вы можете полностью избежать подхода «главный/подчиненный» и заставить все процессоры работать над своими частичными результатами в задаче, а затем объединить все результаты в конце с помощью операции сокращения:
cb = numpy.zeros(size,dtype='i')
totals = numpy.zeros(size,dtype='i')
data = rank
for i in range(3):
cb[rank] = cb[rank] + data
comm.Reduce([cb,MPI.INT], [totals,MPI.INT], op=MPI.SUM, root=0)
if rank == 0:
print "result is", totals
person
Jonathan Dursi
schedule
15.05.2014
cb[neww]=cb[neww]+newdeltaw
данные дляneww
иnewdeltaw
поступают из другого процесса. Код дляsend
находится в основной программе, а не вif conditional
, как данный код - person user567879   schedule 14.05.2014newdelta
получено от другого процесса? Может ли случиться такой случай? - person user567879   schedule 14.05.2014Get_source
, чтобы узнать источник, из которого вы только что получили, и получитеnewdeltaw
из этого источника вместоANY_SOURCE
. В качестве альтернативы просто упакуйте neww и newdeltaw в одно сообщение. Ни один из них не включает блокировку. - person Jonathan Dursi   schedule 14.05.2014