Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

Zookeeper---Curator---分布式锁源码分析

简介

Curator在locks包中提供了一些工具,对在分布式环境中的锁操作提供了支持。

image.png

InterProcessMutex

分布式可重入公平独占锁

简单示例

    private String connectString = "192.168.1.201:2181";
    // 会话超时时间,默认60000ms
    private int sessionTimeoutMs = 15000;
    // 创建连接超时时间,默认15000ms
    private int connectionTimeoutMs = 15000;
    private static CuratorFramework client;

    @Before
    public void init() {
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        // 启动会话
        client.start();
        System.out.println("连接创建成功");
    }

    @After
    public void close() {
        client.close();
        System.out.println("连接关闭成功");
    }

    @Test
    public void lockTest() throws Exception {
        String path = "/aaa";
        int num = 5;
        CountDownLatch latch = new CountDownLatch(num);
        final InterProcessMutex lock = new InterProcessMutex(client, path);
        for (int i = 0; i < num; i++) {
            new Thread(() -> {
                try {
                    // 获取锁
                    lock.acquire();
                    System.out.println(Thread.currentThread().getName() + "获取锁");
                    Thread.sleep(2000);
                    // 释放锁
                    lock.release();
                    System.out.println(Thread.currentThread().getName() + "释放锁");
                    latch.countDown();
                } catch (Exception e) {
                }
            }).start();
        }
        latch.await();
    }
InterProcessMutex源码
/**

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.curator.framework.recipes.locks;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that
 * use the same lock path will achieve an inter-process critical section. Further, this mutex is
 * "fair" - each user will get the mutex in the order requested (from ZK's point of view)
 */
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
    private final LockInternals         internals;
    private final String                basePath;

    private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();

    private static class LockData
    {
        final Thread        owningThread;
        final String        lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    private static final String LOCK_NAME = "lock-";

    /**
     * @param client client
     * @param path the path to lock
     */
    public InterProcessMutex(CuratorFramework client, String path)
    {
        this(client, path, LOCK_NAME, 1, new StandardLockInternalsDriver());
    }

    /**
     * Acquire the mutex - blocking until it's available. Note: the same thread
     * can call acquire re-entrantly. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

    /**
     * Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread
     * can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call
     * to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

    /**
     * Returns true if the mutex is acquired by a thread in this JVM
     *
     * @return true/false
     */
    @Override
    public boolean isAcquiredInThisProcess()
    {
        return (threadData.size() > 0);
    }

    /**
     * Perform one release of the mutex if the calling thread is the same thread that acquired it. If the
     * thread had made multiple calls to acquire, the mutex will still be held when this method returns.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    @Override
    public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread      currentThread = Thread.currentThread();
        LockData    lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

    /**
     * Return a sorted list of all current nodes participating in the lock
     *
     * @return list of nodes
     * @throws Exception ZK errors, interruptions, etc.
     */
    public Collection<String>   getParticipantNodes() throws Exception
    {
        return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
    }

    @Override
    public void makeRevocable(RevocationListener<InterProcessMutex> listener)
    {
        makeRevocable(listener, MoreExecutors.sameThreadExecutor());
    }

    @Override
    public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
    {
        internals.makeRevocable
        (
            new RevocationSpec
            (
                executor,
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        listener.revocationRequested(InterProcessMutex.this);
                    }
                }
            )
        );
    }

    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
    {
        basePath = path;
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }

    boolean      isOwnedByCurrentThread()
    {
        LockData    lockData = threadData.get(Thread.currentThread());
        return (lockData != null) && (lockData.lockCount.get() > 0);
    }

    protected byte[]        getLockNodeBytes()
    {
        return null;
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
}

源码分析

image.png

从注释可以知道这种锁具有以下特点:

  • 可重入
  • 跨进程
  • 是公平的,根据请求的顺序获取锁

获取锁过程

acquire 源码如下:

image.png

以上两个方法都是阻塞的,不同的是第二个方法有超时时间。
internalLock源码如下:

image.png

每个InterProcessMutex示例都会维护一个变量threadData,结构如下:
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
threadData以线程为key,LockData为value;
LockData 源码如下:

image.png

lockCount:线程重入的次数
1、首先查询threadData中有没有存储当前线程
2、若有,重入次数加1,返回true
3、若没有,则调用attemptLock尝试获取锁
attemptLock源码如下:

image.png
image.png

先用下面的代码创建一个临时顺序节点:

ourPath=client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);

创建的节点类似下面的格式:

image.png

生成规则和源码分析

然后调用方法internalLockLoop获取分布式锁

image.png
image.png

先用getSortedChildren方法获取所有的子节点列表,按每个节点的最后几位顺序号从小到大排序:

image.png
image.png

最后得到以下子节点列表:

image.png

接着调用getsTheLock获取锁

image.png

这里变量maxLeases为1

1、先判断当前线程对应的节点是不是列表中的第一个(索引为0)

  • 若是则获取锁成功
  • 若不是则获取锁失败

2、然后返回到internalLockLoop方法的后半部分,判断1的结果

  • 若获取锁成功则设置haveTheLock为true,并返回;
  • 若获取锁失败则监听前一个节点;
    • 若此时前一个节点不在了,则开始下一次循环获取锁;
    • 若此时前一个节点存在;
      • 没有设置超时时间则一直阻塞;
      • 设置了超时时间则阻塞到超时为止,并将doDelete设置为true;

3、若internalLockLoop方法中发生了异常,doDelete设置为true
4、若doDelete为true,删除当前线程对应的节点

获取锁总结

1、每个线程在zookeeper的指定节点(比如 /aaa)下都会创建一个对应的临时顺序节点;
2、读取节点/aaa下面的子节点列表,并重排序(按请求顺序从小到大)
3、获取锁时,判断当前线程对应的节点是不是列表中的第一个(索引为0)

  • 若是则获取锁成功
  • 若不是则获取锁失败

4、判断3的结果

  • 若获取锁成功则设置haveTheLock为true,并返回;

  • 若获取锁失败则监听前一个节点;

    • 若此时前一个节点不在了,则开始下一次循环获取锁;

    • 若此时前一个节点存在;

      • 没有设置超时时间则一直阻塞;
      • 设置了超时时间则阻塞到超时为止,并将doDelete设置为true(避免大量节点堆积);

5、若发生了异常,doDelete设置为true
6、若doDelete为true,删除当前线程对应的节点

释放锁过程

image.png

从threadData中获取当前线程对应的lockData

  • 为空,抛异常
  • 不为空,重入次数减1
    • 若此时重入次数大于0,则不能释放锁,直接返回
    • 若此时重入次数等于0,则释放锁(删除线程对应的zk临时节点,清除线程在threadData中的数据)

InterProcessSemaphoreMutex

分布式不可重入非公平独占锁

image.png

InterProcessReadWriteLock

分布式读写锁

image.png