基本逻辑 本文参考学习了很多网上介绍无锁队列的文章,其中讲的好的有陈皓的这篇 , Faustino的这篇 , DPDK里也有实现无锁队列,对应的介绍文章在这里 。
队列可以基于链表或者数组实现,无锁队列也有链表和数组实现的方式,陈皓的文章中对链表 实现的方式有深入的分析,总的来讲链表实现方式最大的问题是引入了ABA的问题,数组的方式 其实也有ABA的问题,Faustino和DPDK里有数组实现的实例。数组实现要一开始就分配全部的队列 内存,和链表实现相比使用内存会多。
我们这里基于数组实现,看下无锁队列的核心实现逻辑,Faustino的文章把基于数组的实现 已经讲的很好,直接看原文效果更好。
1 2 3 4 5 6 7 +---+---+---+---+---+---+---+---+ | | x | x | x | x | | | | +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ write
如上是基本数据结果的示意图,write表示push写入数据的位置,read表示pop读取数据的位置, push数据进入队列并不是一个原子的过程,而是先原子的抢到队列的位置,随后再把数据copy 到位置上,commit就表示彻底完成copy数据的位置。
多个push抢位置的逻辑是这样的,先原子的把write读到一个临时变量里(curr_write),然后 用CAS的方式尝试把队列write的值改成后续的一个值((curr_write + 1) % QSIZE)的值,判断 的依据是队列在这个短暂的时间间隙没有被其他push写入过数据,也就是队列的write还是之前 读出的curr_write,整个CAS的原子行为用CAS指令表达起来就是:CAS(&write, curr_write, (curr_write + 1) % QSIZE), 通过CAS的返回值得知我们是否抢到了队列位置,写入成功表示抢到了队列位置,队列write 往后移动了一个位置。
1 2 3 4 5 6 7 8 9 curr_write v +---+---+---+---+---+---+---+---+ | | x | x | x | x | | | | +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ write
成功抢到队列位置后,随后就可以把数据写入我们已经占有的位置: curr_write。
数据的写入过程会和read的过程并发,这也是为什么还要有commit的原因,commit表示这个 位置待写入数据或者正在写入数据,所以read和commit相等意味着队列为空或者队列正在写 入马上可以pop的数据。数据写入完后,要更新队列commit,这时可能多个push都要并发的 更新commit,只有队列commit和curr_write相等时,当前push才能更新commit。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 curr_write_1 v +---+---+---+---+---+---+---+---+ | | x | x | x | x | 1 | | | core 1 push +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ | write | | CAS(&commit, curr_write, (curr_write + 1) % QSIZE) | v curr_write_2 v +---+---+---+---+---+---+---+---+ | | x | x | x | x | 1 | 2 | | core 2 push +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ write
commit的值不断的往后更新,后续core更新commit的条件不断满足。
pop中更新read的逻辑和push中更新write的逻辑是一样的。队列空满判断的逻辑是:read和 commit相等时队列为空,write的下一个位置是head时,队列为满,注意在这样的判断下, 队列最大的容量是QSIZE - 1。
一个例子 如下是从Faustino的实现中copy出来的一个基于数组实现的无锁队列的示例,我们给它加上 注释,并一点一点分析其中有问题的地方。为了方便分析,我们把push和pop的分析分开, 需要展开画图分析的,我们在代码后面画示意图分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 #define Q_SIZE 100 #define Q_POS(count) ((count) % Q_SIZE) static bool push(struct queue *q, int data) { int curr_write; int curr_read; do { /* * 注意,这个代码里的所有memory order的属性我们都用的时候最强的, * __ATOMIC_SEQ_CST表示其前后的访存都不能越过__ATOMIC_SEQ_CST * 标记的这条指令。在gcc atomic和memory order的定义上,memory * order的属性的作用访问不只是__atomic开头的函数,还包括所有访存 * 操作。 */ curr_write = __atomic_load_n(&q->write_pos, __ATOMIC_SEQ_CST); curr_read = __atomic_load_n(&q->read_pos, __ATOMIC_SEQ_CST); /* * 如上一节所述,write实际写入位置永远不会和read相邻,当写入数据 * 时,发现write和read相邻时,就认为队列已满。 */ if (Q_POS(curr_write + 1) == Q_POS(curr_read)) { return false; } /* * 注意CAS更新失败的逻辑,如下CAS函数中,在curr_write和write_pos上的值 * 相等时,把后面Q_POS(curr_write + 1)的值改写write_pos。但是,在不相等 * 时,会用write_pos的值改写curr_write的值。 * * 这里的逻辑是没有问题的,因为如果CAS更新失败,程序会重新执行这个循环, * 在如上的代码里拿到最新的write_pos。 * * 但是,下面commit_pos处的CAS就会有问题,我们在那里分析。 */ /* 注意,这里还存在ABA的问题, 我们在如下write_pos ABA问题中说明 */ } while (!__atomic_compare_exchange_n(&q->write_pos, &curr_write, Q_POS(curr_write + 1), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); q->data[curr_write] = data; /* * 如果如下的CAS失败,curr_write值被更新为commit_pos的值,这里的逻辑就 * 乱了,具体分析见如下CAS问题,这其实是一个编码错误。 */ /* * 注意, commit_pos不会出现类似write_pos ABA的问题,因为commit_pos总是 * 落后或者等于write_pos。 */ while (!__atomic_compare_exchange_n(&q->commit_pos, &curr_write, Q_POS(curr_write + 1), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); return true; }
CAS问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 +---+---+---+---+---+---+---+---+ thread1: | | x | x | x | x | | | | +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ curr_write +---+---+---+---+---+---+---+---+ thread2: | | x | x | x | x | | | | +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ curr_write
系统在一个时刻可能处于如上的状态,如果thread2的CAS先运行,因为commit_pos和curr_write 不相等CAS执行失败,commit_pos的值改写curr_write,本来curr_write在CAS里是作为expected data存在的,等到commit_pos被更新到和当前thread的curr_write一样,就可以把commit_pos 的新值写入。这下后面的逻辑就乱了。
修改的方法是,这里每次做CAS之前把curr_write的值先保存起来,如果CAS失败,就用保存 起来的值对curr_write做恢复。
write_pos ABA问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 thread1 睡前: +---+---+---+---+---+---+---+---+ | | a | b | c | d | | | | +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ write thread1 醒来: +---+---+---+---+---+---+---+---+ | k | e | f | g | h | | i | g | +---+---+---+---+---+---+---+---+ ^ ^ commit read ^ write
如上,thread1 push在执行write_pos的CAS前sleep了,这期间其它线程持续做pop/push, 队列的状态被更新为如上thread1醒来时的状态,thread1醒来继续执行write_pos的CAS, 因为write_pos和curr_write相等,CAS成功,但是,实际上write和read相邻时,队列已经 处于满的状态,这时候再写入,逻辑已经错了。
解决办法是给write_pos加上版本号,每一轮版本号增加1,CAS操作时也要判断版本号,如果 版本号不一样,CAS失败,重新进行push循环,这里面就会识别到队列已经处于满的状态。
我们看下错误执行下,队列的表现。
1 2 3 4 5 6 7 8 9 10 thread1 醒来: +---+---+---+---+---+---+---+---+ | k | e | f | g | h | l | i | g | +---+---+---+---+---+---+---+---+ ^ commit ^ write ^ read
如上,thread1错误写入l后,队列的各个指针会相等,这个是合法状态,后续任意thread 认为队列为空,pop操作失败,push操作会覆盖之前队列里的合法内容。综合表现为push入 队的部分数据永远丢失了。
可以看到当thread1醒来,队列不是处于非法状态时,thread1的push会恰巧成功:
1 2 3 4 5 6 7 8 thread1 醒来: +---+---+---+---+---+---+---+---+ | k | e | f | g | h | | | g | +---+---+---+---+---+---+---+---+ ^ ^ commit read ^ write
加入version的解决办法,这里会多一次重新循环。
在version解决办法的具体实现中,队列的位置还是用write_pos % Q_SIZE,但是我们在 write_pos需要回绕到0的时候还是继续给write_pos加1,这样write_pos % Q_SIZE为0,但 是write_pos / Q_SIZE加了1,表示新版本。可以看到如果睡的时间足够久,版本号也可能 是一样的,如果write_pos是一个64bit的无符号数,可以认为版本号相等的情况永远不会 发生。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 /* return false if q is empty */ static bool pop(struct queue *q, int *data) { int curr_commit; int curr_read; int tmp; do { curr_read = __atomic_load_n(&q->read_pos, __ATOMIC_SEQ_CST); curr_commit = __atomic_load_n(&q->commit_pos, __ATOMIC_SEQ_CST); if (Q_POS(curr_read) == Q_POS(curr_commit)) { return false; } tmp = q->data[curr_read]; /* * 这里没有如上commit_pos CAS失败的问题,原因和write_pos一样: * 当如下CAS失败时,会重新load read_pos的值,更新curr_read。 */ /* 注意,这里存在ABA的问题, 我们在如下read_pos ABA问题中说明 */ if (__atomic_compare_exchange_n(&q->read_pos, &curr_read, Q_POS(curr_read + 1), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { *data = tmp; return true; } } while (1); }
read_pos ABA问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 thread1 睡前: +---+---+---+---+---+---+---+---+ | | a | b | c | d | | | | +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ write thread1 醒来: +---+---+---+---+---+---+---+---+ | | e | f | g | h | i | | | +---+---+---+---+---+---+---+---+ ^ ^ read commit ^ write
如上,thread1在pop数据的时候,执行完“tmp = q->data[curr_read]”(tmp为a)后sleep了 很久,这期间其它线程不断的push/pop数据,队列中的数据更新了一轮,当thread1醒来, 正好read_pos和它睡前一样时,thread1后续的CAS会成功,实际上thread1已经读到了错误 的值(a)。
和如上write_pos ABA问题一样,解决办法是在read_pos上加一个版本号,thread1醒来做 CAS的时候,版本号不一样,CAS就会失败,重新进行pop循环,也就会重新读队列里的数据。
继续看下错误执行时的现象,错误执行的时候,thread1醒来后把a pop出来,而a在thread1 sleep时已经被其它thread pop出来过。总体上看,就是pop了重复数据出来。
除了上面的问题,程序里还有因为C语言中为定义行为而带来的问题。大概有如下两个问题: push/pop读写队列数据未定义行为的问题;队列版本号的问题。我们一个一个看下。
push/pop读写队列数据未定义行为的问题。push/pop中对队列数据的访问从C语言的角度看 并不一定是一个原子行为,当这两个行为同时发生时,逻辑上看程序行为是为定义的。其实, 编译器把队列数据访问编译成store/load的指令,一般而言,单条store/load指令是原子的, 这里是不会出错的,但是从逻辑上看,这里的行为是未定义的。
这个问题的解决办法是使用atomic_load/atomic_store访问队列中的数据,这两个原子操作 可以使用__ATOMIC_RELAXED。改用显示原子操作函数并不会降低程序的性能,因为编译器 编译出来的指令可能就是一样的。
队列版本号的问题。程序中的队列write/read/commit的指针使用的是有符号数,有符号数 溢出的逻辑在C语言里是未定义的。
把write/read/commit改成无符号数虽然没有了未定义的问题,但是程序逻辑本身还是有问 题的(其实这个问题一直都在)。我们把一个队列指针的一部分用作version tag,一部分用作 队列位置描述,需要以二进制bit位置做划分,简单说就是队列长度需要是2的幂次,否则 溢出后新老队列位置标记是对不上的。
综合以上问题修改的一个现在看来没有问题的版本可以参考这里 。