参考 https://tech.meituan.com/2016/11/18/disruptor.html
前言
作为一个有追求的程序猿,时刻保持着对优秀事物的积极学习心态,并发编程就是一个难啃的“硬骨头”,它难在难以控制,大多会使用“锁”来进行管控。但这与性能又南辕北辙。因此,无锁化设计是提升并发性能的关键。本篇文章介绍一个无锁化的高性能并发队列-Disruptor。
1. 背景
Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。
2. Java内置队列
介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。
我们就从数组和链表两种数据结构来看。
基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全。
基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类
LinkedBlockingQueue:通过锁的方式来实现线程安全。
ConcurrentLinkedQueue:通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现。(LinkedTransferQueue同理)
3. ArrayBlockingQueue的问题
ArrayBlockingQueue在实际使用过程中,会因为加锁和伪共享等出现严重的性能问题,我们下面来分析一下。
1. 加锁
现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。
Disruptor论文中实验得到的结论:
CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。
即:单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。
在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。
综上可知,加锁的性能是最差的。
关于锁和CAS
保证线程安全一般分成两种方式:锁和原子变量。
锁
采取加锁的方式,默认线程会冲突,访问数据时,先加上锁再访问,访问之后再解锁。通过锁界定一个临界区,同时只有一个线程进入。如上图所示,Thread2访问Entry的时候,加了锁,Thread1就不能再执行访问Entry的代码,从而保证线程安全。
ArrayBlockingQueue 通过加锁的方式实现的offer方法,保证线程安全。
原子变量
原子变量能够保证原子性的操作,意思是某个任务在执行过程中,要么全部成功,要么全部失败回滚,恢复到执行之前的初态,不存在初态和成功之间的中间状态。例如 CAS操作,要么比较并交换成功,要么比较并交换失败。由CPU保证原子性。
通过原子变量可以实现线程安全。执行某个任务的时候,先假定不会有冲突,若不发生冲突,则直接执行成功;当发生冲突的时候,则执行失败,回滚再重新操作,直到不发生冲突。
如图所示,Thread1和Thread2都要把Entry加1。若不加锁,也不使用CAS,有可能Thread1取到了myValue=1,Thread2也取到了myValue=1,然后相加,Entry中的value值为2。这与预期不相符,我们预期的是Entry的值经过两次相加后等于3。
CAS会先把Entry现在的value跟线程当初读出的值相比较,若相同,则赋值;若不相同,则赋值执行失败。一般会通过while/for循环来重新执行,直到赋值成功。
代码示例是 AtomicInteger 的getAndAdd方法。CAS是CPU的一个指令,由CPU保证原子性。
在高度竞争的情况下,锁的性能将超过原子变量的性能,但是更真实的竞争情况下,原子变量的性能将超过锁的性能。同时原子变量不会有死锁等活跃性问题。
2. 伪共享
共享
下图是计算器的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。
当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。
另外,线程之间共享一份数据的时候,需要一个线程把数据写回主存,而另一个线程访问主存中相应的数据。
缓存行
Cache是由很多个cache line组成的。每个cache line通常是64字节,并且它有效地引用主内存中的一块儿地址。一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量。
CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line。
在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个。因此你能非常快的遍历这个数组。事实上,你可以非常快速的遍历在连续内存块中分配的任意数据结构。
伪共享
ArrayBlockingQueue有三个成员变量:
takeIndex:需要被取走的元素下标
putIndex:可被元素插入的位置的下标
count:队列中元素的数量
这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。
如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取。
这种无法充分使用缓存行特性的现象,称为伪共享。
对于伪共享,一般的解决方案是,增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上,以空间换时间。
在属性前后进行padding
1 | public class FalseSharing implements Runnable{ |
使用了共享机制比没有使用共享机制,速度快。
把代码中ValuePadding都替换为ValueNoPadding后,在比较二者结果。
基本知识点
在jdk1.8中,有专门的注解 @Contended
来避免伪共享,更优雅地解决问题。
CAS:Compare And Swap/Set 顾名思义比较和交换,CPU级别的指令,cpu去更新一个值,但如果跟新过程中值发生了变化,操作就失败,然后重试,直到更新成功!
Disruptor的sequence的自增就是CAS的自旋自增,对应的,ArrayBlockQueue的数组索引index是互斥自增!
4. Disruptor的设计方案
disruptor核心数据结构 ringbuffer
- 队列上下游缓冲容器
- 首尾相连的环形数组
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
环持续向 buffer 中写入数据,这个序号会一直增长,直到绕过整个环
环形数组结构
- 新产生的sequence只覆盖,相对于传统队列不需要频繁GC
1 | private E dequeue() { |
ArrayBloackQueue出队takeIndex索引所在元素设置为NULL,高吞吐量下队列会产生大量GC
CAS维护了一个sequence,无锁自旋增长
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
假设两个生产者都想申请第7号slot, 则它们会同时执行CAS自增,执行成功的人得到该序列号slot=7,另一个则重试继续申请下一个可用的slot=8,之后根据mod/size去环形数组中寻找自己的位置。
消费者处理逻辑类似。
小结
Disruptor通过以下设计来解决队列速度慢的问题:
- 环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
- 元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
- 无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能。
demo演示
以下代码基于3.3.4版本的Disruptor包。
1 | /** |
5. 解决冲突-内存屏障
- 关键字volatile:Java内存模型将在写操作后插入一个写屏障指令,在读操作前插入一个读屏障指令
RingBuffer的指针(cursor)属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一
生产者对RingBuffer更新序列号,之后会对volatile字段(cursor)的写操作创建了一个内存屏障,这个屏障将刷新所有缓存里的值(缓存失效)
消费者获取RingBuffer序列号,涉及到读冲突的缓存失效,C2在C1之后,C2拿到C1更新过的序列号之后,C2才能获取next序列号。内存屏障保证了他们之前的执行顺序,消费者总能获取最新的序列号
内存屏障作为另一个CPU级的指令,没有锁那样大的开销,volatile意味着你不用加锁,就能让获得性能的提升
6. 浅谈disruptor多线程并发读写过程
读写并发简略图
多个生产者的情况下,会遇到 “多个线程重复写同一个元素” 的问题,解决方法是,每个线程获取不同的一段数组空间进行操作,这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS无脑自增即可判断。
Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:AvailableBuffer。当某个位置写入成功的时候,便把Availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。
消费端部分源码分析:
1 | public long waitFor(final long sequence){ |
读写不存在冲突:消费者读取到序号 x 位置元素都被生产者写入成功,消费者消费这一段区间数据。
读写存在冲突:消费者读取到序号x位置生产者正在写入,则消费者返回该序号x,并执行一段等待策略。
常见的等待策略
- BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒
- BusySpinWaitStrategy:线程一直自旋等待,比较耗CPU
- YieldingWaitStrategy: 自旋 + yield (折中方案)
disruptor写线程源码片段分析
1 | do |
多线程环境下,多个生产者通过do/while循环的条件CAS,来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。