Как собрать массивы разной длины с помощью mpi4py

Желаемое поведение:

Я пытаюсь взять несколько списков разной длины на разных узлах, собрать их вместе в одном узле и сделать так, чтобы этот главный узел поместил их в набор. Этот список называется rout_array в каждом узле. Обратите внимание, что элементы в rout_array являются только целыми числами и не уникальны для узлов.

Ошибка:

Traceback (most recent call last):
  File "prout.py", line 160, in <module>
    main()
  File "prout.py", line 153, in main
    num = DetermineRoutingNumber(steps, goal, vertexSetSize)
  File "prout.py", line 129, in DetermineRoutingNumber
    comm.Gather(send_buffer, recv_buffer, root = 0)

  File "MPI\Comm.pyx", line 589, in mpi4py.MPI.Comm.Gather (c:\projects\mpi4py\src\mpi4py.MPI.c:97806)
  File "MPI\msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (c:\projects\mpi4py\src\mpi4py.MPI.c:34678)
  File "MPI\msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (c:\projects\mpi4py\src\mpi4py.MPI.c:33938)
  File "MPI\msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (c:\projects\mpi4py\src\mpi4py.MPI.c:30349)
  File "MPI\msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (c:\projects\mpi4py\src\mpi4py.MPI.c:29448)

  KeyError: 'O'

Я понятия не имею, как я получаю KeyError для 'O', когда в моем коде нет строк. Все списки содержат целые числа, массивы numpy содержат целые числа, а единственный активный здесь словарь содержит только целые числа для ключей. Следует отметить, что каждый узел выдает эту ошибку.

Код:

import numpy, math
from mpi4py import MPI
from sympy.combinatorics import Permutation as Perm     

def GetEdges(size,file):
    """This function takes in a file of edges in a graph in the form 'u,v'
    without quotes, where u and v are vertices of the graph. It then
    generates a permutation that swaps those vertices, and returns these
    transpositions."""

    edgeFile = open(file, "r")
    edges = []
    for line in edgeFile:
        line = line.strip()
        line = line.split(",")
        for vertex in line:
            line[line.index(vertex)] = int(vertex)
        edges.append(Perm([line], size = size))

    edgeFile.close()
    edges.append(Perm([[size - 1]], size = size))

    return edges


def AreDisjoint(p1,p2):
    """This function determines whether or not two permutations move any
    common elements, and returns the appropriate boolean."""
    v1 = set(p1.support())
    v2 = set(p2.support())

    return len(v1 & v2) == 0


def GetMatchings(edges, maxMatching, size):
    """This function takes in a set of edges given by GetEdges(), and 
    generates all possible matchings in the given graph. It then converts
    each matching into its rank given by lexicographical order, and appends
    that rank to a set, which is then returned."""

    stepDict = {1:set(edges)}
    steps = set(edges)
    for i in range(1,maxMatching):
        temp = set()
        for p1 in stepDict[1]:
            for p2 in stepDict[i]:
                newPerm = p1 * p2
                if AreDisjoint(p1,p2) and newPerm not in steps:
                    temp.add(newPerm)
                    steps.add(newPerm)

        stepDict[i+1] = temp

    newSteps = set()
    for step in steps:
        newSteps.add(step.rank())
    return newSteps


def FromRank(rank,level):
    """This function takes in a rank and size of a permutation, then returns
    the permutation that lies at the rank according to lexicographical 
    ordering. """

    lst = list(range(level + 1))
    perm = []
    while lst:
        fact = math.factorial(len(lst) - 1)
        index, rank = divmod(rank, fact)
        perm.append(lst.pop(index))
    assert rank == 0 
    return perm


def SplitArrayBetweenNodes(rank, rem, length):
    """This function takes in the rank of a node and any remainder after
    dividing up an array between all the nodes. It then returns a starting
    and ending partition index unique to each node."""
    if rem != 0:
        if rank in list(range(rem)):
            if rank == 0:
                part_start = 0
                part_end = length
            else:
                part_start = rank * (length + 1)
                part_end = part_start + length
        else:
            part_start = rank * length + rem
            part_end = part_start + length - 1
    else:
        part_start = rank * length
        part_end = part_start + length - 1

    return part_start, part_end


def DetermineRoutingNumber(steps, goal, vertexSetSize):
    """This function takes in the matchings created by GetMatchings(), 
    and calculates all possible products between its own elements. It then
    takes all unique products, and calculates all possible prducts between
    the matching set and the previous output. This repeats until all 
    permutations of a given type are found. The level at which this occurs
    is then returned."""

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    length = len(steps)
    rem = length % size
    part_len = length // size

    part_start, part_end = SplitArrayBetweenNodes(rank,rem, part_len)

    permDict = {1: steps}
    i = 1
    while True:
        rout_array = set()
        work_array = set(list(permDict[i])[part_start:part_end + 1])

        #Calculate all possible products    
        for p1 in permDict[1]:
            for p2 in work_array:
                p2_perm = Perm(FromRank(p2,vertexSetSize - 1))
                p1_perm = Perm(FromRank(p1,vertexSetSize - 1))
                new = p2_perm * p1_perm

                if new(0) == 0 or new(0) == 1:
                    order = new.rank()
                    rout_array.add(order)

        #All nodes send their work to master node
        comm.Barrier()

        send_buffer = numpy.array(rout_array)
        sendcounts = numpy.array(comm.gather(len(rout_array), root = 0))

        if rank == 0:
            recv_buffer = numpy.empty(sum(sendcounts), dtype = int)
        else:
            recv_buffer = None

        comm.Gatherv(sendbuf = send_buffer, recvbuf = (recv_buffer, sendcounts), root = 0) 

        #Generate input for next level of the loop, and weed out repeats.
        permDict[i+1] = rout_array
        for j in range(1,i+1):
            permDict[i+1] = permDict[i+1] - permDict[j]


def main():
    file = "EdgesQ2.txt"
    maxMatching = 2
    vertexSetSize = 4

    edges = GetEdges(vertexSetSize, file)
    steps = GetMatchings(edges, maxMatching, vertexSetSize)
    goal = 2 * math.factorial(vertexSetSize-1)

    num = DetermineRoutingNumber(steps, goal, vertexSetSize)
    print(num)


main()

Тестовые случаи:

EdgesQ2.txt:

Обратите внимание, что в этом примере maxMatching = 2 и vertexSetSize = 4. Вывод должен быть 3.

0,1
1,2
2,3
0,3

EdgesQ3.txt:

Обратите внимание, что в этом примере maxMatching = 4 и vertexSetSize = 8. Вывод должен быть 4.

0,1
0,3
0,4
1,2
1,5
2,3
2,6
3,7
4,5
4,7
5,6
6,7

person Santana Afton    schedule 23.06.2016    source источник
comment
Добро пожаловать в СО. Я взял на себя смелость ответить на вашу основную проблему, а не на KeyError. Имейте в виду, что, как правило, необходимо опубликовать минимально воспроизводимый пример, чтобы получить помощь по отладке. Ваш код близок, но rout_array, permDict, part_len отсутствуют, чтобы воспроизвести проблему.   -  person Zulan    schedule 24.06.2016


Ответы (1)


Если ваши длины различаются в разных процессах, вам нужно использовать вариант вектора Gatherv. С помощью этой функции вы предоставляете массив, содержащий различную длину (recvcounts).

К сожалению, документация mpi4py в настоящее время не описывает, как использовать Gatherv или любые другие варианты векторов. Вот простой пример:

#!/usr/bin/env python3

import numpy as np
from mpi4py import MPI
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
root = 0

local_array = [rank] * random.randint(2, 5)
print("rank: {}, local_array: {}".format(rank, local_array))

sendbuf = np.array(local_array)

# Collect local array sizes using the high-level mpi4py gather
sendcounts = np.array(comm.gather(len(sendbuf), root))

if rank == root:
    print("sendcounts: {}, total: {}".format(sendcounts, sum(sendcounts)))
    recvbuf = np.empty(sum(sendcounts), dtype=int)
else:
    recvbuf = None

comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=root)
if rank == root:
    print("Gathered array: {}".format(recvbuf))

Как видите, mpi4py не принимает количество отправленных или полученных сообщений в качестве дополнительных параметров, а принимает их как кортеж/список из параметра recvbuf. Если вы передадите (recvbuf, sendcounts), он получит тип из recvbuf. Смещения/смещения будут сделаны таким образом, что данные всех рангов будут храниться непрерывно и упорядочены по рангам.

По сути, mpi4py угадывает, что вы могли иметь в виду с различными формами параметра recvbuf. Полная и недвусмысленная форма (buffer, counts, displacements, type).

Изменить относительно KeyError:

Довольно запутанное имя rout_array — это set, которое не является допустимым входом для numpy.array. set не является ни последовательностью, ни интерфейсом массива. К сожалению, вместо сбоя numpy.array создает очень странный объект ndarray без размеров. Вы можете обернуть создание массива в список:

send_buffer = numpy.array(list(rout_array))

Коллектив работает, но цикл не завершается, что неудивительно, учитывая, что в цикле while true в цикле DetermineRoutingNumber нет ни return, ни break.

person Zulan    schedule 24.06.2016
comment
Я пошел дальше и внедрил .Gatherv() в свой код, и сначала получил ошибку len() of unsized object, которая указывала на len(sendbuf). К сожалению, после изменения этого на len(rout_array) теперь я получаю KeyError: 'O'. - person Santana Afton; 24.06.2016
comment
Хорошо, я максимально урезал код, чтобы дать рабочий пример, и добавил два тестовых случая. Позвольте мне знать, если вам нужно что-нибудь еще. - person Santana Afton; 24.06.2016