高性能计算·实验(四) wu-kan

实现并行梯形数值积分的 MPI 算法

下为单节点求解$\int_{a}^bf(x)dx$的函数,其中e是逼近的步长。由于单账号的运行时间权限是一个小时,这里我经过调参,取e=1e-2是可以在时限内计算完毕的(约四十分钟)。

lf ask(lf a, lf b, lf f(lf x), lf e)
{
	lf ans = 0, fa = f(a);
	while (a < b)
	{
		lf fb = f(a += e);
		ans += fa + fb;
		fa = fb;
	}
	return ans * e * 0.5;
}

这里有几处优化细节,通过调整求值关系,使得每个点处的函数值不被重复计算;同时把梯形的高移动到循环外面,减少计算次数。

当然,由于这里的步长其实只有百分之一,因此最后求出来的积分其实精度不怎么高,和自己手算的结果相比只能保证五六位的有效位数。

用 MPI 的点对点通信函数完成梯形数值积分的并行算法

各进程将各自计算的结果myAns发送到 0 号进程并求和ans

	if (!id)
	{
		for (int i = 1; i < numThreads; ++i)
		{
			MPI_Recv(
				&myAns,
				1,
				MPI_DOUBLE,
				i,
				0,
				MPI_COMM_WORLD,
				MPI_STATUES_IGNORE);
			ans += myAns;
		}
	}
	else
		MPI_Send(
			&myAns,
			1,
			MPI_DOUBLE,
			0,
			0,
			MPI_COMM_WORLD);

用 MPI 的集合通信函数完成梯形数值积分的并行算法

使用Reduce操作将各线程的结果myAns归约到 0 号进程的ans

	MPI_Reduce(
		&myAns,
		&ans,
		1,
		MPI_DOUBLE,
		MPI_SUM,
		0,
		MPI_COMM_WORLD);

将上面的算法对瑕积分扩展

假设被积函数 f(x)可能是无界函数,积分区间也很大,如

  • f(x)=exp(bx)/sqrt(1+exp(cx)), 0<x<=L;
  • 这时积分区间[0,L]划分不能是等长的,每个任务的小区间需要传递,并且参数 b,c,L 也需要传递。

这里选择使用自适应辛普森方法求解瑕积分,根据这篇论文,论证了加一个十五分之一的偏移收敛会比较快。

lf f(lf x)
{
	return exp(x) / sqrt(1 + exp(x));
}
lf simpson(lf a, lf b, lf f(lf x))
{
	if (fabs(a) < EPS)
		return f((a + b) * 0.5) * (b - a);
	return (f(a) + 4 * f((a + b) * 0.5) + f(b)) * (b - a) / 6;
}
lf ask(lf a, lf b, lf f(lf x), lf e)
{
	if (e < EPS)
		return simpson(a, b, f);
	lf c = (a + b) * 0.5, L = simpson(a, c, f), R = simpson(c, b, f), delta = (L + R - simpson(a, b, f)) / 15;
	return fabs(delta) < e ? L + R + delta : ask(a, c, f, e * 0.5) + ask(c, b, f, e * 0.5);
}

不过,在对老师给的这个函数(取b = c = 1)进行瑕积分求解的时候,发现积分上限达到$2^{10}$的时候,结果已经超过了double类型的表示范围,因此这段代码仅用来验证瑕积分求解的正确性,下面进行多核测速的代码仍然使用的是梯形积分法。

$ time mpiexec -n 1 ./integral 9
3022859422404135022056366724809190601572944773201862114369138391962853640177743445854646916709852400063199838208.000000 with 1 proces, problem size 512.
real    0m0.058s
user    0m0.000s
sys     0m0.063s
$ time mpiexec -n 1 ./integral 10
-nan with 1 proces, problem size 1024.
real    0m0.058s
user    0m0.000s
sys     0m0.047s

填表

运行时间 T(单位 s)

详见下面的输出文件部分。

进程数\问题规模$2^{14}$$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32}$
10.4240.5131.92823.536357.396706.6441441.164
20.4260.5531.40311.794182.182354.155706.369
40.4660.5421.0196.23092.010179.062353.818
80.5430.4870.7163.66746.01492.823177.530
160.5410.5760.7192.17723.25346.10293.670
320.8980.7470.8261.66312.63323.16945.404
641.4641.0671.1161.6386.81212.32423.790
1281.1101.0761.1761.4164.1056.95212.525
2561.5341.1881.1961.3192.8484.1106.936
5121.8021.7041.6951.7542.2102.6263.478

可以看到:

  • 随着问题规模增加,同进程数下运行时间不断增加
  • 问题规模比较小的时候,进程越多并行时间开销越大
  • 问题规模比较大的时候,运行时间减少

加速比 S

加速比 S=同等规模下的串行时间/并行时间。

进程数\问题规模$2^{14}$$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32}$
11.001.001.001.001.001.001.00
21.000.931.372.001.962.002.04
40.910.951.893.783.883.944.07
80.781.052.696.417.777.618.12
160.780.892.6810.8115.3715.3315.39
320.470.692.3314.1528.2930.5031.74
640.290.481.7214.3852.4757.3460.58
1280.380.481.6316.6287.06101.65115.06
2560.280.431.6117.84125.49171.93207.78
5120.240.301.1413.42161.71269.10414.37

可以看到:

  • 随着问题规模增加,同进程数下加速比不断增加
  • 随着进程数增加,同问题规模下加速比不断减少

效率 E

运行效率 E=加速比 S/并行线程数。

进程数\问题规模$2^{14}$$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32}$
11.001.001.001.001.001.001.00
20.500.930.681.000.981.001.02
40.230.240.470.950.970.991.02
80.100.130.340.810.970.951.02
160.050.060.170.680.960.960.96
320.010.020.070.440.880.950.99
640.0050.0080.030.220.820.900.95
1280.0030.0040.010.130.680.790.90
2560.0010.0020.0060.070.490.670.81
5120.00050.00060.0020.030.320.530.81

可以看到:

  • 随着问题规模增加,同进程数下效率不断增加
  • 随着进程数增加,同问题规模下效率不断减少

此外,部分数据出现了效率略大于 1 的情况,可能是由于系统运行时的「抖动」造成的,仍然在误差范围内,符合常识。

源代码integral.c

去掉#define WK_SIMPSON前的注释符号即可选用自适应辛普森公式求解瑕积分。

去掉#define WK_P2P前的注释符号即可选用点对点通信。

//#define WK_SIMPSON
//#define WK_P2P
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
typedef double lf;
const lf EPS = 1e-5;
#ifdef WK_SIMPSON
lf f(lf x)
{
	return exp(x) / sqrt(1 + exp(x));
}
lf simpson(lf a, lf b, lf f(lf x))
{
	if (fabs(a) < EPS)
		return f((a + b) * 0.5) * (b - a);
	return (f(a) + 4 * f((a + b) * 0.5) + f(b)) * (b - a) / 6;
}
lf ask(lf a, lf b, lf f(lf x), lf e)
{
	if (e < EPS)
		return simpson(a, b, f);
	lf c = (a + b) * 0.5, L = simpson(a, c, f), R = simpson(c, b, f), delta = (L + R - simpson(a, b, f)) / 15;
	return fabs(delta) < e ? L + R + delta : ask(a, c, f, e * 0.5) + ask(c, b, f, e * 0.5);
}
#else
lf f(lf x)
{
	return x;
}
lf ask(lf a, lf b, lf f(lf x), lf e)
{
	lf ans = 0, fa = f(a);
	while (a < b)
	{
		lf fb = f(a += e);
		ans += fa + fb;
		fa = fb;
	}
	return ans * e * 0.5;
}
#endif
int main(int argc, char **argv)
{
	MPI_Init(&argc, &argv);
	int id, numThreads;
	MPI_Comm_rank(MPI_COMM_WORLD, &id);
	MPI_Comm_size(MPI_COMM_WORLD, &numThreads);
	lf n = pow(2, atoi(argv[1])),
	   len = n / numThreads,
	   myL = id * len,
	   myAns = ask(myL, myL + len, f, 1e-2),
	   ans = myAns;
#ifdef WK_P2P
	if (!id)
	{
		for (int i = 1; i < numThreads; ++i)
		{
			MPI_Recv(
				&myAns,
				1,
				MPI_DOUBLE,
				i,
				0,
				MPI_COMM_WORLD,
				MPI_STATUES_IGNORE);
			ans += myAns;
		}
	}
	else
		MPI_Send(
			&myAns,
			1,
			MPI_DOUBLE,
			0,
			0,
			MPI_COMM_WORLD);
#else
	MPI_Reduce(
		&myAns,
		&ans,
		1,
		MPI_DOUBLE,
		MPI_SUM,
		0,
		MPI_COMM_WORLD);
#endif
	if (!id)
		printf("%f with %d proces, problem size %.0f.", ans, numThreads, n);
	MPI_Finalize();
}

作业脚本integral.pbs

对于不同的核数需要不同的作业脚本,这里只给出 512 核对应的作业脚本integral.pbs

这里学习了一下从 PBS 的本地变量中获得 mpiexec 的运行配置的方法。

#PBS -N integral
#PBS -l nodes=16:ppn=32
#PBS -j oe

source /public/software/profile.d/mpi_openmpi-intel-2.1.2.sh
mpicc integral.c -o integral -std=c11 -lm
for logN in 14 18 22 26 30 31 32
do
time mpiexec -machinefile $PBS_NODEFILE ./integral $logN
done

输出文件integral.o1761

对于不同的核数需要不同的输出文件,这里只给出 512 核对应输出文件integral.o1761

其他核数对应的输出文件见附件

  • integral.o1795对应 256 核(nodes=8:ppn=32
  • integral.o1796对应 128 核(nodes=4:ppn=32
  • integral.o1797对应 64 核(nodes=2:ppn=32
  • integral.o1798对应 32 核(nodes=1:ppn=32
  • integral.o1799对应 16 核(nodes=1:ppn=16
  • integral.o1800对应 8 核(nodes=1:ppn=8
  • integral.o1801对应 4 核(nodes=1:ppn=4
  • integral.o1802对应 2 核(nodes=1:ppn=2
  • integral.o1803对应 1 核(nodes=1:ppn=1
134218391.043055 with 512 proces, problem size 16384.
real	0m1.802s
user	0m2.328s
sys	0m6.054s
34359872522.271759 with 512 proces, problem size 262144.
real	0m1.704s
user	0m2.188s
sys	0m5.706s
8796101094338.814453 with 512 proces, problem size 4194304.
real	0m1.695s
user	0m2.281s
sys	0m5.844s
2251800146489160.500000 with 512 proces, problem size 67108864.
real	0m1.754s
user	0m4.053s
sys	0m5.900s
576461292861516288.000000 with 512 proces, problem size 1073741824.
real	0m2.210s
user	0m20.316s
sys	0m5.308s
2305845201815247872.000000 with 512 proces, problem size 2147483648.
real	0m2.626s
user	0m36.019s
sys	0m4.861s
9223215905289959424.000000 with 512 proces, problem size 4294967296.
real	0m3.478s
user	0m59.168s
sys	0m5.120s

完成正则采样排序 PSRS 的 MPI 算法

排序文件放在集群的 shared_dir 目录下,文件名为:psrs_data。文件采用二进制格式:头八个字节为排序数据个数(long int), 每个数据为 8 个字节。

请按要求使用 MPI 集合通信:具体要求见课件。

psrs.c

这里遇到一个问题,MPI 数据类型选择MPI_LONG_INT时发现传过来的数据大小只有 4 个字节?于是使用MPI_BYTE类型,而将我们的数据按照比特重新计算大小并传输。

//#define WK_GEN_DATA
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
typedef long int ll;
int sgn(ll a) { return a < 0 ? -1 : a ? 1 : 0; }
int cmpll(const void *a, const void *b) { return sgn(*(ll *)a - *(ll *)b); }
int main(int argc, char **argv)
{
	int id, numThreads;
	MPI_Init(&argc, &argv);
	double start = MPI_Wtime();
	MPI_Comm_rank(MPI_COMM_WORLD, &id);
	MPI_Comm_size(MPI_COMM_WORLD, &numThreads);
#ifdef WK_GEN_DATA
	ll n = ((ll)1) << atoi(argv[1]),
	   len = n / numThreads,
	   *a = (ll *)malloc(len * sizeof(ll));
#else
	ll n;
	MPI_File fh;
	MPI_File_open(
		MPI_COMM_WORLD,
		argv[1],
		MPI_MODE_RDONLY,
		MPI_INFO_NULL,
		&fh);
	MPI_File_read_at_all(
		fh,
		0,
		&n,
		1 * sizeof(ll), //1,
		MPI_BYTE,		//MPI_LONG_INT,
		MPI_STATUSES_IGNORE);
	ll blockSize = (n + numThreads - 1) / numThreads,
	   beg = id * blockSize,
	   end = beg + blockSize < n ? beg + blockSize : n,
	   len = end - beg,
	   *a = (ll *)malloc(len * sizeof(ll));
	MPI_File_read_at_all(
		fh,
		sizeof(ll) * (1 + beg),
		a,
		len * sizeof(ll), //len,
		MPI_BYTE,		  //MPI_LONG_INT,
		MPI_STATUSES_IGNORE);
	MPI_File_close(&fh);
	MPI_Barrier(MPI_COMM_WORLD);
	if (!id)
		printf("Finish reading %ld datas at %fs with %d procs.\n", n, MPI_Wtime() - start, numThreads);
#endif
	qsort(a, len, sizeof(ll), cmpll);
	//局部排序
	ll *sample = (ll *)malloc(numThreads * sizeof(ll)),
	   *sampleGather = (ll *)malloc(numThreads * numThreads * sizeof(ll));
	for (int i = 0; i < numThreads; ++i)
		sample[i] = a[len / numThreads * i];

	//采样
	MPI_Gather(
		sample,
		numThreads * sizeof(ll), //numThreads,
		MPI_BYTE,				 //MPI_LONG_INT,
		sampleGather,
		numThreads * sizeof(ll), //numThreads,
		MPI_BYTE,				 //MPI_LONG_INT,
		0,
		MPI_COMM_WORLD);

	if (!id)
	{
		qsort(sampleGather, numThreads * numThreads, sizeof(ll), cmpll); //可优化为n路归并
		for (int i = 0; i < numThreads; ++i)
			sample[i] = sampleGather[i * numThreads]; //获得主元
#ifndef WK_GEN_DATA
		printf("Get privot at %fs with %d procs.\n", MPI_Wtime() - start, numThreads);
#endif
	}

	MPI_Bcast(
		sample,
		numThreads * sizeof(ll), //numThreads,
		MPI_BYTE,				 //MPI_LONG_INT,
		0,
		MPI_COMM_WORLD); //分发主元

	int *sendCounts = (int *)malloc(numThreads * sizeof(int)),
		*recvCounts = (int *)malloc(numThreads * sizeof(int)),
		*sdisp = (int *)malloc(numThreads * sizeof(int)),
		*rdisp = (int *)malloc(numThreads * sizeof(int));

	for (int i = 0; i < numThreads; ++i)
		sendCounts[i] = 0;

	for (int i = 0, j = 0; i < len; ++i)
	{
		while (j < numThreads && a[i] >= sample[j])
			++j;
		//++sendCounts[j - 1];
		sendCounts[j - 1] += sizeof(ll);
	}

	MPI_Alltoall( //提前通知一下节点,各个节点要准备接收多少数据
		sendCounts,
		//1 * sizeof(int),
		1,
		//MPI_BYTE,
		MPI_INT,
		recvCounts,
		//1 * sizeof(int),
		1,
		//MPI_BYTE,
		MPI_INT,
		MPI_COMM_WORLD);
#ifndef WK_GEN_DATA
	if (!id)
		printf("Send counts at %fs with %d procs.\n", MPI_Wtime() - start, numThreads);
#endif
	sdisp[0] = rdisp[0] = 0;
	for (int i = 1; i < numThreads; ++i)
	{
		sdisp[i] = sendCounts[i - 1] + sdisp[i - 1];
		rdisp[i] = recvCounts[i - 1] + rdisp[i - 1];
	}

	//ll *local = (ll *)malloc((rdisp[numThreads - 1] + recvCounts[numThreads - 1]) * sizeof(ll));
	ll *local = (ll *)malloc(rdisp[numThreads - 1] + recvCounts[numThreads - 1]);

	MPI_Alltoallv(
		a,
		sendCounts,
		sdisp,
		MPI_BYTE, //MPI_LONG_INT,
		local,
		recvCounts,
		rdisp,
		MPI_BYTE, //MPI_LONG_INT,
		MPI_COMM_WORLD);
#ifndef WK_GEN_DATA
	if (!id)
		printf("Alltoallv at %fs with %d procs.\n", MPI_Wtime() - start, numThreads);
#endif
	//qsort(local, rdisp[numThreads - 1] + recvCounts[numThreads - 1], sizeof(ll), cmpll); //可优化为n路归并
	qsort(local, (rdisp[numThreads - 1] + recvCounts[numThreads - 1]) / sizeof(ll), sizeof(ll), cmpll); //可优化为n路归并

	if (!id)
		printf("Finish sort %ld elements at %fs with %d procs.\n", n, MPI_Wtime() - start, numThreads);

	free(local);
	free(sendCounts);
	free(recvCounts);
	free(sdisp);
	free(rdisp);

	free(sample);
	free(sampleGather);
	free(a);

	MPI_Finalize();
}

psrs.pbs

作业调度脚本,直接使用能够获得的最大计算资源(512 核)进行计算。

#PBS -N psrs
#PBS -l nodes=16:ppn=32
#PBS -j oe

source /public/software/profile.d/mpi_openmpi-intel-2.1.2.sh
mpicc psrs.c -o psrs -std=c11
mpiexec -machinefile $PBS_NODEFILE ./psrs /public/home/shared_dir/psrs_data

psrs.o1529

作业脚本得到的输出文件。可以看到,这里一共使用了18.124634s就成功对4294967295个数进行了排序,其中有6.403623s花费在文件读入上,也就是总共花了不到十二秒就完成了排序,还是明显优于单机串行排序的。

Finish reading 4294967295 datas at 6.403623s with 512 procs.
Get privot at 8.314104s with 512 procs.
Send counts at 8.333872s with 512 procs.
Alltoallv at 17.238057s with 512 procs.
Finish all the works at 18.124634s with 512 procs.

填表

去掉#define WK_GEN_DATA前的注释,进行测试(不再从输入文件中得到数据,各进程随机生成指定问题规模规模的数据)。

此外,由于并行正则采样排序的限制,排序元素的数量至少要是进程数的平方倍。因此这里问题规模从$2^18$开始(进程数最多有$512=2^9$个)。

由于数据规模过大,进程数少的时候运行时间超过一小时被调度器kill了,因此对应数据没有测出运行时间和相应加速比。

运行时间 T(单位 s)

详见下面的输出文件部分。

进程数\问题规模$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32}$
10.040.357.10936.30xx
20.040.233.10463.53xx
40.050.202.12229.26890.62x
80.050.181.56149.69592.83x
160.050.201.2280.37250.98490.23
320.090.220.9643.30176.67277.68
640.140.470.8921.77136.3090.79
1280.210.441.1317.8733.2144.52
2560.290.451.289.2217.4323.93
5120.380.571.475.108.3014.47

加速比 S

加速比 S=同等规模下的串行时间/并行时间。

进程数\问题规模$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32}$
11.000.351.001.00xx
21.001.522.292.02xx
40.801.753.354.08xx
80.801.944.556.25xx
160.801.755.8111.65xx
320.441.597.4021.62xx
640.290.747.9843.00xx
1280.190.806.2852.40xx
2560.140.785.55101.55xx
5120.110.614.83183.58xx

效率 E

运行效率 E=加速比 S/并行线程数。

进程数\问题规模$2^{18}$$2^{22}$$2^{26}$$2^{30}$$2^{31}$$2^{32}$
11.000.351.001.00xx
21.000.761.151.01xx
40.200.440.841.02xx
80.100.240.570.78xx
160.050.110.320.73xx
320.010.050.230.66xx
640.0050.010.120.67xx
1280.0010.0060.050.41xx
2560.00050.0030.020.40xx
5120.00020.0010.0010.36xx