自增操作:
AtomicValue<Integer> value = atomicInteger.increment();
increment方法源码如下:
/**
* Add 1 to the current value and return the new value information. Remember to always
* check {@link AtomicValue#succeeded()}.
*
* @return value info
* @throws Exception ZooKeeper errors
*/
@Override
public AtomicValue<Integer> increment() throws Exception
{
return worker(1);
}
worker方法源码如下:
private AtomicValue<Integer> worker(final Integer addAmount) throws Exception{
Preconditions.checkNotNull(addAmount, "addAmount cannot be null");
MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
/**
* 1、previousValue的数据类型由byte[]转换为int
* 2、previousValue加1并将结果赋予newValue
* 3、newValue的数据类型由int转换为byte[]
*/
int previousValue = (previous != null) ? bytesToValue(previous) : 0;
int newValue = previousValue + addAmount;
return valueToBytes(newValue);
}
};
AtomicValue<byte[]> result = value.trySet(makeValue);
return new AtomicInteger(result);
}
trySet方法源码如下:
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception {
// 返回结果
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
// 尝试获取乐观锁
tryOptimistic(result, makeValue);
// 获取乐观锁失败,则尝试获取悲观锁
if ( !result.succeeded() && (mutex != null))
{
tryWithMutex(result, makeValue);
}
return result;
}
tryOptimistic方法源码如下:
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception{
long startMs = System.currentTimeMillis();
int retryCount = 0;
boolean done = false;
while ( !done )
{
result.stats.incrementOptimisticTries();
// 尝试更新数据
if ( tryOnce(result, makeValue) )
{
// 若更新成功,设置成功标识
result.succeeded = true;
done = true;
}
else
{
// 若更新失败,判断是否能重试(是否超过重试策略中指定的次数)
if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
done = true;
}
}
}
result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
}
tryOnce方法主要源码如下:
private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception {
Stat stat = new Stat();
// 获取当前值(旧值)
boolean createIt = getCurrentValue(result, stat);
boolean success = false;
try
{
// 更新后的值(新值)
byte[] newValue = makeValue.makeFrom(result.preValue);
if ( createIt )
{
// 节点不存在时新建节点
client.create().forPath(path, newValue);
}
else
{
// 节点存在时,根据版本号更新节点值(乐观锁机制)
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
}
result.postValue = Arrays.copyOf(newValue, newValue.length);
success = true;
}
catch ( xxx e )
{
...
}
return success;
}
tryWithMutex方法主要源码如下:
private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
long startMs = System.currentTimeMillis();
int retryCount = 0;
// 尝试获取锁
if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
{
try
{
boolean done = false;
while ( !done )
{
result.stats.incrementPromotedTries();
// 根据版本号更新节点的数据(乐观锁机制)
if ( tryOnce(result, makeValue) )
{
result.succeeded = true;
done = true;
}
else
{
if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
done = true;
}
}
}
}
finally
{
// 释放锁
mutex.release();
}
}
result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
}
总结
- 尝试获取乐观锁
- 节点不存在时新建节点
- 节点存在时,根据版本号更新节点的数据(乐观锁机制)
- 若更新成功,设置成功标识,并返回成功
- 若更新失败,则重试(会判断已经重试的次数是否超过重试策略中的设定值)
- 若以上获取乐观锁失败,则尝试获取悲观锁
- 若获取悲观锁成功,则根据版本号更新节点的数据(乐观锁机制),最后释放悲观锁
- 若获取悲观锁失败,返回失败