0%

无锁队列实现分析

基本逻辑

本文参考学习了很多网上介绍无锁队列的文章,其中讲的好的有陈皓的这篇, Faustino的这篇
DPDK里也有实现无锁队列,对应的介绍文章在这里

队列可以基于链表或者数组实现,无锁队列也有链表和数组实现的方式,陈皓的文章中对链表
实现的方式有深入的分析,总的来讲链表实现方式最大的问题是引入了ABA的问题,数组的方式
其实也有ABA的问题,Faustino和DPDK里有数组实现的实例。数组实现要一开始就分配全部的队列
内存,和链表实现相比使用内存会多。

我们这里基于数组实现,看下无锁队列的核心实现逻辑,Faustino的文章把基于数组的实现
已经讲的很好,直接看原文效果更好。

1
2
3
4
5
6
7
+---+---+---+---+---+---+---+---+
| | x | x | x | x | | | |
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
write

如上是基本数据结果的示意图,write表示push写入数据的位置,read表示pop读取数据的位置,
push数据进入队列并不是一个原子的过程,而是先原子的抢到队列的位置,随后再把数据copy
到位置上,commit就表示彻底完成copy数据的位置。

多个push抢位置的逻辑是这样的,先原子的把write读到一个临时变量里(curr_write),然后
用CAS的方式尝试把队列write的值改成后续的一个值((curr_write + 1) % QSIZE)的值,判断
的依据是队列在这个短暂的时间间隙没有被其他push写入过数据,也就是队列的write还是之前
读出的curr_write,整个CAS的原子行为用CAS指令表达起来就是:CAS(&write, curr_write, (curr_write + 1) % QSIZE),
通过CAS的返回值得知我们是否抢到了队列位置,写入成功表示抢到了队列位置,队列write
往后移动了一个位置。

1
2
3
4
5
6
7
8
9
                   curr_write
v
+---+---+---+---+---+---+---+---+
| | x | x | x | x | | | |
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
write

成功抢到队列位置后,随后就可以把数据写入我们已经占有的位置: curr_write。

数据的写入过程会和read的过程并发,这也是为什么还要有commit的原因,commit表示这个
位置待写入数据或者正在写入数据,所以read和commit相等意味着队列为空或者队列正在写
入马上可以pop的数据。数据写入完后,要更新队列commit,这时可能多个push都要并发的
更新commit,只有队列commit和curr_write相等时,当前push才能更新commit。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
                   curr_write_1
v
+---+---+---+---+---+---+---+---+
| | x | x | x | x | 1 | | | core 1 push
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
| write
|
| CAS(&commit, curr_write, (curr_write + 1) % QSIZE)
|
v curr_write_2
v
+---+---+---+---+---+---+---+---+
| | x | x | x | x | 1 | 2 | | core 2 push
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
write

commit的值不断的往后更新,后续core更新commit的条件不断满足。

pop中更新read的逻辑和push中更新write的逻辑是一样的。队列空满判断的逻辑是:read和
commit相等时队列为空,write的下一个位置是head时,队列为满,注意在这样的判断下,
队列最大的容量是QSIZE - 1。

一个例子

如下是从Faustino的实现中copy出来的一个基于数组实现的无锁队列的示例,我们给它加上
注释,并一点一点分析其中有问题的地方。为了方便分析,我们把push和pop的分析分开,
需要展开画图分析的,我们在代码后面画示意图分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#define Q_SIZE 100
#define Q_POS(count) ((count) % Q_SIZE)

static bool push(struct queue *q, int data)
{
int curr_write;
int curr_read;

do {
/*
* 注意,这个代码里的所有memory order的属性我们都用的时候最强的,
* __ATOMIC_SEQ_CST表示其前后的访存都不能越过__ATOMIC_SEQ_CST
* 标记的这条指令。在gcc atomic和memory order的定义上,memory
* order的属性的作用访问不只是__atomic开头的函数,还包括所有访存
* 操作。
*/
curr_write = __atomic_load_n(&q->write_pos, __ATOMIC_SEQ_CST);
curr_read = __atomic_load_n(&q->read_pos, __ATOMIC_SEQ_CST);

/*
* 如上一节所述,write实际写入位置永远不会和read相邻,当写入数据
* 时,发现write和read相邻时,就认为队列已满。
*/
if (Q_POS(curr_write + 1) == Q_POS(curr_read)) {
return false;
}
/*
* 注意CAS更新失败的逻辑,如下CAS函数中,在curr_write和write_pos上的值
* 相等时,把后面Q_POS(curr_write + 1)的值改写write_pos。但是,在不相等
* 时,会用write_pos的值改写curr_write的值。
*
* 这里的逻辑是没有问题的,因为如果CAS更新失败,程序会重新执行这个循环,
* 在如上的代码里拿到最新的write_pos。
*
* 但是,下面commit_pos处的CAS就会有问题,我们在那里分析。
*/
/* 注意,这里还存在ABA的问题, 我们在如下write_pos ABA问题中说明 */
} while (!__atomic_compare_exchange_n(&q->write_pos, &curr_write,
Q_POS(curr_write + 1), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));

q->data[curr_write] = data;

/*
* 如果如下的CAS失败,curr_write值被更新为commit_pos的值,这里的逻辑就
* 乱了,具体分析见如下CAS问题,这其实是一个编码错误。
*/
/*
* 注意, commit_pos不会出现类似write_pos ABA的问题,因为commit_pos总是
* 落后或者等于write_pos。
*/
while (!__atomic_compare_exchange_n(&q->commit_pos, &curr_write,
Q_POS(curr_write + 1), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));

return true;
}

CAS问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
            +---+---+---+---+---+---+---+---+
thread1: | | x | x | x | x | | | |
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
curr_write

+---+---+---+---+---+---+---+---+
thread2: | | x | x | x | x | | | |
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
curr_write

系统在一个时刻可能处于如上的状态,如果thread2的CAS先运行,因为commit_pos和curr_write
不相等CAS执行失败,commit_pos的值改写curr_write,本来curr_write在CAS里是作为expected
data存在的,等到commit_pos被更新到和当前thread的curr_write一样,就可以把commit_pos
的新值写入。这下后面的逻辑就乱了。

修改的方法是,这里每次做CAS之前把curr_write的值先保存起来,如果CAS失败,就用保存
起来的值对curr_write做恢复。

write_pos ABA问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
thread1 睡前:

+---+---+---+---+---+---+---+---+
| | a | b | c | d | | | |
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
write
thread1 醒来:
+---+---+---+---+---+---+---+---+
| k | e | f | g | h | | i | g |
+---+---+---+---+---+---+---+---+
^ ^
commit read
^
write

如上,thread1 push在执行write_pos的CAS前sleep了,这期间其它线程持续做pop/push,
队列的状态被更新为如上thread1醒来时的状态,thread1醒来继续执行write_pos的CAS,
因为write_pos和curr_write相等,CAS成功,但是,实际上write和read相邻时,队列已经
处于满的状态,这时候再写入,逻辑已经错了。

解决办法是给write_pos加上版本号,每一轮版本号增加1,CAS操作时也要判断版本号,如果
版本号不一样,CAS失败,重新进行push循环,这里面就会识别到队列已经处于满的状态。

我们看下错误执行下,队列的表现。

1
2
3
4
5
6
7
8
9
10
thread1 醒来:
+---+---+---+---+---+---+---+---+
| k | e | f | g | h | l | i | g |
+---+---+---+---+---+---+---+---+
^
commit
^
write
^
read

如上,thread1错误写入l后,队列的各个指针会相等,这个是合法状态,后续任意thread
认为队列为空,pop操作失败,push操作会覆盖之前队列里的合法内容。综合表现为push入
队的部分数据永远丢失了。

可以看到当thread1醒来,队列不是处于非法状态时,thread1的push会恰巧成功:

1
2
3
4
5
6
7
8
thread1 醒来:
+---+---+---+---+---+---+---+---+
| k | e | f | g | h | | | g |
+---+---+---+---+---+---+---+---+
^ ^
commit read
^
write

加入version的解决办法,这里会多一次重新循环。

在version解决办法的具体实现中,队列的位置还是用write_pos % Q_SIZE,但是我们在
write_pos需要回绕到0的时候还是继续给write_pos加1,这样write_pos % Q_SIZE为0,但
是write_pos / Q_SIZE加了1,表示新版本。可以看到如果睡的时间足够久,版本号也可能
是一样的,如果write_pos是一个64bit的无符号数,可以认为版本号相等的情况永远不会
发生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* return false if q is empty */
static bool pop(struct queue *q, int *data)
{
int curr_commit;
int curr_read;
int tmp;

do {
curr_read = __atomic_load_n(&q->read_pos, __ATOMIC_SEQ_CST);
curr_commit = __atomic_load_n(&q->commit_pos, __ATOMIC_SEQ_CST);

if (Q_POS(curr_read) == Q_POS(curr_commit)) {
return false;
}

tmp = q->data[curr_read];
/*
* 这里没有如上commit_pos CAS失败的问题,原因和write_pos一样:
* 当如下CAS失败时,会重新load read_pos的值,更新curr_read。
*/
/* 注意,这里存在ABA的问题, 我们在如下read_pos ABA问题中说明 */
if (__atomic_compare_exchange_n(&q->read_pos, &curr_read,
Q_POS(curr_read + 1), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
*data = tmp;
return true;
}

} while (1);
}

read_pos ABA问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
thread1 睡前:

+---+---+---+---+---+---+---+---+
| | a | b | c | d | | | |
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
write
thread1 醒来:
+---+---+---+---+---+---+---+---+
| | e | f | g | h | i | | |
+---+---+---+---+---+---+---+---+
^ ^
read commit
^
write

如上,thread1在pop数据的时候,执行完“tmp = q->data[curr_read]”(tmp为a)后sleep了
很久,这期间其它线程不断的push/pop数据,队列中的数据更新了一轮,当thread1醒来,
正好read_pos和它睡前一样时,thread1后续的CAS会成功,实际上thread1已经读到了错误
的值(a)。

和如上write_pos ABA问题一样,解决办法是在read_pos上加一个版本号,thread1醒来做
CAS的时候,版本号不一样,CAS就会失败,重新进行pop循环,也就会重新读队列里的数据。

继续看下错误执行时的现象,错误执行的时候,thread1醒来后把a pop出来,而a在thread1
sleep时已经被其它thread pop出来过。总体上看,就是pop了重复数据出来。

除了上面的问题,程序里还有因为C语言中为定义行为而带来的问题。大概有如下两个问题:
push/pop读写队列数据未定义行为的问题;队列版本号的问题。我们一个一个看下。

push/pop读写队列数据未定义行为的问题。push/pop中对队列数据的访问从C语言的角度看
并不一定是一个原子行为,当这两个行为同时发生时,逻辑上看程序行为是为定义的。其实,
编译器把队列数据访问编译成store/load的指令,一般而言,单条store/load指令是原子的,
这里是不会出错的,但是从逻辑上看,这里的行为是未定义的。

这个问题的解决办法是使用atomic_load/atomic_store访问队列中的数据,这两个原子操作
可以使用__ATOMIC_RELAXED。改用显示原子操作函数并不会降低程序的性能,因为编译器
编译出来的指令可能就是一样的。

队列版本号的问题。程序中的队列write/read/commit的指针使用的是有符号数,有符号数
溢出的逻辑在C语言里是未定义的。

把write/read/commit改成无符号数虽然没有了未定义的问题,但是程序逻辑本身还是有问
题的(其实这个问题一直都在)。我们把一个队列指针的一部分用作version tag,一部分用作
队列位置描述,需要以二进制bit位置做划分,简单说就是队列长度需要是2的幂次,否则
溢出后新老队列位置标记是对不上的。

综合以上问题修改的一个现在看来没有问题的版本可以参考这里