Оптимизация одномерного уравнения теплопроводности с использованием SIMD

Я использую код CFD (для вычислительной гидродинамики). Недавно у меня была возможность увидеть, как компилятор Intel использует SSE в одном из моих циклов, увеличивая производительность вычислений в этом цикле почти в 2 раза. Однако использование инструкций SSE и SIMD больше похоже на удачу. В большинстве случаев компилятор ничего не делает.

Затем я пытаюсь заставить использовать SSE, учитывая, что инструкции AVX усилят этот аспект в ближайшем будущем.

Я сделал простой одномерный код теплопередачи. Он состоит из двух фаз, использующих результаты другой (U0 -> U1, затем U1 -> U0, затем U0 -> U1 и т. д.). Когда он повторяется, он сходится к устойчивому решению. Большинство моих циклов в основном коде используют один и тот же тип вычислений. (конечная разница).

Однако мой код в два раза медленнее обычного цикла. Результаты одинаковы, поэтому расчеты согласуются.

Я сделал ошибку? Я использую Core 2 для проверки цикла перед тестированием на суперкомпьютере (используя Westmer).

Вот код с циклом SSE, а затем с эталонным циклом:

#include <stdio.h>
#include <emmintrin.h>
#include <time.h>
//#include <vector>
#define n1 1004
#define niter 200000

int i,j,t;

double U0[n1] __attribute__ ((aligned(16)));
double U1[n1] __attribute__ ((aligned(16)));
double Dx,Dy,Lx,Ly,InvDxDx,Dt,alpha,totaltime,Stab,DtAlpha,DxDx;

__m128d vmmx00;
__m128d vmmx01;
__m128d vmmx02;
__m128d vmmx10;

__m128d va;
__m128d vb;
__m128d vc;
__m128d vd;

clock_t time0,time1;
FILE *f1;
int main()
{
/* ---- GENERAL ---- */
   alpha = 0.4;
   totaltime = 1.0/100.0;
   Dt = totaltime/((niter-1)*1.0);
   Lx = 1.0;
   Dx = Lx/((n1-1)*1.0);
   InvDxDx = 1.0/(Dx*Dx);
   DxDx = Dx*Dx;
   Stab = alpha*Dt*(InvDxDx);
   DtAlpha = Dt*alpha;
/* Stability if result <= 0.5 */
   printf("Stability factor : %f \n",Stab);


   for( i = 0; i < n1; i++){U0[i] = 0.0;}
   U0[1] = 1.0;
   U0[2] = 1.0;
   U0[3] = 1.0;
   U0[n1-2] = 2.0;

//    for ( i = 0; i < n1; i++) {
 //       for ( j = i + 1; j < n2; j++) {
  //          std::swap(U0[i][j], U0[j][i]);
   //     }
    //}

    va = _mm_set1_pd(-2.0);
    vb = _mm_set1_pd(InvDxDx);
    vd = _mm_set1_pd(DtAlpha);

 time0=clock();

 for( t = 0; t < niter; t++)
 {

    for( i = 2; i < n1-2; i+=2)
    {
      //printf("%d  %d  \n",i,j);
      //fflush(stdout);
      vmmx00 = _mm_load_pd(&U0[i]);
      vmmx01 = _mm_loadu_pd(&U0[i+1]);
      vmmx02 = _mm_loadu_pd(&U0[i-1]);

      vmmx10 = _mm_mul_pd(va,vmmx00);   // U1[i][j] = -2.0*U0[i][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx01);   // U1[i][j] = U1[i][j] + U0[i+1][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx02);   // U1[i][j] = U1[i][j] + U0[i-1][j];
      vmmx10 = _mm_mul_pd(vb,vmmx10);   // U1[i][j] = U1[i][j] * InvDxDx;

      vmmx10 = _mm_mul_pd(vd,vmmx10);   // U1[i][j] = U1[i][j] * DtAlpha;
      vmmx10 = _mm_add_pd(vmmx10,vmmx00);   // U1[i][j] = U1[i][j] + U0[i][j];

      _mm_store_pd(&U1[i],vmmx10);

      // U1[i][j] = U0[i][j] + DtAlpha*( (U0[i+1][j]-2.0*U0[i][j]+U0[i-1][j])*InvDxDx
    }

    for( i = 2; i < n1-2; i+=2)
    {
      //printf("%d  %d  \n",i,j);
      //fflush(stdout);
      vmmx00 = _mm_load_pd(&U1[i]);
      vmmx01 = _mm_loadu_pd(&U1[i+1]);
      vmmx02 = _mm_loadu_pd(&U1[i-1]);

      vmmx10 = _mm_mul_pd(va,vmmx00);   // U0[i][j] = -2.0*U1[i][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx01);   // U0[i][j] = U0[i][j] + U1[i+1][j];
      vmmx10 = _mm_add_pd(vmmx10,vmmx02);   // U0[i][j] = U0[i][j] + U1[i-1][j];
      vmmx10 = _mm_mul_pd(vb,vmmx10);   // U0[i][j] = U0[i][j] * InvDxDx;

      vmmx10 = _mm_mul_pd(vd,vmmx10);   // U0[i][j] = U0[i][j] * DtAlpha;
      vmmx10 = _mm_add_pd(vmmx10,vmmx00);   // U0[i][j] = U0[i][j] + U1[i][j];

      _mm_store_pd(&U0[i],vmmx10);

      // U1[i][j] = U0[i][j] + DtAlpha*( (U0[i+1][j]-2.0*U0[i][j]+U0[i-1][j])*InvDxDx
    }

 }

 time1=clock();
 printf("Loop 0, total time : %f \n", (double) time1-time0);
 f1 = fopen ("out0.dat", "wt");
 for( i = 1; i < n1-1; i++)
 {
       fprintf (f1, "%d\t%f\n", i, U0[i]);
 }


// REF

   for( i = 0; i < n1; i++){U0[i] = 0.0;}
   U0[1] = 1.0;
   U0[2] = 1.0;
   U0[3] = 1.0;
   U0[n1-2] = 2.0;

 time0=clock();

 for( t = 0; t < niter; t++)
 {

    for( i = 2; i < n1-2; i++)
    {
       U1[i] = U0[i] + DtAlpha* (U0[i+1]-2.0*U0[i]+U0[i-1])*InvDxDx;
    }

    for( i = 2; i < n1-2; i++)
    {
       U0[i] = U1[i] + DtAlpha* (U1[i+1]-2.0*U1[i]+U1[i-1])*InvDxDx;
    }

 }

 time1=clock();
 printf("Loop 0, total time : %f \n", (double) time1-time0);
 f1 = fopen ("outref.dat", "wt");
 for( i = 1; i < n1-1; i++)
 {
       fprintf (f1, "%d\t%f\n", i, U0[i]);
 }



}

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Изменить:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Учитывая ваши ответы, я нашел подходящее место для обсуждения этого, поэтому я расширим тему и объясню свои цели. Если вы согласны, мы обсудим все циклы один за другим. Это может быть долго, но это может быть чрезвычайно полезно для многих людей в моей области, а также для решателей OpenSource, таких как OpenFoam. Без учета влияния на энергопотребление (мы все пользуемся большими суперкалькуляторами).

Код CFD, который я использую, работает более 1 месяца на 512 ядрах Westmer. Я использую MPI (интерфейс передачи сообщений) для связи между процессами. Поле Physic можно рассматривать как сетку, то есть массивы 1D, 2D или 3D, в зависимости от типа моделирования. Но 3D — это лучшее, что вы можете себе представить.

Полный код находится на Fortran 95, который на самом деле является упрощенным C. Его легко связать с C, а подпрограммы C можно вызывать прямо из Fortran, типы те же (int, double, long и т. д.). Но Фортран не допускает таких оптимизаций: он разработан, чтобы быть простым. Вот почему я исследую инструкции C.

Во всех кодах CFD мы сталкиваемся с одними и теми же проблемами: 3 типа циклов и распределение памяти MPI. Давайте сначала обсудим циклы:

  • Пространственные производные (называемые конечной разностью). Цикл состоит из одномерной свертки для всех случаев (одномерные, двухмерные, трехмерные, вы производите производные только по одной оси за раз) (DF = F[i-1 ]*A + F[i]*B + F[i+1]*C). Однако при использовании более 1D доступ к памяти становится следующим:

    // x1 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x1[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C
    
    // x2 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x2[i][j] = F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G
    

    В первом цикле обращение к памяти не продолжается (инверсия в Фортране, инвертируется память). Это первая проблема. То же самое при использовании 3D-массивов.

  • Разрешение уравнения Пуассона, т. е. умножение матриц. Цикл состоит из одномерной, двухмерной или трехмерной свертки, в зависимости от моделирования. На самом деле это вторая производная (DDF = D(DF)).

    for i 1 -> n1
        for j 1 ->n2
            DDF[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C + F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G
    

    Этот цикл такой же, как цикл, который я дал вам сначала, но он вычисляется напрямую, а не четный и нечетный.

  • Взвешенное разрешение Гаусса-Зейделя, то есть тот же цикл, что и ниже, но с зависимостью:

    // even
    for i 1 -> n1
        for j 1 ->n2
            F1[i][j] = F0[i-1][j]*A + F0[i][j]*B + F0[i+1][j]*C + F0[i][j-1]*D + F0[i][j]*E + F0[i][j+1]*G
    
    //odd
    for i 1 -> n1
        for j 1 ->n2
            F0[i][j] = F1[i-1][j]*A + F1[i][j]*B + F1[i+1][j]*C + F1[i][j-1]*D + F1[i][j]*E + F1[i][j+1]*G
    

    Это петля, которую вы исследовали ранее.

Затем мы сталкиваемся с другой проблемой: распределением памяти. Каждое ядро ​​имеет свою память, и нужно делиться ею с другими. Рассмотрим последний цикл, но упрощенно:

    for t 1 -> niter

        // even
        for i 1 -> n1-2
                F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C

        //odd
        for i 1 -> n1-2
                F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C

Учтите, что n1=512, но это не может быть сохранено в локальной памяти из-за малого объема оперативной памяти. Память распределяется по ядрам core0 (1->255) и core1 (256-512), которые находятся НЕ на одном компьютере, а в сети. В этом случае производную по i=256 нужно знать по точке i=255, но это значение находится на другом проце. Память, содержащая значения других процессоров, называется памятью GHOST. Итак, петля:

    ! update boundary memory :
    Share to ghost : core0 : F0[255] -> Network -> F0[0] : core1 (don't forget that for core1, the array restart from 0)
    Share to ghost : core1 : F0[1] -> Network -> F0[256] : core0 (you understand that F0[256] is the ghost for core0, and F0[0] is the ghost for core1)
    // even, each core do this loop.
    for i 1 -> n1-2
            F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C

    ! update boundary memory :
    Share to ghost : core0 : F1[255] -> Network -> F1[0] : core1
    Share to ghost : core1 : F1[1] -> Network -> F1[256] : core0

    //odd, each core do this loop.
    for i 1 -> n1-2
            F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C

Нам нужно разобраться с этим. Mysticial, теперь вы видите, к чему я клоню: переплетение циклов должно учитывать это. Я думаю, что это можно сделать так:

        ! update boundary memory :
        Share to ghost : core0 : F0[255] -> Network -> F0[0] : core1 
        Share to ghost : core1 : F0[1] -> Network -> F0[256] : core0

        for t 1 -> niter

        ! compute borders in advance :
        core0 only : F1[255] = F0[254]*A + F0[255]*B + F0[256]*C
        core1 only : F1[1] = F0[0]*A + F0[1]*B + F0[2]*C

        Launch Share to ghost asynchronous : core0 : F1[255] -> Network -> F1[0] : core1 
        Launch Share to ghost asynchronous : core1 : F1[1] -> Network -> F1[256] : core0

        During the same time (this can be done at the same time because MPI support asynchronous communications)
        // even
        for i 2 -> n1-3 (note the reduced domain)
                F1[i] = F0[i-1]*A + F0[i]*B + F0[i+1]*C

        Check that communications are done.


        ! compute borders in advance :
        core0 only : F0[255] = F1[254]*A + F1[255]*B + F1[256]*C
        core1 only : F0[1] = F1[0]*A + F1[1]*B + F1[2]*C

        Launch Share to ghost asynchronous : core0 : F0[255] -> Network -> F0[0] : core1 
        Launch Share to ghost asynchronous : core1 : F0[1] -> Network -> F0[256] : core0

        //odd, each core do this loop.
        for i 2 -> n1-3
                F0[i] = F1[i-1]*A + F1[i]*B + F1[i+1]*C

        Check that communications are done.

Надеюсь, я не ошибся где-то с индексами. Давайте рассмотрим первый тип циклов на данный момент, которые являются самыми простыми, и мы можем пройти циклы 2 и 3 после, которые похожи. Цель состоит в том, чтобы сделать это (что похоже на обработку изображений):

    // x1 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x1[i][j] = F[i-1][j]*A + F[i][j]*B + F[i+1][j]*C

    // x2 derivative
    for i 1 -> n1
        for j 1 ->n2
            DF_x2[i][j] = F[i][j-1]*D + F[i][j]*E + F[i][j+1]*G

Я над этим работаю, и через несколько часов выложу код результата с учетом ваших рекомендаций.


person Lags Moomba    schedule 20.11.2012    source источник
comment
Пара вещей: у вас есть довольно неприятные цепочки зависимостей. Во-вторых, известно, что обращения к памяти с неверным выравниванием на Core 2 довольно опасны (особенно если они пересекают строку кэша). Я рекомендую развернуть оба внутренних цикла и чередовать их итерации. Затем несоответствие может быть устранено путем перетасовки.   -  person Mysticial    schedule 20.11.2012
comment
Да, это неоптимизированный цикл. Я нашел это о смещении: lemire.me/blog/archives/2012/05/31/ Я думал, что это не так важно. Я планирую запустить следующий код на процессорах I7 Westmer. Но для соблюдения переносимости я предпочитаю избегать loadu. Учитывая петли, я отредактирую свой первый пост, чтобы добавить комментарий. Тасовка, ну мой родной язык Фортран 95, так что я к такому не привык. Я расследую.   -  person Lags Moomba    schedule 21.11.2012


Ответы (1)


Мудрые слова Mysticial в коде:

  xmm7 = InvDxDx * dtAlpha; // precalculate
  xmm6 = -2*xmm7;
  xmm0 = *ptr++;   // has values d0, d1
  xmm1 = *ptr++;   // has values d2, d3
  while (loop--) {
     xmm2  = xmm0;  // take a copy of d0,d1
     xmm0 += xmm1;  // d0+d2, d1+d3
     xmm2 = shufps (xmm2,xmm1, 0x47);  // the middle elements d1,d2 ??
     xmm0 *= xmm7;  // sum of outer elements * factor
     xmm2 *= xmm6;  // -2 * center element * factor
     // here's still a nasty dependency
     xmm2 += xmm0;
     xmm0 = xmm1;    // shift registers
     *out_ptr ++= xmm2;  // flush
     xmm1 = *ptr++;  // read in new values

  }

Это можно улучшить, разделив цикл на 2 сегмента, каждый из которых обрабатывает 502 записи отдельно или от разных задач, и чередуя результаты, что разрывает цепочку зависимостей.

Кроме того, подхода со сдвиговым регистром xmm0 ‹- xmm1, xmm1 ‹- new можно избежать, дважды разворачивая цикл и меняя значения xmm0 и xmm1 в каждом втором случае.

Это указывает на еще одну большую проблему: при использовании встроенных функций регистры постоянно сбрасываются в память.

person Aki Suihkonen    schedule 20.11.2012
comment
Во-первых, спасибо за этот образец. Если я вас хорошо понимаю (я не привык к C): xmm? все __m128d. *ptr++ должен перейти в память. Но где вы указали отправную точку? (и как?) Я теперь понимаю shufps, но как вы определили 0x47? Я просто хочу понять это, а не просто использовать его. Чтобы закончить, мне трудно понять *out_ptr ++= xmm2; Flush означает обновить память, но как эта команда это делает? - person Lags Moomba; 21.11.2012
comment
эти xmmN предпочтительно являются настоящими встроенными инструкциями ассемблера, но подойдут и встроенные. Вы должны отладить/проверить непосредственный 0x47. Это может занять несколько минут, например. из songho.ca/misc/sse/sse.html Моя терминология относится к регистры сдвига или цифровые фильтры, где входящий поток проходит через набор регистров (память -> xmm1 -> xmm0 -> xmm2 -> память). Есть псевдооперация чтения памяти *xmm1=*ptr++, и она сбрасывается/записывается псевдокодом *out_ptr++= xmm2; псевдо, потому что, вероятно, придется использовать __mm_load. - person Aki Suihkonen; 21.11.2012