1、背景
APM系统中的collector模块用来接收客户端上报的消息,如何提高collector的吞吐量是我们迫切关注的问题。
我们采用经典的异步消费模型来提升系统的吞吐量,消息上报后先存入队列,然后再进行消费。那么队列的性能就成了collector的瓶颈。
2、选型
Java内置的队列
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 加锁 | 数组 |
LinkedBlockingQueue | 有界(2^31-1) | 加锁 | 链表 |
ConcurrentLinkedQueue | 无界 | 无锁 | 链表 |
LinkedTransferQueue | 无界 | 无锁 | 链表 |
PriorityBlockingQueue | 无界 | 加锁 | 堆 |
DelayQueue | 无界 | 加锁 | 堆 |
为了系统的稳定性,我们一般不会考虑使用无界的队列。在技术选型时,我们一般采用ArrayBlockingQueue或LinkedBlockingQueue,但是我们对系统性能要求较高,上述两个队列无法满足我们的需求,因为这两个队列存在性能问题:加锁和伪共享。
最终我们选择了Disruptor队列,Disruptor通过以下设计来解决上述的问题:
- RingBuffer无锁队列设计并发开销最大的就是争用写访问,我们可以把读和写分开,读使用共享锁,写还是使用独占锁。JDK的阻塞队列包包括并发队列中都存在对写操作的独占访问,这也是它们的多线程瓶颈所在。Disruptor中也存在写访问争用,它通过RingBuffer的next方法减弱争用的激烈程度,同时通过CAS操作,避免了线程切换的开销。生产者生产事件时,可以一次性读取多个事件槽,批量生产和批量发布。
- 缓存行填充,消除伪共享****缓存是由多个缓存行组成的,同一个缓存行里不同数据存在伪共享问题。Disruptor通过缓存行填充技术,来解决伪共享问题。
- 预分配缓存对象预分配其实是一个空间换时间的思想,RingBuffer的fill方法,在创建Disruptor时就填充整个RingBuffer,而不是每次生产者生产事件时去创建事件对象,这样可以避免JVM大量创建和回收对象,对GC造成压力。由于RingBuffer底层的存储结构是数组,存储的数据元素内存地址是连续的,基于缓存行的机制,数组中的数据都会被预加载到CPU的L1空间中,就无需到主存中加载下一个元素了,从而提高了数据访问的性能。
3、模式
- 发布订阅模式:同一个时间被多个消费者进行消费。
- 点对点模式:同一个事件被单个消费者进行消费。
- 菱形消费模式:同一个事件被某些消费者消费完后,再被其他消费者消费。
4、组件
-
RingBuffer
-
RingBuffer是一个环形缓冲区。存储可复用的事件信息。每次读写操作都循环利用这个内存环,避免频繁分配和回收内存,减轻GC压力,同时由于RingBuffer可以实现为无锁队列,从而提高系统性能。
-
RingBuffer有一个序号,该序号指向数组中下一个可用的元素。当你不停读/写这个buffer时,这个序号会一直增长,直到序号大于这个环。
-
RingBuffer和常用队列的区别,RingBuffer不删除buffer中的数据,而是更新buffer里面的数据。
-
-
Sequence
-
Sequence用于追踪生产者和消费者的进度。每个生产者都有自己的Sequence,消费者也有自己的Sequence。
-
Sequence跟AtomicLong类似,支持并发操作包括CAS和顺序写。
-
Sequence通过缓存填充技术,消除伪共享问题。
-
-
Sequencer
-
这个是Disruptor的核心。实现了该接口的有两种生产者SingleProducerSequencer和MultiProducerSequencer,这两个生产者都实现了所有的并发算法,能够保证生产者和消费者之间正确的传递数据。
-
生产者在发布事件之前会调用Sequencer的next方法来申请下一个发布事件的sequence。
-
-
SequenceBarrier
-
SequenceBarrier是通过Sequencer创建的。SequenceBarrier包含了生产者Sequencer中的序号和依赖的消费者的序号。它包含了判断是否有可用的供消费者处理的事件的逻辑。
-
消费者获取事件前,都会调用SequenceBarrier的waitFor方法来等待直到请求的序号可用。
-
-
WaitStrategy
-
决定了当RingBuffer中没有可用的序列时,一个消费者如何等待。
-
BlockingWaitStrategy:在SequenceBarrier上使用lock和condition来实现消费者等待。当吞吐量和低延时指标没有CPU资源重要时,使用该策略。
-
BusySpinWaitStrategy:用一个循环来实现消费者等待。该策略使用CPU资源来避免可能导致延迟抖动的系统调用。最好当线程可以绑定到特定的CPU内核时使用。
-
LiteBlockingWaitStrategy:BlockingWaitStrategy的变体,当锁无竞争时,尝试消除条件唤醒。实验策略,不建议使用。
-
LiteTimeoutBlockingWaitStrategy:TimeoutBlockingWaitStrategy的变体。
-
PhasedBackoffWaitStrategy:当吞吐量和低延时没有CPU资源重要时使用。spin,yields,最后是自己配置的等待策略。
-
SleepingWaitStrategy:该策略首先使用spins,然后使用Thread.yield(),最后使用LockSupport.parkNanos()来实现消费者等待。该策略在性能和CPU资源之间进行了折中。
-
TimeoutBlockingWaitStrategy:消费者在等待可用序列的时候加上了超时时间。
-
YieldingWaitStrategy:在spin之后使用Thread.yield()方法来让消费者等待。该策略会100%利用CPU,如果有其他资源需要CPU资源,它也是很容易放弃CPU资源的
-
-
Event
-
从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,Event完全是由用户定义的。
-
-
EventProcessor
-
主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。
-
-
EventHandler
-
由用户实现并且代表了Disruptor中一个消费者的接口。
-
-
Producer
-
由用户实现,它调用RingBuffer来插入Event,在Disruptor中没有相应的实现代码,由用户实现。
-
5、源码
5.1、Sequence缓存行填充
// value左填充
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
// value右填充
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
*/
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
...
}
value变量前后填充7个long值来消除伪共享问题。
什么是伪共享
如上图,现在基本都是多核计算机,各个core中间并行执行。如果core1上的线程要更新x,core2上的线程更新y。但是x,y在统一缓存行,core1和core2种的线程需要竞争缓存行的所有权,如果core1拿到了所有权,系统会将core2中的缓存行失效。当core2拿到所有权后,会执行更新操作,同时将core1的缓存行置位失效。这样core1或core2不能直接从L1获取变量的值了,需要从L3里面获取,大大影响了系统的性能。如果想深入了解,请看一下缓存行一致性协议MESI。
5.2、预分配缓存对象
在创建Disruptor对象时,会创建一个RingBuffer对象,并调用fill方法完成缓存对象预分配。
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
5.3、生产端
5.3.1、申请槽位
生产者在发布事件之前,会调用RingBuffer#next()方法来获取发布事件的槽位。我们看一下具体的源码(这里我们以多生产者场景为例):
最终会调用到MultiProducerSequencer#next()方法
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
// RingBuffer中当前的序列号
current = cursor.get();
// 生产者申请的序列号
next = current + n;
// 标记申请槽位是否已经绕环一圈
long wrapPoint = next - bufferSize;
// 上一轮最小的消费者序列号
long cachedGatingSequence = gatingSequenceCache.get();
// @1
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
// @2
gatingSequenceCache.set(gatingSequence);
}
// @3
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
代码@1: 用来判断RingBuffer队列是否已满,如果满了,阻塞1纳秒。当wrapPoint大于上一轮消费者序列的最小值时(也就是RingBuffer满了)或上轮消费队列序列的最小值大于当前队列中的序列号时(也就是当前槽中的事件已经被全部的消费者消费了),需要重新计算消费队列的最小值,如果差值还大于消费队列的最小值,阻塞1纳秒。
代码@2:设置该轮的消费者的最小值。
代码@3:将当前的槽值设置为申请的值。
5.3.2、发布事件
生产端通过调用RingBuffer#publish方法,声明某个槽位可供消费者消费,同时唤醒阻塞的消费者线程。
@Override
public void publish(final long sequence)
{
// @1
setAvailable(sequence);
// @2
waitStrategy.signalAllWhenBlocking();
}
代码@1:设置可消费的槽位。
代码@2:唤醒等待的消费者线程。
5.4、消费端
消费端其实是启动了一个线程(BatchEventProcessor),在run方法中执行processEvents方法。
private void processEvents()
{
T event = null;
long nextSequence = sequence.get() + 1L;
while (true)
{
try
{
//@1
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
while (nextSequence <= availableSequence)
{
// @2
event = dataProvider.get(nextSequence);
// @3
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
代码@1:通过SequenceBarrier获得一个可消费的序列号。
代码@2:如果消费者请求的序列号小于可用的消费者序列号,则获取事件。
代码@3:调用自定义的EventHandler对事件进行消费。
在SequenceBarrier中最终会调用WaitStrategy的waitFor方法获取消费者可用的序列号,这里列举了BlockingWaitStrategy策略的实现。
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// @1
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// @2
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
}
代码@1:判断生产者序列号是否小于请求的消费者序列号,如果小于,则说明该序列号还未有生产者发布的事件,阻塞等待。
代码@2:如果依赖消费的序列号小于请求的消费者序列号,则说明依赖消费者消费还未完成,需要等待。
6、最佳实践
- 自定义的consumer中不要有耗时较长的操作,影响系统性能。
- 发布订阅模式中,尽量让各consumer处理速度相同。不然一个consumer的处理速度会拖垮这个队列的消费速度。