摘要: Java Disruptor是一个高性能的异步处理消息框架,其LMAX架构可以获得每秒6百万订单,用1微秒的延迟获得100K+的吞吐量。
Disruptor 的核心概念
disruptor最大特点是高性能,其针对性能做出极度优化并采用了无锁式的设计。
Ring Buffer
Ring Buffer是一个类似对象池模型的实现,所有的”消息“都保存在里面,解决“消息”对象生存周期短、数量多、回收频繁的问题,在高级的应用场景中,Ring Buffer 可由用户自定义实现来替代。
Sequence Disruptor
通过顺序递增的序号来编号管、定位、处理“消息”。同时用于跟踪标识某个事件处理者( RingBuffer/Consumer )的处理进度。Sequence 同时负责处理CPU缓存伪共享(Flase Sharing)问题。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
Wait Strategy
定义 “消费者”如何进行等待下一个事件的策略。
Event
“生产者”和“消费者”之间传递的“消息”
EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
EventHandler
处理“消息”内容的实现。
Producer
生产者
三、disruptor为什么这么快
Ring Buffer:
Ring Buffer是一个存储数据的环形数组结构,在这里“消息”被按顺装填,不会销毁,直到达到尾部后会进行覆盖。
首先,环形模式的最大优点在于没有尾指针,只需维护下一个位置的序号,与链表结构相比,数组的性能更高。
其次,数据内部元素的内存地址连续存储,在硬件级别,cpu在加载元素时相邻元素会被预加载
再次,数组的内存是一直存在的(直到程序退出),内部的元素也会一直存在,避免了大量创建的开销以及垃圾回收时间。
False Sharing:
当CPU的缓存行(cache line) 中同时读取了两个相互独立的数据时,当两份数据分别要进行更新时,根据MESI协议会产生RFO(Request For Ownership)请求,双方会竞争当前缓存行的所有权,导致另外一份数据缓存失效,此时就是伪共享(False Sharing)。Disruptor针对这种问题采用了缓存行填充的方式,将数据补充放大,保证单独缓存在一个缓存行中,避免伪共享带来的性能影响。
Lock:
锁的存在会不可避免的性能开销,不合理的设计会导致死锁产生,增加设计成本、降低运行效率。
首先,Dispruptor 采用无锁化设计,在多生产者条件下使用CAS(Compare And Swap/Set)进行操作。这是一个CPU级别指令,虽然它们并非没有代价,但比锁消耗资源少的多。
其次,Sequence会为每一个“消息”由同一个线程产生一个序号,避免出现多个线程之间的竞争修改,没有竞争、不需要锁、甚至不需要CAS
Memory Barriers:
内存屏障能够确保一些特定操作执行的顺序,影响一些数据的可见性(可能是某些指令执行后的结果)。Java内存模型在处理volatile时写操作后插入一个写屏障指令,在读操作前插入一个读屏障指令,Disruptor对volatile字段(cursor)的写操作创建了一个内存屏障,这个屏障将刷新所有缓存里的值(或者至少相应地使得缓存失效),保证数据准确性。
四、disruptor的例子:
step 1:Event
package com.bj58.disruptor.demo;public class Message{private long value;
package com.bj58.disruptor.demo;import com.lmax.disruptor.EventFactory;public class MessageFactory implements EventFactory<Message>{public Message newInstance(){return new Message();
step 2: EventHandler
package com.bj58.disruptor.demo;import com.lmax.disruptor.EventHandler;public class MessageHandler implements EventHandler<Message>{public void onEvent(Message event, long sequence, boolean endOfBatch){
step 3: Producer
package com.bj58.disruptor.demo;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.EventTranslatorOneArg;import java.nio.ByteBuffer;public class MessageProducer{private final RingBuffer<Message> ringBuffer;
step 4 : Main
package com.bj58.disruptor.demo;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import java.nio.ByteBuffer;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class Main{public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// The factory for the eventMessageFactory factory = new MessageFactory();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<Message> disruptor = new Disruptor<Message>(factory, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith(new MessageHandler());// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();
五、disruptor 的性能:
官方数据:
Multiple Producer
Disruptor
Single Producer
Disruptor=89,365,504 ops/sec
数据对比:
处理的消息数量:32*1024*1024=33554432
Duration(ms) | LinkedBlockingQueue | Disruptor | 对比 |
1 | 5771 | 4209 | 73% |
2 | 5725 | 3479 | 60.77% |
3 | 5433 | 3228 | 59.41% |
4 | 5297 | 2805 | 52.95% |
5 | 5745 | 2477 | 43.12% |
6 | 6124 | 2428 | 39.65% |
分析:
Ring Buffer
Ring Buffer是一个类似对象池模型的实现,所有的”消息“都保存在里面,解决“消息”对象生存周期短、数量多、回收频繁的问题,在高级的应用场景中,Ring Buffer 可由用户自定义实现来替代。
Sequence Disruptor
通过顺序递增的序号来编号管、定位、处理“消息”。同时用于跟踪标识某个事件处理者( RingBuffer/Consumer )的处理进度。Sequence 同时负责处理CPU缓存伪共享(Flase Sharing)问题。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
Wait Strategy
定义 “消费者”如何进行等待下一个事件的策略。
Event
“生产者”和“消费者”之间传递的“消息”
EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
EventHandler
处理“消息”内容的实现。
Producer
生产者
分析:
当线程数较少时,LinkedblockedQueue和Disruptor性能差距并不明显,随着线程数量提升,Disruptor性能优势逐渐变大。
多说一句,希望或的更多的学习资料的同学可以加qun:6 4 7 6 3 1 0 30 (头条不让打广告 只能这样了)
QPS | |||
Disruptor 的核心概念disruptor最大特点是高性能,其针对性能做出极度优化并采用了无锁式的设计。 | 三、disruptor为什么这么快Ring Buffer:Ring Buffer是一个存储数据的环形数组结构,在这里“消息”被按顺装填,不会销毁,直到达到尾部后会进行覆盖。首先,环形模式的最大优点在于没有尾指针,只需维护下一个位置的序号,与链表结构相比,数组的性能更高。其次,数据内部元素的内存地址连续存储,在硬件级别,cpu在加载元素时相邻元素会被预加载再次,数组的内存是一直存在的(直到程序退出),内部的元素也会一直存在,避免了大量创建的开销以及垃圾回收时间。 | False Sharing:当CPU的缓存行(cache line) 中同时读取了两个相互独立的数据时,当两份数据分别要进行更新时,根据MESI协议会产生RFO(Request For Ownership)请求,双方会竞争当前缓存行的所有权,导致另外一份数据缓存失效,此时就是伪共享(False Sharing)。Disruptor针对这种问题采用了缓存行填充的方式,将数据补充放大,保证单独缓存在一个缓存行中,避免伪共享带来的性能影响。 | Lock:锁的存在会不可避免的性能开销,不合理的设计会导致死锁产生,增加设计成本、降低运行效率。首先,Dispruptor 采用无锁化设计,在多生产者条件下使用CAS(Compare And Swap/Set)进行操作。这是一个CPU级别指令,虽然它们并非没有代价,但比锁消耗资源少的多。其次,Sequence会为每一个“消息”由同一个线程产生一个序号,避免出现多个线程之间的竞争修改,没有竞争、不需要锁、甚至不需要CASMemory Barriers:内存屏障能够确保一些特定操作执行的顺序,影响一些数据的可见性(可能是某些指令执行后的结果)。Java内存模型在处理volatile时写操作后插入一个写屏障指令,在读操作前插入一个读屏障指令,Disruptor对volatile字段(cursor)的写操作创建了一个内存屏障,这个屏障将刷新所有缓存里的值(或者至少相应地使得缓存失效),保证数据准确性。四、disruptor的例子:step 1:Event package com.bj58.disruptor.demo;public class Message{private long value; package com.bj58.disruptor.demo;import com.lmax.disruptor.EventFactory;public class MessageFactory implements EventFactory<Message>{public Message newInstance(){return new Message(); step 2: EventHandler package com.bj58.disruptor.demo;import com.lmax.disruptor.EventHandler;public class MessageHandler implements EventHandler<Message>{public void onEvent(Message event, long sequence, boolean endOfBatch){ step 3: Producer package com.bj58.disruptor.demo;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.EventTranslatorOneArg;import java.nio.ByteBuffer;public class MessageProducer{private final RingBuffer<Message> ringBuffer; step 4 : Main package com.bj58.disruptor.demo;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import java.nio.ByteBuffer;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class Main{public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// The factory for the eventMessageFactory factory = new MessageFactory();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<Message> disruptor = new Disruptor<Message>(factory, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith(new MessageHandler());// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();五、disruptor 的性能:官方数据:Multiple Producer DisruptorSingle Producer Disruptor=89,365,504 ops/sec 数据对比: 处理的消息数量:32*1024*1024=33554432Duration(ms)LinkedBlockingQueueDisruptor对比15771420973%25725347960.77%35433322859.41%45297280552.95%55745247743.12%66124242839.65% 分析:QPSLinkedBlockingQueueDisruptor对比158143187972067137%258610369644850165%3617604110394805168%4633461111962364189%5584063213546400232%6547916913819783252% 分析: 当线程数较少时,LinkedblockedQueue和Disruptor性能差距并不明显,随着线程数量提升,Disruptor性能优势逐渐变大。 |
Disruptor | 对比 | ||
1 | 5814318 | 7972067 | 137% |
2 | 5861036 | 9644850 | 165% |
3 | 6176041 | 10394805 | 168% |
4 | 6334611 | 11962364 | 189% |
5 | 5840632 | 13546400 | 232% |
6 | 5479169 | 13819783 | 252% |