Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

Redis---Redisson---分布式锁

简单示例

    private RedissonClient client;

    @Before
    public void init() {
        Config config = new Config();
        // useSingleServer:单机模式
        // useMasterSlaveServers 主从模式
        // useSentinelServers:哨兵模式
        // useCustomServers:集群模式
        config.useSingleServer().setAddress("redis://192.168.1.201:6379");
        client = Redisson.create(config);
    }

    @After
    public void close() {
        client.shutdown();
    }

    /**
     * 加锁
     */
    public boolean lock(String lockName) {
        RLock lock = client.getLock(lockName);
        lock.lock();
        System.out.println(Thread.currentThread().getName() + "成功获取锁");
        return true;
    }

    /**
     * 解锁
     */
    public void unlock(String lockName) {
        RLock lock = client.getLock(lockName);
        lock.unlock();
        System.out.println(Thread.currentThread().getName() + "成功释放锁");
    }

    @Test
    public void redisLockTest() throws Exception {
        String lockName = "redis-lock-test";
        // 获取锁
        lock(lockName);
        // 业务处理
        Thread.sleep(2000);
        // 释放锁
        unlock(lockName);
    }

代码测试

测试代码1

    @Test
    public void lockTest1() throws InterruptedException {
        lock("lock-test");
        Thread.sleep(Integer.MAX_VALUE);
    }
  1. 运行
  2. 查询redis:
    image.png
    可以看到多了一个键:"lock-test"
  3. 查询他的类型:
    image.png
    可以看到是hash类型
  4. 查询他的内容:
    image.png
  5. 查询过期时间,多次查询的结果:
    image.png
    可以看到他的过期剩余时间并不是一直减少的,应该是有一种续约机制(后面会有分析)

测试代码2

    @Test
    public void lockTest2() throws InterruptedException {
        lock("lock-test");
        lock("lock-test");
        lock("lock-test");
        Thread.sleep(Integer.MAX_VALUE);
    }
  1. 运行,输出结果:
    image.png
  2. 查询锁的内容:
    image.png
    可以看到锁对同一线程是可重入的,每重入一次,redis对应的数据的值加1

测试代码3

    @Test
    public void lockTest3() throws Exception {
        int num = 5;
        CountDownLatch latch = new CountDownLatch(num);
        for (int i = 0; i < num; i++) {
            new Thread(() -> {
                String lockName = "lock-test";
                try {
                    // 获取锁
                    lock(lockName);
                    // 业务处理
                    Thread.sleep(2000);
                    // 释放锁
                    unlock(lockName);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        latch.await();
    }
  1. 运行,输出结果:
    image.png
    可以看到不同的线程不能同一时间获取同一个锁。

源码分析

加锁

RedissonLock类的 lock方法源码如下:

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        try {
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

leaseTime参数用于设置过期时间。

调用的 lockInterruptibly源码如下:

    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }


    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // 若能获取锁,返回null;获取不到,返回锁的过期剩余时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            // 获取到锁,直接返回
            return;
        }

      
         // 订阅redis信息;
         // RedissonLockEntry中维护有Semaphore属性,用来控制本地的锁请求的信号量同步
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                // 再次尝试获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);

                // ttl为null,表示获取到锁,退出循环
                if (ttl == null) {
                    break;
                }
                // 从信号量中获取一个许可,当超时时进行下一次循环(可减少请求次数)
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            // 取消订阅
            unsubscribe(future, threadId);
        }
    }

获取锁 tryAcquire源码如下:

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

tryAcquireAsync源码如下:

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
         //  锁设置有过期时间时
       if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 锁没有设置过期时间时,默认过期时间30000ms
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
         // 注册监听器
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    // 加锁成功后,每隔10s续约一次,防止锁过期
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

从以上代码可以看到,如果没有给锁设置过期时间,则使用默认值30000ms,并且多了个调度任务 scheduleExpirationRenewal,每隔10s续约一次,防止锁过期;

scheduleExpirationRenewal源码如下:

    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }
        // 创建任务
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }
    }

tryLockInnerAsync源码如下:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,

                    判断lock-test是否存在;
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                       若不存在,表示可以获取锁,执行下面语句;
                       新建hash类型数据【lock-test-->(唯一标识,1)】
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                       为锁加过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        返回null
                      "return nil; " +
                  "end; " +
                    若lock-test已存在,表明已经有线程获取到了锁,执行下面语句;
                    判断锁是不是被当前线程持有;
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                       若是的话,则将计数加1
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                       为锁加过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                    若锁不是被当前线程持有,则返回过期剩余时间
                  "return redis.call('pttl', KEYS[1]);",
                  

        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

参数说明:
image.png

KEYS[1]:锁的名字【lock-test】
ARGV[1]:锁的过期时间【30000】
ARGV[2]:锁的唯一标识(uuid : threadId

解锁

unlock源码如下:

    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)e.getCause();
            } else {
                throw e;
            }
        }
    }

unlockAsync源码如下:

    public RFuture<Void> unlockAsync(final long threadId) {
        final RPromise<Void> result = new RedissonPromise<Void>();
         // 异步执行
        RFuture<Boolean> future = unlockInnerAsync(threadId);
         // 注册监听器
        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }
                Boolean opStatus = future.getNow();
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

unlockInnerAsync源码如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

                 判断lock-test是否存在;
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                     若不存在,则表示锁已经被释放了,向指定的频道发布消息
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                      返回 1
                    "return 1; " +
                "end;" +
                 若lock-test已存在,表明锁还未被释放,执行下面语句;
                 判断锁是否被当前线程持有;
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                      若不是,返回 nil
                    "return nil;" +
                "end; " +
                 若是,则将计数(重入数)减1
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                 判断计数是否大于0
                "if (counter > 0) then " +
                     若大于0,则锁不能被释放,重置过期时间
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                     返回 0
                    "return 0; " +
                "else " +
                     否则,删除key,也就是释放锁;
                    "redis.call('del', KEYS[1]); " +
                     发布消息;
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                     返回 1
                    "return 1; "+
                "end; " +
                "return nil;",
            
  
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    }

参数说明:
image.png

KEYS[1]:锁的名字【lock-test】
KEYS[2]:订阅的频道【redisson_lock__channel:{lock-test}】
ARGV[1]:要发布的消息(0:解锁)
ARGV[2]:锁的过期时间【30000】
ARGV[3]:锁的唯一标识(uuid : threadId

锁被成功释放后,会向指定的频道发布消息,然后等待获取锁的其他客户端将会接收到这个信息并处理,处理代码在 LockPubSub 中,LockPubSub 部分源码:

    public static final Long unlockMessage = 0L;
    public static final Long readUnlockMessage = 1L;

    protected void onMessage(RedissonLockEntry value, Long message) {
        // 若是成功解锁的话,消息体的内容为 0
        if (message.equals(unlockMessage)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

             // 信号量释放一个许可,等待锁的线程被唤醒,再次尝试获取锁
            value.getLatch().release();
        } else if (message.equals(readUnlockMessage)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }

            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}