NJUOS-6-并发控制:同步

本文最后更新于:2 年前

上节课互斥,这节课就是同步啦。主要讲述的就是一些和同步相关的内容昂!!!互斥是手段,同步是目的,同步本质上是一种更为复杂的互斥

本次课上将会学会:如何在多处理上协同多个线程完成并行的任务。

线程同步

两个或者两个以上,随着时间变化的量,在变化过程中保持一定的相对关系。

  • Asynchronization = !Synchronization,要让并发程序,保持步调!!!

生产者-消费者问题

太经典了…多线程有逻辑上的先后关系,借助互斥量等方法,完成业务逻辑。互斥量就是帮助实现这个过程的手段。

image-20221214103621269

互斥锁(Spin)

左括号相当于是队列的生产者,右括号相当于是队列的消费者。是否可以使用类似于Spin这种方式,实现呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include "thread.h"
#include "thread-sync.h"

int n, count = 0;
mutex_t lk = MUTEX_INIT();

void Tproduce() {
while (1) {
retry:
mutex_lock(&lk);
if (count == n) {
mutex_unlock(&lk);
goto retry;
}
count++;
printf("(");
mutex_unlock(&lk);
}
}

void Tconsume() {
while (1) {
retry:
mutex_lock(&lk);
if (count == 0) {
mutex_unlock(&lk);
goto retry;
}
count--;
printf(")");
mutex_unlock(&lk);
}
}

int main(int argc, char *argv[]) {
assert(argc == 2);
n = atoi(argv[1]);
setbuf(stdout, NULL);
for (int i = 0; i < 8; i++) {
create(Tproduce);
create(Tconsume);
}
}
  • 检测工具,自己写的自己测一下噻
1
2
3
4
5
6
7
8
9
10
11
import sys

limit = int(sys.argv[1])
count, n = 0, 100000
while True:
for ch in sys.stdin.read(n):
if ch == '(': count += 1
if ch == ')': count -= 1
assert 0 <= count <= limit
print(f'{n} Ok.')

./a.out 5 | python3 checker-py 5

  • 自旋消耗性能 -> Sleep,条件满足别人再把我唤醒!!!

条件变量(CV)

image-20221214105011604

生产者&消费者

  • 例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include "thread.h"
#include "thread-sync.h"

int n, count = 0;
mutex_t lk = MUTEX_INIT();
cond_t cv = COND_INIT();

void Tproduce() {
while (1) {
mutex_lock(&lk);
if (count == n) {
cond_wait(&cv, &lk);
}
printf("("); count++;
cond_signal(&cv);
mutex_unlock(&lk);
}
}

void Tconsume() {
while (1) {
mutex_lock(&lk);
if (count == 0) {
pthread_cond_wait(&cv, &lk);
}
printf(")"); count--;
cond_signal(&cv);
mutex_unlock(&lk);
}
}

int main(int argc, char *argv[]) {
assert(argc == 2);
n = atoi(argv[1]);
setbuf(stdout, NULL);
for (int i = 0; i < 8; i++) {
create(Tproduce);
create(Tconsume);
// debug create(Tconsume);
}
}

  • 压测工具&代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class ProducerConsumer:
locked, count, log, waits = '', 0, '', ''

def tryacquire(self):
self.locked, seen = '🔒', self.locked
return seen == ''

def release(self):
self.locked = ''

@thread
def tp(self):
for _ in range(2):
while not self.tryacquire(): pass # mutex_lock()

if self.count == 1:
# cond_wait
_, self.waits = self.release(), self.waits + '1'
while '1' in self.waits: pass
while not self.tryacquire(): pass

self.log, self.count = self.log + '(', self.count + 1
self.waits = self.waits[1:] # cond_signal
self.release() # mutex_unlock()

@thread
def tc1(self):
while not self.tryacquire(): pass

if self.count == 0:
_, self.waits = self.release(), self.waits + '2'
while '2' in self.waits: pass
while not self.tryacquire(): pass

self.log, self.count = self.log + ')', self.count - 1

self.waits = self.waits[1:]
self.release()

@thread
def tc2(self):
while not self.tryacquire(): pass

if self.count == 0:
_, self.waits = self.release(), self.waits + '3'
while '3' in self.waits: pass
while not self.tryacquire(): pass

self.log, self.count = self.log + ')', self.count - 1

self.waits = self.waits[1:]
self.release()

@marker
def mark_negative(self, state):
count = 0
for ch in self.log:
if ch == '(': count += 1
if ch == ')': count -= 1
if count < 0: return 'red'

上面的程序,模拟了C语言一个Producer,两个Consumer的行为。 -> 继而可以搭配model-checker,完成正确性的校验,更快找到问题!!!(Small Scope Hipothesis -> 小范围假设,找到引发问题的最小原因,建模 -> model-checker等工具,分析并发程序的Bug原因)

上面这个出问题了…如何debug? 第一步:能否找到隔离触发这个bug的最小条件(比如加一个Consumer试试?)-> 发现!!!有个Consumer在不该唤醒的时候,被唤醒了!!!经典问题!!!

  • Consumer在cond_signal,全局就一个信号量,导致唤醒了另外一个Consumer!!! -> 解决方法:
    • 两个条件变量,一个专门唤醒Consumer,另外一个专门唤醒Producer,这样不会唤醒混了。
    • 就算是唤醒了Consumer,它也不能直接去执行!!!还要检查,是否满足自己执行的条件!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void Tproduce() {
while (1) {
mutex_lock(&lk);
// mention!!!
while (count == n) {
// 不能保证你醒来的时候,count还是 == n...
cond_wait(&cv, &lk);
}
printf("("); count++;
cond_signal(&cv);
mutex_unlock(&lk);
}
}

void Tconsume() {
while (1) {
mutex_lock(&lk);
// mention!!!
while (count == 0) {
// 不能保证你醒来的时候,count还是 == 0...
pthread_cond_wait(&cv, &lk);
}
printf(")"); count--;
cond_signal(&cv);
mutex_unlock(&lk);
}
}


if(count == n) 和 if(count == 0) -> 把if改成while昂!-> 无所谓,你不能先出手…你必须符合条件才能执行哇…

把你唤醒,你不一定能执行!!!唤醒的也不一定满足条件,一定要记得while检查,而不是if,尤其涉及到这种条件判断和Sleep耦合的…你醒来,条件就不一定满足了!!!

image-20221214114618333

并行计算实现

image-20221214114913587

  • 上面这个就是可以实现并行计算,waiting for job,当创建了任务的时候,job就会被取出来执行。
  • 有了线程池的初版模型昂!!!

程序并行化

画一个DAG出来昂!!!

  • 比如一个DP算法,我们可以根据递推公式,找到每一个元素之间的依赖关系。
  • 可以使用拓扑排序,找到每一个Layer(BFS)
1
2
3
4
5
a
-> d -> f
b -> res

c -> e

第一层可执行的任务:a,b,c

第二层可执行的任务:d, e

第三层可执行的任务:f

第四层可执行的任务:res

  • 每一层的任务之间没有依赖关系,可以并行,就可以交给不同的线程去执行,这样就实现了任务的并行化。甚至可以引出MapReduce方法!!!

奇怪的问题

  1. https://leetcode.cn/problems/building-h2o/

三个一组,三个一组 -> 构成水分子

  1. jyy你真行啊。。。直接level up,自己出题当其中考试题,同学们都说谢谢你嗷!!!

image-20221214115657708

CV解决的核心:while循环的条件是啥?啥时候可以打印<,>和_对不?判断现在输出的状态 -> 掌握全局状态机的分叉!!!

  • 不用动脑子,构造一个状态机 -> from … to …就行哇!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include "thread.h"

#define LENGTH(arr) (sizeof(arr) / sizeof(arr[0]))

enum { A = 1, B, C, D, E, F, };

struct rule {
int from, ch, to;
};

struct rule rules[] = {
{ A, '<', B },
{ B, '>', C },
{ C, '<', D },
{ A, '>', E },
{ E, '<', F },
{ F, '>', D },
{ D, '_', A },
};
int current = A, quota = 1;

pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

int next(char ch) {
for (int i = 0; i < LENGTH(rules); i++) {
struct rule *rule = &rules[i];
if (rule->from == current && rule->ch == ch) {
return rule->to;
}
}
return 0;
}

void fish_before(char ch) {
pthread_mutex_lock(&lk);
while (!(next(ch) && quota)) {
// can proceed only if (next(ch) && quota)
pthread_cond_wait(&cond, &lk);
}
quota--;
pthread_mutex_unlock(&lk);
}

void fish_after(char ch) {
pthread_mutex_lock(&lk);
quota++;
current = next(ch);
assert(current);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lk);
}

const char roles[] = ".<<<<<>>>>___";

void fish_thread(int id) {
char role = roles[id];
while (1) {
fish_before(role);
putchar(role); // can be long; no lock protection
fish_after(role);
}
}

int main() {
setbuf(stdout, NULL);
for (int i = 0; i < strlen(roles); i++)
create(fish_thread);
}

全部notify_all起来,符合条件就打,不符合条件就睡。。。这不就很简单???!!!不用动脑子!!!

  • 推荐条件变量,绝对万能!!! -> 缺陷:在我看来,本质就是互斥锁plus,互斥锁 + sleep实现饿了同步而已。

信号量(Signal)

本质就是拓展了互斥锁,成了一个🔒库。可以协调多个线程的同步问题。-> or 带有计算器的锁,这么想也对!!!

  • 可以作为互斥锁来使用,同时信号量封装好的机制,能帮助我们很优雅实现生产者和消费者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include "thread.h"
#include "thread-sync.h"

sem_t fill, empty;

void producer() {
while (1) {
P(&empty);
printf("(");
V(&fill);
}
}

void consumer() {
while (1) {
P(&fill);
printf(")");
V(&empty);
}
}

int main(int argc, char *argv[]) {
assert(argc == 2);
SEM_INIT(&fill, 0);
SEM_INIT(&empty, atoi(argv[1]));
for (int i = 0; i < 8; i++) {
create(producer);
create(consumer);
}
}

明显简单,“单位资源”明确的情况下很方便,很好用。但是遇到复杂情况,例如上面奇怪的问题,明显条件变量复杂度更高,灵活性也越高!

  • 小心处理,可能会出现一些我们上面没考虑到的并发问题。

哲学家吃饭问题

image-20221214130446099

引发DeadLock的问题 -> Debug简单,可以打印出每个线程手上持有锁的情况。

  • 很重要的思想:反复告诉我们的就是,很多奇奇妙妙的算法,方法。不要尝试去这么做,有万能的方法就用万能的方法。代码的量级上去了之后,“聪明”会带来“困难”。 -> 互斥锁最简单!!!试试呗!!!
    • 加个互斥锁,想要操作的哲学家看看,如果左右不能全部拿起来,就放下来等待!
    • 如果拿的起来,就吃饭!!!
  • 使用信号量(复杂的):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include "thread.h"
#include "thread-sync.h"

#define N 3
sem_t locks[N];

void Tphilosopher(int id) {
int lhs = (N + id - 1) % N;
int rhs = id % N;
while (1) {
P(&locks[lhs]);
printf("T%d Got %d\n", id, lhs + 1);
P(&locks[rhs]);
printf("T%d Got %d\n", id, rhs + 1);
V(&locks[lhs]);
V(&locks[rhs]);
}
}

int main(int argc, char *argv[]) {
for (int i = 0; i < N; i++) {
SEM_INIT(&locks[i], 1);
}
for (int i = 0; i < N; i++) {
create(Tphilosopher);
}
}

可控性差,造成死锁了。没有办法,在“抢不到两个”叉子的时候,放下手中的叉子。P操作,在你抢不到叉子的时候,直接就睡了,手上的叉子放不下来……

  • 成功的尝试(万能的互斥锁)
1
2
3
4
5
6
7
8
9
10
11
mutex_lock(&mutex);
while (!(avail[lhs] && avail[rhs])) {
wait(&cv, &mutex);
}
avail[lhs] = avail[rhs] = false;
mutex_unlock(&mutex);

mutex_lock(&mutex);
avail[lhs] = avail[rhs] = true;
broadcast(&cv);
mutex_unlock(&mutex);

一把互斥锁,就很方便昂!!!全部人一把锁,一个一个哲学家的抢,抢不到就释放锁,抢到了,就可以吃饭了昂!!!

  • 成功的尝试(忘了信号量,让一个人集中管理叉子吧!“Leader/Follower” - 生产者/消费者)

服务员统一管理叉子!!!分布式系统中非常常见的解决思路 (HDFS, …) -> 你想吃?服务员帮你看看,能吃就吃,你别自己去动筷子昂!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Tphilosopher(int id) {
send_request(id, EAT);
P(allowed[id]); // waiter 会把叉子递给哲学家
philosopher_eat();
send_request(id, DONE);
}

void Twaiter() {
while (1) {
(id, status) = receive_request();
if (status == EAT) { ... }
if (status == DONE) { ... }
}
}

集中式的算法,远远比分布式的算法,容易弄对的多!!!一个服务员是可以对于所有的叉子是可以有一个清楚的认识的!!!Master/Slave!!!!!

Summary

image-20221214133117205

抛开 workload 谈优化就是耍流氓

References

  1. video link: https://www.bilibili.com/video/BV17T4y1S7RS/?spm_id_from=333.999.0.0&vd_source=ff957cd8fbaeb55d52afc75fbcc87dfd
  2. paper link: https://www.usenix.org/conference/nsdi20/presentation/brooker
  3. 奇怪的问题之H2O: https://leetcode.cn/problems/building-h2o/

NJUOS-6-并发控制:同步
https://alexanderliu-creator.github.io/2022/12/14/njuos-6-bing-fa-kong-zhi-tong-bu/
作者
Alexander Liu
发布于
2022年12月14日
许可协议