Могу ли я связать два сообщения MPI?

Я пытаюсь установить общение "все-к-одному" вне очереди. В основном у меня есть несколько массивов с плавающей запятой одинакового размера, идентифицируемых по целочисленному идентификатору.

Каждое сообщение должно выглядеть так:

<int id><float array data>

На стороне получателя он точно знает, сколько существует массивов, и, таким образом, устанавливает точное количество recv. Получив сообщение, он анализирует идентификатор и помещает данные в нужное место. Проблема в том, что сообщение может быть отправлено из любых других процессов принимающему процессу. (например, производители имеют структуру очереди работ и обрабатывают любой идентификатор, доступный в очереди.)

Поскольку MPI гарантирует только P2P при доставке заказа, я не могу просто поместить целочисленный идентификатор и данные FP в два сообщения, иначе получатель не сможет сопоставить идентификатор с данными. MPI также не позволяет передавать два типа данных за одну отправку.

Я могу думать только о двух подходах.

1) Получатель имеет массив размера m (источник [m]), где m - количество отправляющих узлов. Отправитель сначала отправляет идентификатор, затем данные. Получатель сохраняет идентификатор в источник [i] после получения целочисленного сообщения от отправителя i. Получив массив FP от отправителя i, он проверяет источник [i], получает идентификатор и перемещает данные в нужное место. Это работает, потому что MPI гарантирует упорядоченную связь P2P. Это требует, чтобы получатель сохранял информацию о состоянии каждого отправителя. Что еще хуже, если один процесс отправки может иметь два идентификатора, отправленных перед данными (например, многопоточность), этот механизм не будет работать.

2) Обработайте id и FP как байты и скопируйте их в буфер отправки. Отправьте их как MPI_CHAR, а получатель вернет их в целое число и массив FP. Затем мне нужно оплатить дополнительную стоимость копирования вещей в байтовый буфер на стороне отправителя. Общий временный буфер также растет по мере увеличения числа потоков в процессе MPI.

Ни один из них не является идеальным решением. Я не хочу ничего блокировать внутри процесса. Интересно, есть ли у кого-нибудь из вас лучшие предложения.

Изменить: код будет запущен в общем кластере с infiniband. Машины будут распределены случайным образом. Поэтому я не думаю, что сокеты TCP смогут мне здесь помочь. К тому же IPoIB выглядит дорого. Мне нужна полная скорость 40 Гбит / с для связи, и процессор должен выполнять вычисления.


person wujj123456    schedule 03.10.2012    source источник


Ответы (2)


Как кто-то уже писал, вы можете использовать MPI_ANY_SOURCE для получения из любого источника. Чтобы отправить два разных типа данных за одну отправку, вы можете использовать производный тип данных:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define asize 10

typedef struct data_ {
  int   id;
  float array[asize];
} data;

int main() {

  MPI_Init(NULL,NULL);

  int rank = -1;
  int size = -1;
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  MPI_Comm_size(MPI_COMM_WORLD,&size);

  data buffer;    
 // Define and commit a new datatype
  int          blocklength [2];
  MPI_Aint     displacement[2];
  MPI_Datatype datatypes   [2];
  MPI_Datatype mpi_tdata;

  MPI_Aint     startid,startarray;
  MPI_Get_address(&(buffer.id),&startid);
  MPI_Get_address(&(buffer.array[0]),&startarray);

  blocklength [0] = 1;
  blocklength [1] = asize;
  displacement[0] = 0;
  displacement[1] = startarray - startid;
  datatypes   [0] = MPI_INT;
  datatypes   [1] = MPI_FLOAT;

  MPI_Type_create_struct(2,blocklength,displacement,datatypes,&mpi_tdata);
  MPI_Type_commit(&mpi_tdata);

  if (rank == 0) {
    int        count = 0;
    MPI_Status status;

    while (count < size-1 ) {
      // Non-blocking receive
      printf("Receiving message %d\n",count);
      MPI_Recv(&buffer,1,mpi_tdata,MPI_ANY_SOURCE,0,MPI_COMM_WORLD,&status);
      printf("Message tag %d, first entry %g\n",buffer.id,buffer.array[0]);
      // Counting the received messages 
      count++;
    }

  } else {
    // Initialize buffer to be sent
    buffer.id = rank;
    for (int ii = 0; ii < size; ii++) {
      buffer.array[ii] = 10*rank + ii;
    }
    // Send buffer
    MPI_Send(&buffer,1,mpi_tdata,0,0,MPI_COMM_WORLD);
  }

  MPI_Type_free(&mpi_tdata);

  MPI_Finalize();
  return 0;
}
person Massimiliano    schedule 03.10.2012
comment
Я согласен с тем, что использование MPI_ANY_SOURCE и тега должно работать, но оно не ведет себя так, как указано в моем коде. На этот раз я попробую использовать свой собственный тип данных. Спасибо! - person wujj123456; 04.10.2012
comment
То, что вы предлагаете, крайне непереносимо и не будет работать в гетерогенных средах. Переносимым решением было бы зарегистрировать тип структуры MPI с двумя полями, одно из которых имеет тип MPI_INT и длину блока 1, а второе - тип MPI_FLOAT и длину блока asize. - person Hristo Iliev; 04.10.2012

Вы можете указать MPI_ANY_SOURCE в качестве ранга источника в функции приема, а затем отсортировать сообщения по их тегам, что проще, чем создание собственных сообщений. Вот упрощенный пример:

#include <stdio.h>
#include "mpi.h"

int main() {
    MPI_Init(NULL,NULL);
    int rank=0;
    int size=1;
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);

    // Receiver is the last node for simplicity in the arrays
    if (rank == size-1) {
        // Receiver has size-1 slots
        float data[size-1];
        MPI_Request request[size-1];

        // Use tags to sort receives
        for (int tag=0;tag<size-1;++tag){
            printf("Receiver for id %d\n",tag);
            // Non-blocking receive
            MPI_Irecv(data+tag,1,MPI_FLOAT,
                      MPI_ANY_SOURCE,tag,MPI_COMM_WORLD,&request[tag]);
        }

        // Wait for all requests to complete
        printf("Waiting...\n");
        MPI_Waitall(size-1,request,MPI_STATUSES_IGNORE);
        for (size_t i=0;i<size-1;++i){
            printf("%f\n",data[i]);
        }
    } else {
        // Producer
        int id = rank;
        float data = rank;
        printf("Sending {%d}{%f}\n",id,data);
        MPI_Send(&data,1,MPI_FLOAT,size-1,id,MPI_COMM_WORLD);
    }

    return MPI_Finalize();
}
person Scott Wales    schedule 03.10.2012
comment
Фактически, это текущая реализация, которая у меня есть, с той лишь разницей, что я использую Isend для производителей, а общий объем данных составляет порядка ГБ, который доставляется примерно за 3000 отправок. Как ни странно, возникает взаимоблокировка, если идентификаторы отправки действительно не в порядке, когда некоторые отправки не могут продолжаться. Если я синхронизирую отправителей для отправки в порядке установки тегов у получателя, тупик исчезнет. Из документа в любом случае никогда не должно происходить взаимоблокировки, поскольку для каждого тега существует только один соответствующий send / recv. Вот почему я хочу собрать сообщения вместе и попробовать реализации ANY_SOURCE и ANY_TAG. - person wujj123456; 04.10.2012
comment
Я уверен, что тупик возникает из-за MPI send / recv, потому что в коде нет другого механизма синхронизации. Isend / irecv не занимают внутреннее буферное пространство. Теги отправителя и получателя идеально совпадают. Фактически, тупик проявляется только в том случае, если отправители отправляют сообщения в серьезном нарушении порядка. У меня недостаточно знаний о реализации openMPI, чтобы рассуждать об этом поведении, которое противоречит спецификации MPI. - person wujj123456; 04.10.2012
comment
Обратите внимание, что стандарт MPI требует, чтобы допустимые значения тегов были от 0 до MPI_TAG_UB, при этом MPI_TAG_UB было не менее 32767. Некоторые реализации предоставляют более высокие значения для MPI_TAG_UB (например, 2 ^ 31-1), но другие этого не делают. . Если используется более 32768 идентификаторов массивов, код, использующий теги для идентификации массивов, не будет переносимым. - person Hristo Iliev; 04.10.2012