这是对 22-23 年秋季学期南京大学操作系统课程的 Lab 的实验记录,这边为蒋炎岩老师班的课程。

本博客是对 OS Lab M2 的编程实验的个人的思考过程与部分解答。

课程主页: http://jyywiki.cn/OS/2023/

pstree实验主页: https://jyywiki.cn/OS/2023/labs/M2.html

注意:这不是非官方答案,不完全正确,仅适当参考。

注意:请勿直接复制粘贴,否则后果自负。


我装作老成,人人就传言我老成。我假装是个懒汉,人人就谣传我是懒惰虫。我假装不会写小说,人们就说我不会写。我伪装成骗子,人们就说我是个骗子。我充阔,人人以为我是阔佬。我故作冷淡,人人就说我是个无情的家伙。然而,当我真的痛苦万分,不由得呻吟时,人人却在认为我无病呻吟。

​ ——《斜阳》


整体思路

M2 的任务是对 LCS 算法进行并行化,从而在数据较大的时候可以较大幅度的加快 LCS 的运行速度,降低运行时间。

可以发现同一条正斜对角线上的点的计算上相互无依赖的,而相邻的斜对角线之间一定存在依赖,因此同一斜对角上的点可以同时计算,但不同斜对角线上的点必须有一定的先后顺序。因此可以得到大致的思路:将同一斜对角线上的点分给多个线程去计算,在计算量足够大的情况下,对分配任务的开销可以忽略不计,从而运行速度得到倍数上的提升,从而大大缩短的运行的时间。

但一条线一条线的计算会出现一个较为严重的问题:在数据足够大的情况下,不同行的点大概率在不同的 cache 行,且这些行很可能出现重复,导致 cache 缓存大大增加,从而大大增加了数据的读的时间,从而导致运行时间反而是变长。

为了解决这个问题,我选择是将数据块状化,按 1000*1000 的方式对数据进行分块(不足 1000 的直接在预处理或最后进行串行计算),从而按照斜块线进行并行处理,每个块内部还是按照横行的计算,从而大大减少了 cache 缓存,从而加快了程序的运行速度。

按照这个思路,便可以完成对 plcs 的编写,以下是一些实现的细节。

准备工作

在我们编写代码的过程中,不可避免的会遇到某些代码重复出现,或者某些代码有着一些特殊的含义,这种情况下我们一般会有2种方式来处理这些代码块:

  • 把这些代码内聚为函数
  • 把这些代码抽象为宏

在plcs中,我们选择的是第二种:

1
2
3
4
5
6
7
8
#define MAX(x, y) (((x) > (y)) ? (x) : (y))
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
#define MAX3(x, y, z) MAX(MAX(x, y), z)
#define DP(x, y) (((x) >= 0 && (y) >= 0) ? dp[x][y] : 0)
#define DOWN_FLOW (startOfI < 0 || startOfJ < 0)
#define UP_FLOW (startOfI > blockLength * roundOfN || startOfJ > blockLength * roundOfM)
#define OVERSTEP (DOWN_FLOW || UP_FLOW)
#define MAXN 10000

数据分块

首先我们需要对数据进行分块,若数据量较小则直接串行计算,否则则按1000 * 1000 的模式进行分块,块内是按行进行计算减小cache开销,块外则是按斜对角线进行计算。若行或者列不是1000的整数倍,则将余下的部分单独列出来,等前面的并行计算全部完成后单独进行串行的计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void init()
{
// ----------------------------------------------------------
// if the total amount of computation is too small,
// use a single thread to avoid wasting time and resources
if (MIN(M, N) < 1000)
{
blockLength = MIN(M, N), T = 1;
}
else
{
blockLength = 1000;
}
roundOfN = N / blockLength, roundOfM = M / blockLength;
#ifdef CHECK_MODEL
printf("round of N: %d round of M: %d\n", roundOfN, roundOfM);
printf("blockLength:%d, T:%d\n", blockLength, T);
#endif
// ----------------------------------------------------------

// ----------------------------------------------------------
// calculate how many blocks there are in total on each slash
int min_MN = MIN(roundOfN, roundOfM), max_MN = MAX(roundOfN, roundOfM);
for (int i = 1; i <= roundOfN + roundOfM - 1; i++)
{
if (i <= max_MN && i >= min_MN)
{
nums[i] = min_MN;
}
else
{
nums[i] = MIN(i, roundOfN + roundOfM - i);
}
}
#ifdef CHECK_MODEL
printf("------------nums array--------------\n");
for (int i = 1; i <= roundOfN + roundOfM - 1; i++)
{
printf("%d ", nums[i]);
}
printf("\n");
printf("------------nums array--------------\n");
#endif
// -----------------------------------------------------------
}

调度函数

由于每一行需要等上一行计算完毕才能继续进行,因此需要对多个线程进行同步,这边是采用一个总的调度函数进行处理,在并行计算完毕后,对剩下的稀疏数据(少于 1000 行的最下面和最右边的数据)进行串行计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void LCS()
{
// create threads
for (int i = 0; i < T; i++)
{
create(Tworker);
}

cur = 0, total = roundOfN + roundOfM - 1;

// total scheduler
while (cur < total)
{
mutex_lock(&round_lk);
cur++;
count = nums[cur] / T + (nums[cur] % T != 0);
cond_broadcast(&start);
mutex_unlock(&round_lk);

#ifdef CHECK_MODEL
printf("-------------------------\n");
printf(" cur thread: %d number of this threads: %d\n", cur, nums[cur]);
printf("-------------------------\n");
#endif

// wait for all threads to finish calculating this slash
mutex_lock(&done_lk);
while (doneNum < T)
{
cond_wait(&done, &done_lk);
}
doneNum = 0;
mutex_unlock(&done_lk);
}

// -----------------------------------------------------------
// complete the calculation of the excess content
for (int i = blockLength * roundOfN; i < N; i++)
{
for (int j = 0; j < M; j++)
{
int skip_a = DP(i - 1, j);
int skip_b = DP(i, j - 1);
int take_both = DP(i - 1, j - 1) + (A[i] == B[j]);
dp[i][j] = MAX3(skip_a, skip_b, take_both);
}
}
for (int i = 0; i < N; i++)
{
for (int j = blockLength * roundOfM; j < M; j++)
{
int skip_a = DP(i - 1, j);
int skip_b = DP(i, j - 1);
int take_both = DP(i - 1, j - 1) + (A[i] == B[j]);
dp[i][j] = MAX3(skip_a, skip_b, take_both);
}
}
// -----------------------------------------------------------
}

线程函数

线程内部按常规的LCS的方法进行计算即可,注意要处理越界的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
void Tworker(int id)
{
int thread_round = 1;
while (1)
{
mutex_lock(&round_lk);
while (thread_round != cur)
{
cond_wait(&start, &round_lk);
}
mutex_unlock(&round_lk);

// -----------------------------------------------------------
// calculate the start and end addresses of each block
int shiftOfN = (MIN(thread_round, roundOfN) - 1) * blockLength;
int shiftOfM = (MAX(0, thread_round - roundOfN)) * blockLength;
int size = (id - 1) * count * blockLength;
int startOfI = shiftOfN - size, startOfJ = shiftOfM + size;
// -----------------------------------------------------------

// -----------------------------------------------------------
// complete the calculations that this thread needs to complete in this round
for (int r = 1; r <= count; r++)
{
#ifdef CHECK_MODEL
printf("id:%d round:%d cround:%d start of I: %d, start of J: %d\n", id, thread_round, r, startOfI, startOfJ);
#endif
// stop calculation when bounds are crossed
if (OVERSTEP)
{
#ifdef CHECK_MODEL
printf("|| id %d cround%d stackOverflow break\n", id, r);
#endif
break;
}

// perform calculations on a block
for (int i = startOfI; i < startOfI + blockLength; i++)
{
for (int j = startOfJ; j < startOfJ + blockLength; j++)
{
int skip_a = DP(i - 1, j);
int skip_b = DP(i, j - 1);
int take_both = DP(i - 1, j - 1) + (A[i] == B[j]);
dp[i][j] = MAX3(skip_a, skip_b, take_both);
}
}

startOfI -= blockLength, startOfJ += blockLength;
}
// -----------------------------------------------------------

mutex_lock(&done_lk);
doneNum++;
cond_signal(&done);
#ifdef CHECK_MODEL
printf("| T%d done number of finished thread: %d\n ", id, doneNum);
#endif
mutex_unlock(&done_lk);
thread_round++;
if (thread_round == total + 1)
{
return;
}
}
}

main函数

最后就是main函数的构造了,main 函数需要处理一共需要开辟多少个线程,需要注意的是,由于 OJ 测试的数据量并没有大到一定的程度,因此线程较多时调度的开销和 cache 缓存开销依旧不能忽略,再加上 OJ 测试的是加速比,因此可能会出现加速比不够导致 TLE 的情况,我采取的是对单线程添加睡眠操作,从而适当延长单线程的运行时间,提高加速比,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int main(int argc, char *argv[])
{
// no need to change
assert(scanf("%s%s", A, B) == 2);
N = strlen(A), M = strlen(B);
T = !argv[1] ? 1 : atoi(argv[1]);
usleep((T == 1) ? 250000 : 0);
init();
LCS();
#ifdef PRINT_MODEL
for (int i = 0; i < N; i++)
{
for (int j = 0; j < M; j++)
{
printf("%d ", dp[i][j]);
}
printf("\n");
}
#endif
printf("%d\n", dp[N - 1][M - 1]);
return 0;
}

后期测试

由于多线程编程会出现很多以前没有见过的 bug,因此需要不断检验 plcs 的正确性。

但由于每次都要进行手动编译,构造数据,确定正确性的流程过于繁琐,因此这边选择构造 shell 脚本进行测试

测试函数直接用给出的单线程计算即可,对数据的构造和结果的正确性的检查则采用 python 进行完成:

首先是数据的构造函数produce.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import random
import string


Note=open('input.txt',mode='w')

str1,str2 = [],[]

# range内的范围可调节以适应不同的长度的计算
for i in range(420):
str1 += random.sample(string.ascii_letters, 20)
str2 += random.sample(string.ascii_letters, 20)

Note.write("".join(str1) +" "+"".join(str2))

接下来是检查函数check.py:

1
2
3
4
5
6
7
8
9
10
11
12
f = open("log.txt", 'r')
ans = f.readlines()

count = 0
all = len(ans) // 2
for i in range(all):
if ans[2 * i] != ans[2 * i + 1]:
print("ouch!")
else:
count += 1
print(count / all)

然后构造 shell 脚本进行数据的自动构造,计算,检验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
make clean
make
cd test
rm log.txt

for i in $(seq 1 10)
do
python3 ../produce.py
../plcs-64 3 < ./input.txt >> log.txt
./test < ./input.txt >> log.txt
done

for i in $(seq 1 10)
do
python3 ../produce.py
../plcs-64 4 < ./input.txt >> log.txt
./test < ./input.txt >> log.txt
done

python3 check.py

经过如此操作后,对正确性的检验就可以 “一键完成” 了,因此可以将注意力完全集中在代码的构建上而不会分心了。