Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

Zookeeper---Curator---分布式计数器源码分析

自增操作:

 AtomicValue<Integer> value = atomicInteger.increment();

increment方法源码如下:

    /**

     * Add 1 to the current value and return the new value information. Remember to always
     * check {@link AtomicValue#succeeded()}.
     *
     * @return value info
     * @throws Exception ZooKeeper errors
     */
    @Override
    public AtomicValue<Integer>    increment() throws Exception
    {
        return worker(1);
    }

worker方法源码如下:

    private AtomicValue<Integer>   worker(final Integer addAmount) throws Exception{
        Preconditions.checkNotNull(addAmount, "addAmount cannot be null");
        MakeValue               makeValue = new MakeValue()
        {
            @Override
            public byte[] makeFrom(byte[] previous)
            {
              /**
               * 1、previousValue的数据类型由byte[]转换为int
               * 2、previousValue加1并将结果赋予newValue
               * 3、newValue的数据类型由int转换为byte[]
               */         
                int        previousValue = (previous != null) ? bytesToValue(previous) : 0;
                int        newValue = previousValue + addAmount;
                return valueToBytes(newValue);
            }
        };
        AtomicValue<byte[]>     result = value.trySet(makeValue);
        return new AtomicInteger(result);
    }

trySet方法源码如下:

    AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception {
        // 返回结果
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
        // 尝试获取乐观锁
        tryOptimistic(result, makeValue);
        // 获取乐观锁失败,则尝试获取悲观锁
        if ( !result.succeeded() && (mutex != null))
 { 
               tryWithMutex(result, makeValue);
        }
        return result;
    }

tryOptimistic方法源码如下:

private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception{
        long            startMs = System.currentTimeMillis();
        int             retryCount = 0;

        boolean         done = false;
        while ( !done )
        {
            result.stats.incrementOptimisticTries();
            // 尝试更新数据
            if ( tryOnce(result, makeValue) )
            {
                //  若更新成功,设置成功标识 
                result.succeeded = true;
                done = true;
            }
 else
 {
                 //  若更新失败,判断是否能重试(是否超过重试策略中指定的次数)
                if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
                {
                    done = true;
                }
            }
        }

        result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
    }

tryOnce方法主要源码如下:

 private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception {
        Stat        stat = new Stat();
        // 获取当前值(旧值)
        boolean     createIt = getCurrentValue(result, stat);
        boolean     success = false;
        try
 {
             // 更新后的值(新值)
            byte[]  newValue = makeValue.makeFrom(result.preValue);
            if ( createIt )
            {
                // 节点不存在时新建节点
                client.create().forPath(path, newValue);
            }
            else
            {
                // 节点存在时,根据版本号更新节点值(乐观锁机制)
                client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
            }
            result.postValue = Arrays.copyOf(newValue, newValue.length);
            success = true;
        }
        catch ( xxx e )
        {
...
        }
      

        return success;
    }

tryWithMutex方法主要源码如下:

    private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
     {
         long            startMs = System.currentTimeMillis();
         int             retryCount = 0;

          // 尝试获取锁
         if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
         {
             try
             {
                 boolean         done = false;
                 while ( !done )
                 {
                     result.stats.incrementPromotedTries();
                      // 根据版本号更新节点的数据(乐观锁机制)
                     if ( tryOnce(result, makeValue) )
                     {
                         result.succeeded = true;
                         done = true;
                     }
                     else
                     {
                         if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
                         {
                             done = true;
                         }
                     }
                 }
             }
             finally
             {
   
                 // 释放锁
                 mutex.release();
             }
         }

         result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
    }

总结

  1. 尝试获取乐观锁
    1. 节点不存在时新建节点
    2. 节点存在时,根据版本号更新节点的数据(乐观锁机制)
      1. 若更新成功,设置成功标识,并返回成功
      2. 若更新失败,则重试(会判断已经重试的次数是否超过重试策略中的设定值)
  2. 若以上获取乐观锁失败,则尝试获取悲观锁
    1. 若获取悲观锁成功,则根据版本号更新节点的数据(乐观锁机制),最后释放悲观锁
    2. 若获取悲观锁失败,返回失败