【MPI学习2】MPI并行程序设计模式:对等模式 & 主从模式

2023-07-30,,

这里的内容主要是都志辉老师《高性能计算之并行编程技术——MPI并行程序设计》

书上有一些代码是FORTAN的,我在学习的过程中,将其都转换成C的代码,便于统一记录。

这章内容分为两个部分:MPI对等模式程序例子 & MPI主从模式程序例子

1. 对等模式MPI程序设计

1.1 问题背景

这部分以Jacobi迭代为具体问题,列举了三个求解Jacobi迭代问题的MPI对等模式程序。

这里需要阐明一下,书上的Jacobi迭代具体的背景可以参考这个内容:http://www.mcs.anl.gov/research/projects/mpi/tutorial/mpiexmpl/src/jacobi/C/main.html

简答说,就是矩阵每个元素都是其上下左右四个元素的平均值,矩阵四个边界的值不变。

这里学习的重点暂时放在如何完成上述矩阵的迭代的功能实现,不应该偏离过多去纠结Jacobi迭代,提高专注度。

1.2 第一版最原始的Jacobi迭代对等程序

代码如下:

 #include "mpi.h"
#include <stdio.h>
#include <stdlib.h> #define N 8
#define SIZE N/4
#define T 2 void print_myRows(int, float [][N]); int main(int argc, char *argv[])
{
float myRows[SIZE+][N], myRows2[SIZE+][N];
int myid;
MPI_Status status; MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid); int i,j;
/*初始化*/
for ( i = ; i<SIZE+; i++)
{
for ( j = ; j<N; j++)
{
myRows[i][j] = myRows2[i][j] = ;
}
}
if ( == myid) {
for ( j = ; j<N; j++)
myRows[][j] = 8.0;
}
if ( == myid) {
for ( j=; j<N; j++)
myRows[SIZE][j] = 8.0;
}
for ( i = ; i<SIZE+; i++)
{
myRows[i][] = 8.0;
myRows[i][N-] = 8.0;
}
/*Jacobi Iteration部分*/
int step;
for ( step = ; step < T; step++ )
{
// 传递数据
if (myid<) {
// 从 下方 进程接收数据
MPI_Recv(&myRows[SIZE+][], N, MPI_FLOAT, myid+, , MPI_COMM_WORLD, &status);
}
if (myid>) {
// 向 上方 进程发送数据
MPI_Send(&myRows[][], N, MPI_FLOAT, myid-, , MPI_COMM_WORLD);
}
if (myid<) {
// 向 下方 进程发送数据
MPI_Send(&myRows[SIZE][], N, MPI_FLOAT, myid+, , MPI_COMM_WORLD);
}
if (myid>) {
// 从 上方 进程接收数据
MPI_Recv(&myRows[][], N, MPI_FLOAT, myid-, , MPI_COMM_WORLD, &status);
}
// 计算
int r_begin, r_end;
r_begin = (==myid) ? : ;
r_end = (==myid) ? SIZE- : SIZE;
for ( i = r_begin; i<=r_end; i++)
{
for ( j = ; j<N-; j++)
myRows2[i][j] = 0.25*(myRows[i][j-]+myRows[i][j+]+myRows[i-][j]+myRows[i+][j]);
}
// 更新
for ( i = r_begin; i<=r_end; i++)
{
for (j = ; j<N-; j++)
myRows[i][j] = myRows2[i][j];
}
}
// MPI_Barrier(MPI_COMM_WORLD);
print_myRows(myid, myRows);
MPI_Finalize();
} void print_myRows(int myid, float myRows[][N])
{
int i,j;
int buf[];
MPI_Status status;
buf[] = ;
if ( myid> ) {
MPI_Recv(buf, , MPI_INT, myid-, , MPI_COMM_WORLD, &status);
}
printf("Result in process %d:\n", myid);
for ( i = ; i<SIZE+; i++)
{
for ( j = ; j<N; j++)
printf("%1.3f\t", myRows[i][j]);
printf("\n");
}
if ( myid< ) {
MPI_Send(buf, , MPI_INT, myid+, , MPI_COMM_WORLD);
}
MPI_Barrier(MPI_COMM_WORLD);
}

代码执行结果:

代码分析:

(1)矩阵分块方式

  先说明一下,这与原书上的代码设计思路有差别:都老师的书上是把矩阵按columns划分的,我上面这份代码是将矩阵按照row分块的。

  书上为什么要按照列进行矩阵分块呢?我觉得原因是FORTAN的二维数组是按照列来优先存储元素的,所以按列划分矩阵,对于后面的MPI_Send MPI_Recv操作都比较方便。

  而我使用的是C语言来实现,C语言的二维数组是优先按照行来存储的,所以按照行来划分矩阵更划算,对于后面的MPI_Recv和MPI_Send操作更方便。

  ***关于矩阵分块的方式与矩阵乘法***

  有关这个部分,我查阅了一本《高性能并行计算》的讲义:http://www.sccas.cn/yhfw/wdypx/wd/lszl/201112/W020111215333260773702.pdf

  参阅了这本讲义的第3.1节内容,矩阵A×矩阵B并行计算的四种矩阵划分方式:行行、行列、列行、列列

  这几种方法的核心就是:固定A或B,移动一个;或都固定,移动中间结果。

  如果忘记了算法是如何移动数据的,通过画图的方式可以帮助理解:在每个时间片内,画出各个处理机有哪些数据,每个处理机内部进行了哪些运算。

  另外,这些算法的效率比较主要考察“数据交换量”和“计算量”两个指标。

(2)代码设计逻辑

  这个算法并不复杂,比较重要的是如何在对等模式下,各个进程要互相发送和接受数据。如何设计通信次序才能保证进程之间没有死锁出现呢?

  关于这个问题,可以回顾一下都老师这本书上“7.7编写安全的MPI程序”。

  我的理解就是:如果进程A和进程B需要互相发送和接收数据,画出进程之间交换数据的调用次序图,如果次序图中没有两个箭头是交叉的,那么就认为不会出现通信死锁

  上面这个程序,通信调用次序图如下:

    

  可以看到,上述的通信调用次序图中,没有两个箭头是交叉的,所以是安全的。

  同时,也可以看到上面的程序关于进程通信的部分跟计算部分是完全分开的:

  通信部分只需要传递少量边界数据,计算部分充分利用各个处理器的并行计算优势。

1.3 捆绑发送接受的Jacobi迭代实现

代码实现:

 #include "mpi.h"
#include <stdio.h>
#include <stdlib.h> #define N 8
#define SIZE N/4
#define T 2 void print_myRows(int, float [][N]); int main(int argc, char *argv[])
{
float myRows[SIZE+][N], myRows2[SIZE+][N];
int myid;
MPI_Status status; MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid); int i,j;
/*初始化*/
for ( i = ; i<SIZE+; i++)
{
for ( j = ; j<N; j++)
{
myRows[i][j] = myRows2[i][j] = ;
}
}
if ( == myid) {
for ( j = ; j<N; j++)
myRows[][j] = 8.0;
}
if ( == myid) {
for ( j=; j<N; j++)
myRows[SIZE][j] = 8.0;
}
for ( i = ; i<SIZE+; i++)
{
myRows[i][] = 8.0;
myRows[i][N-] = 8.0;
}
/*Jacobi Iteration部分*/
int step;
for ( step = ; step < T; step++ )
{
// 从上往下平移数据
if ( == myid) {
MPI_Send(&myRows[SIZE][], N, MPI_FLOAT, myid+, , MPI_COMM_WORLD);
}
else if ( == myid) {
MPI_Recv(&myRows[][], N, MPI_FLOAT, myid-, , MPI_COMM_WORLD, &status);
}
else {
MPI_Sendrecv(&myRows[SIZE][], N, MPI_FLOAT, myid+, , &myRows[][], N, MPI_FLOAT, myid-, , MPI_COMM_WORLD, &status);
}
// 从下向上平移数据
if ( == myid) {
MPI_Send(&myRows[][], N, MPI_FLOAT, myid-, , MPI_COMM_WORLD);
}
else if ( == myid) {
MPI_Recv(&myRows[SIZE+][], N, MPI_FLOAT, myid+, , MPI_COMM_WORLD, &status);
}
else {
MPI_Sendrecv(&myRows[][], N, MPI_FLOAT, myid-, , &myRows[SIZE+][], N, MPI_FLOAT, myid+, , MPI_COMM_WORLD, &status);
}
// 计算
int r_begin, r_end;
r_begin = (==myid) ? : ;
r_end = (==myid) ? SIZE- : SIZE;
for ( i = r_begin; i<=r_end; i++)
{
for ( j = ; j<N-; j++)
myRows2[i][j] = 0.25*(myRows[i][j-]+myRows[i][j+]+myRows[i-][j]+myRows[i+][j]);
}
// 更新
for ( i = r_begin; i<=r_end; i++)
{
for (j = ; j<N-; j++)
myRows[i][j] = myRows2[i][j];
}
}
MPI_Barrier(MPI_COMM_WORLD);
print_myRows(myid, myRows);
MPI_Finalize();
} void print_myRows(int myid, float myRows[][N])
{
int i,j;
int buf[];
MPI_Status status;
buf[] = ;
if ( myid> ) {
MPI_Recv(buf, , MPI_INT, myid-, , MPI_COMM_WORLD, &status);
}
printf("Result in process %d:\n", myid);
for ( i = ; i<SIZE+; i++)
{
for ( j = ; j<N; j++)
printf("%1.3f\t", myRows[i][j]);
printf("\n");
}
if ( myid< ) {
MPI_Send(buf, , MPI_INT, myid+, , MPI_COMM_WORLD);
}
MPI_Barrier(MPI_COMM_WORLD);
}

代码分析:

如果一个进程既发送数据又接收数据,则可以使用MPI_Sendrecv函数来实现。

比如Jacobi迭代中的中间两个进程,都需要发送和接收。这样处理起来就可以用MPI_Sendrecv函数。

上述代码量并没有减少,但是设计逻辑稍微直观了一些。

1.4 引入虚拟进程的Jacobi迭代

代码实现:

 #include "mpi.h"
#include <stdio.h>
#include <stdlib.h> #define N 8
#define SIZE N/4
#define T 2 void print_myRows(int, float [][N]); int main(int argc, char *argv[])
{
float myRows[SIZE+][N], myRows2[SIZE+][N];
int myid;
MPI_Status status; MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid); int i,j;
/*初始化*/
for ( i = ; i<SIZE+; i++)
{
for ( j = ; j<N; j++)
{
myRows[i][j] = myRows2[i][j] = ;
}
}
if ( == myid) {
for ( j = ; j<N; j++)
myRows[][j] = 8.0;
}
if ( == myid) {
for ( j=; j<N; j++)
myRows[SIZE][j] = 8.0;
}
for ( i = ; i<SIZE+; i++)
{
myRows[i][] = 8.0;
myRows[i][N-] = 8.0;
}
/*Jacobi Iteration部分*/
int upid, downid;
upid = (==myid) ? MPI_PROC_NULL : myid-;
downid = (==myid) ? MPI_PROC_NULL : myid+;
int step;
for ( step = ; step < T; step++ )
{
/*从上向下平移数据*/
MPI_Sendrecv(&myRows[SIZE][], N, MPI_FLOAT, downid, , &myRows[][], N, MPI_FLOAT, upid, , MPI_COMM_WORLD, &status);
/*从下往上发送数据*/
MPI_Sendrecv(&myRows[][], N, MPI_FLOAT, upid, , &myRows[SIZE+][], N, MPI_FLOAT, downid, , MPI_COMM_WORLD, &status);
// 计算
int r_begin, r_end;
r_begin = (==myid) ? : ;
r_end = (==myid) ? SIZE- : SIZE;
for ( i = r_begin; i<=r_end; i++)
{
for ( j = ; j<N-; j++)
myRows2[i][j] = 0.25*(myRows[i][j-]+myRows[i][j+]+myRows[i-][j]+myRows[i+][j]);
}
// 更新
for ( i = r_begin; i<=r_end; i++)
{
for (j = ; j<N-; j++)
myRows[i][j] = myRows2[i][j];
}
}
MPI_Barrier(MPI_COMM_WORLD);
print_myRows(myid, myRows);
MPI_Finalize();
} void print_myRows(int myid, float myRows[][N])
{
int i,j;
int buf[];
MPI_Status status;
buf[] = ;
if ( myid> ) {
MPI_Recv(buf, , MPI_INT, myid-, , MPI_COMM_WORLD, &status);
}
printf("Result in process %d:\n", myid);
for ( i = ; i<SIZE+; i++)
{
for ( j = ; j<N; j++)
printf("%1.3f\t", myRows[i][j]);
printf("\n");
}
if ( myid< ) {
MPI_Send(buf, , MPI_INT, myid+, , MPI_COMM_WORLD);
}
MPI_Barrier(MPI_COMM_WORLD);
}

代码分析:

Jacobi迭代的麻烦之处在于(按行划分矩阵):最上面的矩阵只需要与它下面的矩阵所在进程互相通信一次,最下面的矩阵只需要与它上面的矩阵所在进程互相通信一次,而中间的其余矩阵都需要与其上下相邻的矩阵所在进程通信两次。

因此,必须对最上面和最下面的矩阵特殊处理,作为一种corner case来对待,所以代码麻烦。

这里引入了MPI_PROC_NULL虚拟进程的概念,相当于给最上面的矩阵之上再来一个想象存在的进程:与这个虚拟进程通信不会真的通信,而是立刻返回。

这个虚拟进程的意义在于可以方便处理corner case,如上面的例子,无论是代码量和设计思路都简化了许多,MPI替我们完成了很多工作。

2. 主从模式MPI程序设计

2.1 矩阵A×向量B=向量C

主要通过矩阵A×向量B来讲解主从模式的程序设计,这个程序也是从FORTAN翻译成C代码的。

大体思路如下:

(1)一个master进程,负责总体调度,广播向量B到slaver进程,并把矩阵A的每一行发送到某个slaver进程

(2)slaver一开始从master的广播中获得向量B,之后每次从master获得矩阵A的某一行(具体的行号,利用MPI_TAG发送;但是为了要把0行空出来作为结束标志,所以从1开始),计算矩阵A的该行与向量B的内积后再回传给master进程

(3)master进程每次从一个slaver进程获得向量C的某个元素的值,master进程通过MPI_Recv中的status.MPI_TAG来判断该计算结果该更新到向量C的哪个位置中

(4)如果master进程从某个slaver回收计算结果后,没有新的计算任务要派送给slaver进程了,就向slaver进程发送一个MPI_TAG=0的消息;slaver收到MPI_TAG=0的消息,就结束退出

具体代码实现如下:

 #include "mpi.h"
#include <stdio.h> #define ROWS 100
#define COLS 100
#define min(x,y) ((x)>(y)?(y):(x))
int main(int argc, char *argv[])
{
int rows = , cols = ;
int master = ;
int myid, numprocs;
int i,j;
float a[ROWS][COLS], b[COLS], c[COLS];
float row_result;
MPI_Status status; MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs); /*master进程*/
if (master == myid) {
/*初始化矩阵a和b*/
for (j=; j<cols; j++) b[j]=;
for (i=; i<rows; i++)
{
for (j=; j<cols; j++)
{
a[i][j] = i;
}
}
/*只在master进程中初始化b 其余slave进程通过被广播的形式获得向量b*/
MPI_Bcast(&b[], cols, MPI_FLOAT, master, MPI_COMM_WORLD);
/*向各个slave进程发送矩阵a的各行*/
int numsent = ;
for ( i=; i<min(numprocs,rows+); i++)
{
/* 每个slave进程计算一行×一列的结果 这里用MPI_TAG参数标示对应结果向量c的下标+1
* MPI_TAG在这里的开始取值范围是1 要把MPI_TAG空出来 作为结束slave进程的标志*/
MPI_Send(&a[i-][], cols, MPI_FLOAT, i, ++numsent, MPI_COMM_WORLD);
}
/*master进程接受其他各进程的计算结果*/
for ( i=; i<rows; i++)
{
/*类似poll的方法 只要有某个slave进程算出来结果了 MPI_Recv就能返回执行一次*/
MPI_Recv(&row_result, , MPI_FLOAT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
/*这里MPI_TAG的范围是1到rows 注意存储结果时下标减1*/
c[status.MPI_TAG-] = row_result;
/*发送矩阵a中没发送完的行 就用刚返回计算结果空出来的那个slave进程 通过status.MPI_SOURCE找到这个空出来的进程*/
if (numsent < rows) {
MPI_Send(&a[numsent][], cols, MPI_FLOAT, status.MPI_SOURCE, numsent+, MPI_COMM_WORLD);
numsent = numsent + ;
}
else { /*发送空消息 关闭slave进程*/
float close = 1.0;
MPI_Send(&close, , MPI_FLOAT, status.MPI_SOURCE, , MPI_COMM_WORLD);
}
}
/*打印乘法结果*/
for (j = ; j < cols; j++ )
printf("%1.3f\t", c[j]);
printf("\n");
}
/*slave进程*/
else {
MPI_Bcast(&b[], cols, MPI_FLOAT, master, MPI_COMM_WORLD);
while()
{
row_result = ;
MPI_Recv(&c[], cols, MPI_FLOAT, master, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if ( != status.MPI_TAG ) {
for ( j = ; j < cols; j++ )
{
row_result = row_result + b[j]*c[j];
}
//printf("myid:%d, MPI_TAG:%d, c[0]:%f, row_result:%1.3f\n", myid, status.MPI_TAG,c[0], row_result);
MPI_Send(&row_result, , MPI_FLOAT, master, status.MPI_TAG, MPI_COMM_WORLD);
}
else {
break;
}
}
}
MPI_Finalize();
}

代码执行结果:

这里有两个细节需要注意:

(1)关于++运算符在传递参数时的使用

  代码40行一开始我写成了“MPI_Send(&a[numsent][0], cols, MPI_FLOAT, i, ++numsent, MPI_COMM_WORLD);”

  显然,这个代码是错误的,问题的关键就出在了numsent这个变量上。

  比如,在执行这个语句前numsent的值为1。我希望第一个参数传递的是&a[1][0],第四个参数是2,并且numsent此时的值为2。

  这是一个思维陷阱,++numsent会导致numsent先加1,再作为值传递到MPI_Send实参中。所以,此时第一个参数变成了&a[2][0],并不是原先想要的。

  为了避免这种思维陷阱,以后再设计传递函数的参数时,应该禁止在传递参数时使用++这个运算符;多写一个赋值语句不会怎样,反之用++运算符就容易掉进思维陷阱。

(2)关于#define带参宏定义的使用

  代码中用到#define min(x,y) ((x)>(y)?(y):(x))这个宏定义,这个地方稍微吃了一点儿坑。

  记住两个括号的原则:

  a. 每个变量都要加括号

  b. 宏定义整体要加括号

  如果想看看宏定义是否替换问正确的内容了,可以cc -E选项来查看预处理后的结果。

2.2 master进程打印各个slaver进程发送来的消息

代码实现:

 #include "mpi.h"
#include <stdio.h>
#include <string.h> #define MSG_EXIT 1
#define MSG_ORDERED 2
#define MSG_UNORDERED 3 void master_io();
void slaver_io(); int main(int argc, char *argv[])
{
int rank, size;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if ( == rank) {
master_io();
}
else {
slaver_io();
}
MPI_Finalize();
} void master_io()
{
int i,j,size,nslaver,firstmsg;
char buf[], buf2[];
MPI_Status status;
MPI_Comm_size(MPI_COMM_WORLD, &size);
nslaver = size - ;
while (nslaver>) {
MPI_Recv(buf, , MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
switch(status.MPI_TAG){
case MSG_EXIT:
nslaver--;
break;
case MSG_UNORDERED: // 如果是乱序就直接输出
fputs(buf, stdout);
break;
case MSG_ORDERED:
firstmsg = status.MPI_SOURCE;
/* 这段程序设计的比较巧妙
* 虽然每个slaver发送两个message 不同slaver的message可能有重叠
* 但是每个每个slaver每次只能被接收一个message
* 这一轮一旦接收到message了
* 就不再从这个slaver接收消息了
* 概括说:
* 第一次进入MSG_ORDERED处理各个slaver第一次调用MPI_Send发送的消息
* 第二次进入MSG_ORDERED处理各个slaver第二次调用MPI_Send发送的消息*/
for ( i=; i<size; i++)
{
if (i==firstmsg) {
fputs(buf, stdout);
}
else {
MPI_Recv(buf2, , MPI_CHAR, i, MSG_ORDERED, MPI_COMM_WORLD, &status);
fputs(buf2, stdout);
}
}
break;
}
}
} void slaver_io()
{
char buf[];
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
/*第一次向master进程发送有序消息*/
sprintf(buf, "Hello from slaver %d, ordered print\n", rank);
MPI_Send(buf, strlen(buf)+, MPI_CHAR, , MSG_ORDERED, MPI_COMM_WORLD);
/*第二次向master进程发送有序消息*/
sprintf(buf, "Bye from slaver %d, ordered print\n", rank);
MPI_Send(buf, strlen(buf)+, MPI_CHAR, , MSG_ORDERED, MPI_COMM_WORLD);
/*第一次向master发送无序消息*/
sprintf(buf, "I'm exiting (%d), unordered print\n", rank);
MPI_Send(buf, strlen(buf)+, MPI_CHAR, , MSG_UNORDERED, MPI_COMM_WORLD);
/*发送结束信息*/
MPI_Send(buf, , MPI_CHAR, , MSG_EXIT, MPI_COMM_WORLD);
}

有一个不错的小巧的设计思路,在代码注释中具体说明了,这里不再赘述。

把这个例子看明白了,非常有利于理解master slaver的设计模式,并且知道如何花式保证每次master都能接受发送自slaver相同“批次”的消息。

小结:

看完了这章内容,对MPI程序设计稍微有些感觉了。另外,感觉前面撸了一遍APUE的帮助挺大的,虽然MPI程序接口比较简单,但是总让我想到APUE中的fork+IPC的部分。

【MPI学习2】MPI并行程序设计模式:对等模式 & 主从模式的相关教程结束。

《【MPI学习2】MPI并行程序设计模式:对等模式 & 主从模式.doc》

下载本文的Word格式文档,以方便收藏与打印。