介绍
参考link:
https://tech.meituan.com/2016/11/18/disruptor.html
https://cloud.tencent.com/developer/article/1701690
https://www.cnblogs.com/bolingcavalry/p/15355145.html official doc: https://github.com/LMAX-Exchange/disruptor
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
Github:https://github.com/LMAX-Exchange/disruptor 官方学习网站:http://ifeve.com/disruptor-getting-started/
Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。
Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
link from: https://juejin.cn/post/7312275586256617522
Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉
API使用 主要流程 如何使用 Disruptor ,Disruptor 的 API 十分简单,主要有以下几个步骤
定义事件:事件(Event)就是通过 Disruptor 进行交换的数据类型
定义事件工厂:事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory。Disruptor 通过 EventFactory 在RingBuffer 中预创建 Event 的实例。一个 Event 实例实际上被用作一个”数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据.
定义事件处理的具体实现:通过实现接口com.lmax.disruptor.EventHandler
定义事件处理的具体实现。
定义用于事件处理的线程池:Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。
指定等待策略:Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现, SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
启动 Disruptor。
发布事件:Disruptor 的事件发布过程是一个两阶段提交的过程
第一步:先从 RingBuffer 获取下一个可以写入的事件的序号
第二步:获取对应的事件对象,将数据写入事件对象
第三部:将事件提交到 RingBuffer事件只有在提交之后才会通知 EventProcessor 进行处理
初始化使用 初始化事件 / * 敏感词的Disruptor的事件 */ @Data @AllArgsConstructor @NoArgsConstructor public class CkSensitiveMessageData { / * 空间Id */ private String bizId; / * 群聊 */ private List<GroupSensitiveMessageAnalyzeEntity> groupSensitiveMessageAnalyzeEntities; / * 单聊 */ private List<SingleSensitiveMessageAnalyzeEntity> singleSensitiveMessageAnalyzeEntities; }
初始化事件工厂 public class CkSensitiveMessageDataEventFactory implements EventFactory <CkSensitiveMessageData> { @Override public CkSensitiveMessageData newInstance () { return new CkSensitiveMessageData (); } }
初始化事件处理器 public class EventHandler implements com .lmax.disruptor.EventHandler<CkSensitiveMessageData>, WorkHandler<CkSensitiveMessageData> { @Override public void onEvent (CkSensitiveMessageData event, long sequence, boolean endOfBatch) throws Exception { } @Override public void onEvent (CkSensitiveMessageData event) throws Exception { } }
初始化缓冲区Disruptor public RingBuffer<CkSensitiveMessageData> ckSensitiveMessageDataDisruptor () { CkSensitiveMessageDataEventFactory factory = new CkSensitiveMessageDataEventFactory (); Disruptor<CkSensitiveMessageData> disruptor = new Disruptor <>( factory, DisruptorConst.DEFAULT_RING_BUFFER_SIZE, Thread.ofVirtual().factory(), ProducerType.SINGLE, new BlockingWaitStrategy () ); disruptor.setDefaultExceptionHandler(new ExceptionHandler <>() { @Override public void handleEventException (Throwable e, long sequence, CkSensitiveMessageData event) { } @Override public void handleOnStartException (Throwable e) { } @Override public void handleOnShutdownException (Throwable e) { } }); disruptor.handleEventsWithWorkerPool( new EventHandler (), new EventHandler (), new EventHandler () ); disruptor.start(); return disruptor.getRingBuffer(); }
优雅关闭 Springboot @PreDestroy public void shutdownDisruptor () { if (disruptor != null ) { try { disruptor.shutdown(); } catch (Exception e) { } } }
非Springboot
halt: 直接销毁
shutdown: 有序关闭,没有完成处理的继续处理,缓冲区彻底处理完之后才会关闭
/ * Calls {@link com.lmax.disruptor.EventProcessor#halt()} on all of the event processors created via this disruptor. */ public void halt () { for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.halt(); } } / * <p>Waits until all events currently in the disruptor have been processed by all event processors * and then halts the processors. It is critical that publishing to the ring buffer has stopped * before calling this method, otherwise it may never return.</p> * * <p>This method will not shutdown the executor, nor will it await the final termination of the * processor threads.</p> */ public void shutdown () { try { shutdown(-1 , TimeUnit.MILLISECONDS); } catch (final TimeoutException e) { exceptionHandler.handleOnShutdownException(e); } } / * <p>Waits until all events currently in the disruptor have been processed by all event processors * and then halts the processors.</p> * * <p>This method will not shutdown the executor, nor will it await the final termination of the * processor threads.</p> * * @param timeout the amount of time to wait for all events to be processed. <code>-1 </code> will give an infinite timeout * @param timeUnit the unit the timeOut is specified in * @throws TimeoutException if a timeout occurs before shutdown completes. */ public void shutdown (final long timeout, final TimeUnit timeUnit) throws TimeoutException{ final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout); while (hasBacklog()) { if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) { throw TimeoutException.INSTANCE; } } halt(); }
Disruptor初始化配置 缓冲区等待策略
BlockingWaitStrategy:用了ReentrantLock的等待&&唤醒机制实现等待逻辑,是默认策略,比较节省CPU
BusySpinWaitStrategy:持续自旋,JDK9之下慎用(最好别用)
DummyWaitStrategy:返回的Sequence值为0,正常环境是用不上的
LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,不建议使用
TimeoutBlockingWaitStrategy:带超时的等待,超时后会执行业务指定的处理逻辑
LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
SleepingWaitStrategy:三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的的睡眠
YieldingWaitStrategy:二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
PhasedBackoffWaitStrategy:四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个
缓冲区大小设置
缓冲区设置大小不合理导致堆栈溢出: https://zhuanlan.zhihu.com/p/43211683
初始化时,RingBuffer规定了总大小,就是这个环最多可以容纳多少槽。这里Disruptor规定了,RingBuffer大小必须是2的n次方