介绍

参考link:

  1. https://tech.meituan.com/2016/11/18/disruptor.html
  2. https://cloud.tencent.com/developer/article/1701690
  3. https://www.cnblogs.com/bolingcavalry/p/15355145.html
    official doc: https://github.com/LMAX-Exchange/disruptor

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Github:https://github.com/LMAX-Exchange/disruptor 官方学习网站:http://ifeve.com/disruptor-getting-started/

Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。

Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。

link from: https://juejin.cn/post/7312275586256617522

image.png

  • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
  • 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉

API使用

主要流程

如何使用 Disruptor ,Disruptor 的 API 十分简单,主要有以下几个步骤

  1. 定义事件:事件(Event)就是通过 Disruptor 进行交换的数据类型
  2. 定义事件工厂:事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory。Disruptor 通过 EventFactory 在RingBuffer 中预创建 Event 的实例。一个 Event 实例实际上被用作一个”数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据.
  3. 定义事件处理的具体实现:通过实现接口com.lmax.disruptor.EventHandler定义事件处理的具体实现。
  4. 定义用于事件处理的线程池:Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。
  5. 指定等待策略:Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现, SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
  6. 启动 Disruptor。
  7. 发布事件:Disruptor 的事件发布过程是一个两阶段提交的过程  
    1. 第一步:先从 RingBuffer 获取下一个可以写入的事件的序号
    2. 第二步:获取对应的事件对象,将数据写入事件对象
    3. 第三部:将事件提交到 RingBuffer事件只有在提交之后才会通知 EventProcessor 进行处理

初始化使用

初始化事件


/
* 敏感词的Disruptor的事件
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CkSensitiveMessageData {

/
* 空间Id
*/
private String bizId;

/
* 群聊
*/
private List<GroupSensitiveMessageAnalyzeEntity> groupSensitiveMessageAnalyzeEntities;

/
* 单聊
*/
private List<SingleSensitiveMessageAnalyzeEntity> singleSensitiveMessageAnalyzeEntities;

}

初始化事件工厂

public class CkSensitiveMessageDataEventFactory implements EventFactory<CkSensitiveMessageData> {

@Override
public CkSensitiveMessageData newInstance() {
return new CkSensitiveMessageData();
}

}

初始化事件处理器


public class EventHandler implements com.lmax.disruptor.EventHandler<CkSensitiveMessageData>, WorkHandler<CkSensitiveMessageData> {
@Override
public void onEvent(CkSensitiveMessageData event, long sequence, boolean endOfBatch) throws Exception {

}

@Override
public void onEvent(CkSensitiveMessageData event) throws Exception {

}
}

初始化缓冲区Disruptor

public RingBuffer<CkSensitiveMessageData> ckSensitiveMessageDataDisruptor() {

//指定事件工厂
CkSensitiveMessageDataEventFactory factory = new CkSensitiveMessageDataEventFactory();

Disruptor<CkSensitiveMessageData> disruptor = new Disruptor<>(
factory, DisruptorConst.DEFAULT_RING_BUFFER_SIZE, Thread.ofVirtual().factory(),
ProducerType.SINGLE, new BlockingWaitStrategy()
);

// 注册全局异常处理器
disruptor.setDefaultExceptionHandler(new ExceptionHandler<>() {

@Override
public void handleEventException(Throwable e, long sequence, CkSensitiveMessageData event) {
// do something
}

@Override
public void handleOnStartException(Throwable e) {
// do something
}

@Override
public void handleOnShutdownException(Throwable e) {
// do something
}
});

//设置事件业务处理器---消费者
disruptor.handleEventsWithWorkerPool(
new EventHandler(),
new EventHandler(),
new EventHandler()
);

// 启动disruptor线程
disruptor.start();

//获取ringbuffer环,用于接取生产者生产的事件
return disruptor.getRingBuffer();

}

优雅关闭

Springboot

@PreDestroy
public void shutdownDisruptor() {
// 关闭敏感词ck队列
if (disruptor != null) {
try {
disruptor.shutdown(); // 优雅关闭
} catch (Exception e) {
// do something
}
}
}

非Springboot

  • halt: 直接销毁
  • shutdown: 有序关闭,没有完成处理的继续处理,缓冲区彻底处理完之后才会关闭

/
* Calls {@link com.lmax.disruptor.EventProcessor#halt()} on all of the event processors created via this disruptor.
*/
public void halt()
{
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.halt();
}
}

/
* <p>Waits until all events currently in the disruptor have been processed by all event processors
* and then halts the processors. It is critical that publishing to the ring buffer has stopped
* before calling this method, otherwise it may never return.</p>
*
* <p>This method will not shutdown the executor, nor will it await the final termination of the
* processor threads.</p>
*/
public void shutdown()
{
try
{
shutdown(-1, TimeUnit.MILLISECONDS);
}
catch (final TimeoutException e)
{
exceptionHandler.handleOnShutdownException(e);
}
}

/
* <p>Waits until all events currently in the disruptor have been processed by all event processors
* and then halts the processors.</p>
*
* <p>This method will not shutdown the executor, nor will it await the final termination of the
* processor threads.</p>
*
* @param timeout the amount of time to wait for all events to be processed. <code>-1</code> will give an infinite timeout
* @param timeUnit the unit the timeOut is specified in
* @throws TimeoutException if a timeout occurs before shutdown completes.
*/
public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
{
final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
while (hasBacklog())
{
if (timeout >= 0 && System.currentTimeMillis() > timeOutAt)
{
throw TimeoutException.INSTANCE;
}
// Busy spin
}
halt();
}

Disruptor初始化配置

缓冲区等待策略

  1. BlockingWaitStrategy:用了ReentrantLock的等待&&唤醒机制实现等待逻辑,是默认策略,比较节省CPU
  2. BusySpinWaitStrategy:持续自旋,JDK9之下慎用(最好别用)
  3. DummyWaitStrategy:返回的Sequence值为0,正常环境是用不上的
  4. LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,不建议使用
  5. TimeoutBlockingWaitStrategy:带超时的等待,超时后会执行业务指定的处理逻辑
  6. LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  7. SleepingWaitStrategy:三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的的睡眠
  8. YieldingWaitStrategy:二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  9. PhasedBackoffWaitStrategy:四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

缓冲区大小设置

缓冲区设置大小不合理导致堆栈溢出: https://zhuanlan.zhihu.com/p/43211683

初始化时,RingBuffer规定了总大小,就是这个环最多可以容纳多少槽。这里Disruptor规定了,RingBuffer大小必须是2的n次方