目录
????????Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。注意,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。
Github:GitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library
Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。
juc下队列存在的问题
队列 | 描述 |
ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 |
LinkedBlockingQueue | 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列 |
PriorityBlockingQueue | 支持按优先级排序的无界阻塞队列 |
DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列 |
LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 |
LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
1. juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。
2. 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
3. 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。
Disruptor通过以下设计来解决队列速度慢的问题:
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费
使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:
思考:能覆盖数据是否会导致数据丢失呢?
当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,比较常用的:
多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。
但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
生产者多线程写入的情况下读数据会复杂很多:
如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素。
多个生产者写入的时候:
如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。
Disruptor构造器
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,...)
引入依赖
<!-- disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
1.创建Event(消息载体/事件)和EventFactory(事件工厂)
创建?OrderEvent?类,这个类将会被放入环形队列中作为消息内容。创建OrderEventFactory类,用于创建OrderEvent事件
@Data
public class OrderEvent {
private long value;
private String name;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
2. 创建消息(事件)生产者
创建?OrderEventProducer?类,它将作为生产者使用
public class OrderEventProducer {
//事件队列
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(long value,String name) {
// 获取事件队列 的下一个槽
long sequence = ringBuffer.next();
try {
//获取消息(事件)
OrderEvent orderEvent = ringBuffer.get(sequence);
// 写入消息数据
orderEvent.setValue(value);
orderEvent.setName(name);
} catch (Exception e) {
// TODO 异常处理
e.printStackTrace();
} finally {
System.out.println("生产者发送数据value:"+value+",name:"+name);
//发布事件
ringBuffer.publish(sequence);
}
}
3.创建消费者
创建?OrderEventHandler?类,并实现?EventHandler?,作为消费者。
public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO 消费逻辑
System.out.println("消费者获取数据value:"+ event.getValue()+",name:"+event.getName());
}
4. 测试
public class DisruptorDemo {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.SINGLE, //单生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
disruptor.handleEventsWith(new OrderEventHandler());
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
//发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"Fox"+i);
}
disruptor.shutdown();
}
如果消费者是多个,只需要在调用?handleEventsWith?方法时将多个消费者传递进去。
- disruptor.handleEventsWith(new OrderEventHandler());
上面传入的两个消费者会重复消费每一条消息,如果想实现一条消息在有多个消费者的情况下,只会被一个消费者消费,那么需要调用?handleEventsWithWorkerPool?方法。
- disruptor.handleEventsWith(new OrderEventHandler());
注意:消费者要实现WorkHandler接口
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
@Override
public void onEvent(OrderEvent event) throws Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。
public class DisruptorDemo2 {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.MULTI, //多生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
//disruptor.handleEventsWith(new OrderEventHandler());
//设置多消费者,消息会被重复消费
//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
//启动disruptor
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(()->{
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"Fox"+i);
}
},"producer1").start();
new Thread(()->{
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"monkey"+i);
}
},"producer2").start();
//disruptor.shutdown();
}
在实际场景中,我们通常会因为业务逻辑而形成一条消费链。比如一个消息必须由?消费者A -> 消费者B -> 消费者C?的顺序依次进行消费。在配置消费者时,可以通过?.then?方法去实现顺序消费。
disruptor.handleEventsWith(new OrderEventHandler())
.then(new OrderEventHandler())
handleEventsWith?与?handleEventsWithWorkerPool?都是支持?.then?的,它们可以结合使用。比如可以按照?消费者A -> (消费者B 消费者C) -> 消费者D?的消费顺序
disruptor.handleEventsWith(new OrderEventHandler())
.thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
.then(new OrderEventHandler());