Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

Disruptor

1、背景

APM系统中的collector模块用来接收客户端上报的消息,如何提高collector的吞吐量是我们迫切关注的问题。

我们采用经典的异步消费模型来提升系统的吞吐量,消息上报后先存入队列,然后再进行消费。那么队列的性能就成了collector的瓶颈。

2、选型

Java内置的队列

队列有界性数据结构
ArrayBlockingQueue有界加锁数组
LinkedBlockingQueue有界(2^31-1)加锁链表
ConcurrentLinkedQueue无界无锁链表
LinkedTransferQueue无界无锁链表
PriorityBlockingQueue无界加锁
DelayQueue无界加锁

为了系统的稳定性,我们一般不会考虑使用无界的队列。在技术选型时,我们一般采用ArrayBlockingQueue或LinkedBlockingQueue,但是我们对系统性能要求较高,上述两个队列无法满足我们的需求,因为这两个队列存在性能问题:加锁和伪共享。

最终我们选择了Disruptor队列,Disruptor通过以下设计来解决上述的问题:

  • RingBuffer无锁队列设计并发开销最大的就是争用写访问,我们可以把读和写分开,读使用共享锁,写还是使用独占锁。JDK的阻塞队列包包括并发队列中都存在对写操作的独占访问,这也是它们的多线程瓶颈所在。Disruptor中也存在写访问争用,它通过RingBuffer的next方法减弱争用的激烈程度,同时通过CAS操作,避免了线程切换的开销。生产者生产事件时,可以一次性读取多个事件槽,批量生产和批量发布。
  • 缓存行填充,消除伪共享****缓存是由多个缓存行组成的,同一个缓存行里不同数据存在伪共享问题。Disruptor通过缓存行填充技术,来解决伪共享问题。
  • 预分配缓存对象预分配其实是一个空间换时间的思想,RingBuffer的fill方法,在创建Disruptor时就填充整个RingBuffer,而不是每次生产者生产事件时去创建事件对象,这样可以避免JVM大量创建和回收对象,对GC造成压力。由于RingBuffer底层的存储结构是数组,存储的数据元素内存地址是连续的,基于缓存行的机制,数组中的数据都会被预加载到CPU的L1空间中,就无需到主存中加载下一个元素了,从而提高了数据访问的性能。

3、模式

  • 发布订阅模式:同一个时间被多个消费者进行消费。
  • 点对点模式:同一个事件被单个消费者进行消费。
  • 菱形消费模式:同一个事件被某些消费者消费完后,再被其他消费者消费。

4、组件

  • RingBuffer

    • RingBuffer是一个环形缓冲区。存储可复用的事件信息。每次读写操作都循环利用这个内存环,避免频繁分配和回收内存,减轻GC压力,同时由于RingBuffer可以实现为无锁队列,从而提高系统性能。

    • RingBuffer有一个序号,该序号指向数组中下一个可用的元素。当你不停读/写这个buffer时,这个序号会一直增长,直到序号大于这个环。

    • RingBuffer和常用队列的区别,RingBuffer不删除buffer中的数据,而是更新buffer里面的数据。

  • Sequence

    • Sequence用于追踪生产者和消费者的进度。每个生产者都有自己的Sequence,消费者也有自己的Sequence。

    • Sequence跟AtomicLong类似,支持并发操作包括CAS和顺序写。

    • Sequence通过缓存填充技术,消除伪共享问题。

  • Sequencer

    • 这个是Disruptor的核心。实现了该接口的有两种生产者SingleProducerSequencer和MultiProducerSequencer,这两个生产者都实现了所有的并发算法,能够保证生产者和消费者之间正确的传递数据。

    • 生产者在发布事件之前会调用Sequencer的next方法来申请下一个发布事件的sequence。

  • SequenceBarrier

    • SequenceBarrier是通过Sequencer创建的。SequenceBarrier包含了生产者Sequencer中的序号和依赖的消费者的序号。它包含了判断是否有可用的供消费者处理的事件的逻辑。

    • 消费者获取事件前,都会调用SequenceBarrier的waitFor方法来等待直到请求的序号可用。

  • WaitStrategy

    • 决定了当RingBuffer中没有可用的序列时,一个消费者如何等待。

    • BlockingWaitStrategy:在SequenceBarrier上使用lock和condition来实现消费者等待。当吞吐量和低延时指标没有CPU资源重要时,使用该策略。

    • BusySpinWaitStrategy:用一个循环来实现消费者等待。该策略使用CPU资源来避免可能导致延迟抖动的系统调用。最好当线程可以绑定到特定的CPU内核时使用。

    • LiteBlockingWaitStrategy:BlockingWaitStrategy的变体,当锁无竞争时,尝试消除条件唤醒。实验策略,不建议使用。

    • LiteTimeoutBlockingWaitStrategy:TimeoutBlockingWaitStrategy的变体。

    • PhasedBackoffWaitStrategy:当吞吐量和低延时没有CPU资源重要时使用。spin,yields,最后是自己配置的等待策略。

    • SleepingWaitStrategy:该策略首先使用spins,然后使用Thread.yield(),最后使用LockSupport.parkNanos()来实现消费者等待。该策略在性能和CPU资源之间进行了折中。

    • TimeoutBlockingWaitStrategy:消费者在等待可用序列的时候加上了超时时间。

    • YieldingWaitStrategy:在spin之后使用Thread.yield()方法来让消费者等待。该策略会100%利用CPU,如果有其他资源需要CPU资源,它也是很容易放弃CPU资源的

  • Event

    • 从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,Event完全是由用户定义的。

  • EventProcessor

    • 主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。

  • EventHandler

    • 由用户实现并且代表了Disruptor中一个消费者的接口。

  • Producer

    • 由用户实现,它调用RingBuffer来插入Event,在Disruptor中没有相应的实现代码,由用户实现。

5、源码

5.1、Sequence缓存行填充

// value左填充
class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}
// value右填充
class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

/**
 * <p>Concurrent sequence class used for tracking the progress of
 * the ring buffer and event processors.  Support a number
 * of concurrent operations including CAS and order writes.
 *
 * <p>Also attempts to be more efficient with regards to false
 * sharing by adding padding around the volatile field.
 */
public class Sequence extends RhsPadding
{
  	static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;
    ...
}

value变量前后填充7个long值来消除伪共享问题。

什么是伪共享

如上图,现在基本都是多核计算机,各个core中间并行执行。如果core1上的线程要更新x,core2上的线程更新y。但是x,y在统一缓存行,core1和core2种的线程需要竞争缓存行的所有权,如果core1拿到了所有权,系统会将core2中的缓存行失效。当core2拿到所有权后,会执行更新操作,同时将core1的缓存行置位失效。这样core1或core2不能直接从L1获取变量的值了,需要从L3里面获取,大大影响了系统的性能。如果想深入了解,请看一下缓存行一致性协议MESI。

5.2、预分配缓存对象

在创建Disruptor对象时,会创建一个RingBuffer对象,并调用fill方法完成缓存对象预分配。

private void fill(EventFactory<E> eventFactory)
{
    for (int i = 0; i < bufferSize; i++)
    {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

5.3、生产端

5.3.1、申请槽位

生产者在发布事件之前,会调用RingBuffer#next()方法来获取发布事件的槽位。我们看一下具体的源码(这里我们以多生产者场景为例):

最终会调用到MultiProducerSequencer#next()方法

@Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do
        {
            // RingBuffer中当前的序列号
            current = cursor.get();
            // 生产者申请的序列号
            next = current + n;
            // 标记申请槽位是否已经绕环一圈
            long wrapPoint = next - bufferSize;
            // 上一轮最小的消费者序列号
            long cachedGatingSequence = gatingSequenceCache.get();
            // @1
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
                // @2
                gatingSequenceCache.set(gatingSequence);
            }
            // @3
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);
        return next;
    }

代码@1: 用来判断RingBuffer队列是否已满,如果满了,阻塞1纳秒。当wrapPoint大于上一轮消费者序列的最小值时(也就是RingBuffer满了)或上轮消费队列序列的最小值大于当前队列中的序列号时(也就是当前槽中的事件已经被全部的消费者消费了),需要重新计算消费队列的最小值,如果差值还大于消费队列的最小值,阻塞1纳秒。

代码@2:设置该轮的消费者的最小值。

代码@3:将当前的槽值设置为申请的值。

5.3.2、发布事件

生产端通过调用RingBuffer#publish方法,声明某个槽位可供消费者消费,同时唤醒阻塞的消费者线程。

@Override
public void publish(final long sequence)
{
    // @1
    setAvailable(sequence);
    // @2
    waitStrategy.signalAllWhenBlocking();
}

代码@1:设置可消费的槽位。

代码@2:唤醒等待的消费者线程。

5.4、消费端

消费端其实是启动了一个线程(BatchEventProcessor),在run方法中执行processEvents方法。

private void processEvents()
{
    T event = null;
    long nextSequence = sequence.get() + 1L;

    while (true)
    {
        try
        {
            //@1
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null)
            {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }

            while (nextSequence <= availableSequence)
            {
                // @2
                event = dataProvider.get(nextSequence);
                // @3
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }

            sequence.set(availableSequence);
        }
        catch (final TimeoutException e)
        {
            notifyTimeout(sequence.get());
        }
        catch (final AlertException ex)
        {
            if (running.get() != RUNNING)
            {
                break;
            }
        }
        catch (final Throwable ex)
        {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}

代码@1:通过SequenceBarrier获得一个可消费的序列号。

代码@2:如果消费者请求的序列号小于可用的消费者序列号,则获取事件。

代码@3:调用自定义的EventHandler对事件进行消费。

在SequenceBarrier中最终会调用WaitStrategy的waitFor方法获取消费者可用的序列号,这里列举了BlockingWaitStrategy策略的实现。

@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    // @1
    if (cursorSequence.get() < sequence)
    {
        lock.lock();
        try
        {
            while (cursorSequence.get() < sequence)
            {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        }
        finally
        {
            lock.unlock();
        }
    }
    // @2
    while ((availableSequence = dependentSequence.get()) < sequence)
    {
        barrier.checkAlert();
        ThreadHints.onSpinWait();
    }

    return availableSequence;
}

代码@1:判断生产者序列号是否小于请求的消费者序列号,如果小于,则说明该序列号还未有生产者发布的事件,阻塞等待。

代码@2:如果依赖消费的序列号小于请求的消费者序列号,则说明依赖消费者消费还未完成,需要等待。

6、最佳实践

  • 自定义的consumer中不要有耗时较长的操作,影响系统性能。
  • 发布订阅模式中,尽量让各consumer处理速度相同。不然一个consumer的处理速度会拖垮这个队列的消费速度。