简介
Curator在locks包中提供了一些工具,对在分布式环境中的锁操作提供了支持。
InterProcessMutex
分布式可重入公平独占锁
简单示例
private String connectString = "192.168.1.201:2181";
// 会话超时时间,默认60000ms
private int sessionTimeoutMs = 15000;
// 创建连接超时时间,默认15000ms
private int connectionTimeoutMs = 15000;
private static CuratorFramework client;
@Before
public void init() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
// 启动会话
client.start();
System.out.println("连接创建成功");
}
@After
public void close() {
client.close();
System.out.println("连接关闭成功");
}
@Test
public void lockTest() throws Exception {
String path = "/aaa";
int num = 5;
CountDownLatch latch = new CountDownLatch(num);
final InterProcessMutex lock = new InterProcessMutex(client, path);
for (int i = 0; i < num; i++) {
new Thread(() -> {
try {
// 获取锁
lock.acquire();
System.out.println(Thread.currentThread().getName() + "获取锁");
Thread.sleep(2000);
// 释放锁
lock.release();
System.out.println(Thread.currentThread().getName() + "释放锁");
latch.countDown();
} catch (Exception e) {
}
}).start();
}
latch.await();
}
InterProcessMutex源码
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that
* use the same lock path will achieve an inter-process critical section. Further, this mutex is
* "fair" - each user will get the mutex in the order requested (from ZK's point of view)
*/
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
private final LockInternals internals;
private final String basePath;
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
private static class LockData
{
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
private static final String LOCK_NAME = "lock-";
/**
* @param client client
* @param path the path to lock
*/
public InterProcessMutex(CuratorFramework client, String path)
{
this(client, path, LOCK_NAME, 1, new StandardLockInternalsDriver());
}
/**
* Acquire the mutex - blocking until it's available. Note: the same thread
* can call acquire re-entrantly. Each call to acquire must be balanced by a call
* to {@link #release()}
*
* @throws Exception ZK errors, connection interruptions
*/
@Override
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
/**
* Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread
* can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call
* to {@link #release()}
*
* @param time time to wait
* @param unit time unit
* @return true if the mutex was acquired, false if not
* @throws Exception ZK errors, connection interruptions
*/
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
/**
* Returns true if the mutex is acquired by a thread in this JVM
*
* @return true/false
*/
@Override
public boolean isAcquiredInThisProcess()
{
return (threadData.size() > 0);
}
/**
* Perform one release of the mutex if the calling thread is the same thread that acquired it. If the
* thread had made multiple calls to acquire, the mutex will still be held when this method returns.
*
* @throws Exception ZK errors, interruptions, current thread does not own the lock
*/
@Override
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
/**
* Return a sorted list of all current nodes participating in the lock
*
* @return list of nodes
* @throws Exception ZK errors, interruptions, etc.
*/
public Collection<String> getParticipantNodes() throws Exception
{
return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
}
@Override
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
{
makeRevocable(listener, MoreExecutors.sameThreadExecutor());
}
@Override
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
{
internals.makeRevocable
(
new RevocationSpec
(
executor,
new Runnable()
{
@Override
public void run()
{
listener.revocationRequested(InterProcessMutex.this);
}
}
)
);
}
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
basePath = path;
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
boolean isOwnedByCurrentThread()
{
LockData lockData = threadData.get(Thread.currentThread());
return (lockData != null) && (lockData.lockCount.get() > 0);
}
protected byte[] getLockNodeBytes()
{
return null;
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
}
源码分析
从注释可以知道这种锁具有以下特点:
- 可重入
- 跨进程
- 是公平的,根据请求的顺序获取锁
获取锁过程
acquire
源码如下:
以上两个方法都是阻塞的,不同的是第二个方法有超时时间。
internalLock
源码如下:
每个InterProcessMutex示例都会维护一个变量threadData,结构如下:
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
threadData以线程为key,LockData为value;
LockData
源码如下:
lockCount:线程重入的次数
1、首先查询threadData中有没有存储当前线程
2、若有,重入次数加1,返回true
3、若没有,则调用attemptLock尝试获取锁
attemptLock
源码如下:
先用下面的代码创建一个临时顺序节点:
ourPath=client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
创建的节点类似下面的格式:
然后调用方法internalLockLoop
获取分布式锁
先用getSortedChildren
方法获取所有的子节点列表,按每个节点的最后几位顺序号从小到大排序:
最后得到以下子节点列表:
接着调用getsTheLock
获取锁
这里变量maxLeases
为1
1、先判断当前线程对应的节点是不是列表中的第一个(索引为0)
- 若是则获取锁成功
- 若不是则获取锁失败
2、然后返回到internalLockLoop方法的后半部分,判断1的结果
- 若获取锁成功则设置haveTheLock为true,并返回;
- 若获取锁失败则监听前一个节点;
- 若此时前一个节点不在了,则开始下一次循环获取锁;
- 若此时前一个节点存在;
- 没有设置超时时间则一直阻塞;
- 设置了超时时间则阻塞到超时为止,并将doDelete设置为true;
3、若internalLockLoop方法中发生了异常,doDelete设置为true
4、若doDelete为true,删除当前线程对应的节点
获取锁总结
1、每个线程在zookeeper的指定节点(比如 /aaa)下都会创建一个对应的临时顺序节点;
2、读取节点/aaa下面的子节点列表,并重排序(按请求顺序从小到大)
3、获取锁时,判断当前线程对应的节点是不是列表中的第一个(索引为0)
- 若是则获取锁成功
- 若不是则获取锁失败
4、判断3的结果
-
若获取锁成功则设置haveTheLock为true,并返回;
-
若获取锁失败则监听前一个节点;
-
若此时前一个节点不在了,则开始下一次循环获取锁;
-
若此时前一个节点存在;
- 没有设置超时时间则一直阻塞;
- 设置了超时时间则阻塞到超时为止,并将doDelete设置为true(避免大量节点堆积);
-
5、若发生了异常,doDelete设置为true
6、若doDelete为true,删除当前线程对应的节点
释放锁过程
从threadData中获取当前线程对应的lockData
- 为空,抛异常
- 不为空,重入次数减1
- 若此时重入次数大于0,则不能释放锁,直接返回
- 若此时重入次数等于0,则释放锁(删除线程对应的zk临时节点,清除线程在threadData中的数据)
InterProcessSemaphoreMutex
分布式不可重入非公平独占锁
InterProcessReadWriteLock
分布式读写锁