简单示例
private RedissonClient client;
@Before
public void init() {
Config config = new Config();
// useSingleServer:单机模式
// useMasterSlaveServers 主从模式
// useSentinelServers:哨兵模式
// useCustomServers:集群模式
config.useSingleServer().setAddress("redis://192.168.1.201:6379");
client = Redisson.create(config);
}
@After
public void close() {
client.shutdown();
}
/**
* 加锁
*/
public boolean lock(String lockName) {
RLock lock = client.getLock(lockName);
lock.lock();
System.out.println(Thread.currentThread().getName() + "成功获取锁");
return true;
}
/**
* 解锁
*/
public void unlock(String lockName) {
RLock lock = client.getLock(lockName);
lock.unlock();
System.out.println(Thread.currentThread().getName() + "成功释放锁");
}
@Test
public void redisLockTest() throws Exception {
String lockName = "redis-lock-test";
// 获取锁
lock(lockName);
// 业务处理
Thread.sleep(2000);
// 释放锁
unlock(lockName);
}
代码测试
测试代码1
@Test
public void lockTest1() throws InterruptedException {
lock("lock-test");
Thread.sleep(Integer.MAX_VALUE);
}
- 运行
- 查询redis:
可以看到多了一个键:"lock-test" - 查询他的类型:
可以看到是hash类型 - 查询他的内容:
- 查询过期时间,多次查询的结果:
可以看到他的过期剩余时间并不是一直减少的,应该是有一种续约机制(后面会有分析)
测试代码2
@Test
public void lockTest2() throws InterruptedException {
lock("lock-test");
lock("lock-test");
lock("lock-test");
Thread.sleep(Integer.MAX_VALUE);
}
- 运行,输出结果:
- 查询锁的内容:
可以看到锁对同一线程是可重入的,每重入一次,redis对应的数据的值加1
测试代码3
@Test
public void lockTest3() throws Exception {
int num = 5;
CountDownLatch latch = new CountDownLatch(num);
for (int i = 0; i < num; i++) {
new Thread(() -> {
String lockName = "lock-test";
try {
// 获取锁
lock(lockName);
// 业务处理
Thread.sleep(2000);
// 释放锁
unlock(lockName);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
latch.await();
}
- 运行,输出结果:
可以看到不同的线程不能同一时间获取同一个锁。
源码分析
加锁
RedissonLock
类的 lock
方法源码如下:
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
leaseTime参数用于设置过期时间。
调用的 lockInterruptibly
源码如下:
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 若能获取锁,返回null;获取不到,返回锁的过期剩余时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
// 获取到锁,直接返回
return;
}
// 订阅redis信息;
// RedissonLockEntry中维护有Semaphore属性,用来控制本地的锁请求的信号量同步
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// ttl为null,表示获取到锁,退出循环
if (ttl == null) {
break;
}
// 从信号量中获取一个许可,当超时时进行下一次循环(可减少请求次数)
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 取消订阅
unsubscribe(future, threadId);
}
}
获取锁 tryAcquire
源码如下:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
tryAcquireAsync
源码如下:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 锁设置有过期时间时
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 锁没有设置过期时间时,默认过期时间30000ms
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 注册监听器
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 加锁成功后,每隔10s续约一次,防止锁过期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
从以上代码可以看到,如果没有给锁设置过期时间,则使用默认值30000ms,并且多了个调度任务 scheduleExpirationRenewal
,每隔10s续约一次,防止锁过期;
scheduleExpirationRenewal
源码如下:
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
// 创建任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
tryLockInnerAsync
源码如下:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
判断lock-test是否存在;
"if (redis.call('exists', KEYS[1]) == 0) then " +
若不存在,表示可以获取锁,执行下面语句;
新建hash类型数据【lock-test-->(唯一标识,1)】
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
为锁加过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
返回null
"return nil; " +
"end; " +
若lock-test已存在,表明已经有线程获取到了锁,执行下面语句;
判断锁是不是被当前线程持有;
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
若是的话,则将计数加1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
为锁加过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
若锁不是被当前线程持有,则返回过期剩余时间
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
参数说明:
KEYS[1]:锁的名字【lock-test】
ARGV[1]:锁的过期时间【30000】
ARGV[2]:锁的唯一标识(uuid : threadId
)
解锁
unlock
源码如下:
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)e.getCause();
} else {
throw e;
}
}
}
unlockAsync
源码如下:
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
// 异步执行
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 注册监听器
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
unlockInnerAsync
源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
判断lock-test是否存在;
"if (redis.call('exists', KEYS[1]) == 0) then " +
若不存在,则表示锁已经被释放了,向指定的频道发布消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
返回 1
"return 1; " +
"end;" +
若lock-test已存在,表明锁还未被释放,执行下面语句;
判断锁是否被当前线程持有;
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
若不是,返回 nil
"return nil;" +
"end; " +
若是,则将计数(重入数)减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
判断计数是否大于0
"if (counter > 0) then " +
若大于0,则锁不能被释放,重置过期时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
返回 0
"return 0; " +
"else " +
否则,删除key,也就是释放锁;
"redis.call('del', KEYS[1]); " +
发布消息;
"redis.call('publish', KEYS[2], ARGV[1]); " +
返回 1
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
参数说明:
KEYS[1]:锁的名字【lock-test】
KEYS[2]:订阅的频道【redisson_lock__channel:{lock-test}】
ARGV[1]:要发布的消息(0:解锁)
ARGV[2]:锁的过期时间【30000】
ARGV[3]:锁的唯一标识(uuid : threadId
)
锁被成功释放后,会向指定的频道发布消息,然后等待获取锁的其他客户端将会接收到这个信息并处理,处理代码在 LockPubSub
中,LockPubSub
部分源码:
public static final Long unlockMessage = 0L;
public static final Long readUnlockMessage = 1L;
protected void onMessage(RedissonLockEntry value, Long message) {
// 若是成功解锁的话,消息体的内容为 0
if (message.equals(unlockMessage)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 信号量释放一个许可,等待锁的线程被唤醒,再次尝试获取锁
value.getLatch().release();
} else if (message.equals(readUnlockMessage)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
}