Introduction
Curator's leader package provides utilities that support leader election in a distributed environment.
LeaderSelector
Simple example
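import java.util.ArrayList;
import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class LeaderSelectorTest {

    private String connectString = "192.168.1.201:2181";
    // Session timeout; Curator's default is 60000 ms
    private int sessionTimeoutMs = 15000;
    // Connection timeout; Curator's default is 15000 ms
    private int connectionTimeoutMs = 15000;
    private static CuratorFramework client;
    // Selectors started by the test, kept so they can be closed before the client
    private final List<LeaderSelector> selectors = new ArrayList<>();

    private LeaderSelectorListener listener = new LeaderSelectorListener() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            // Leadership is held only while this method runs; returning gives it up
            System.out.println(Thread.currentThread().getName() + " became leader");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " released leadership");
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState state) {
            // After SUSPENDED or LOST this instance can no longer be sure it is leader;
            // throwing CancelLeadershipException makes Curator interrupt takeLeadership()
            if (state == ConnectionState.SUSPENDED || state == ConnectionState.LOST) {
                throw new CancelLeadershipException();
            }
        }
    };

    @Before
    public void init() {
        // Retry policy: exponential backoff, 1000 ms base sleep, at most 3 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        // Start the client (connects to ZooKeeper in the background)
        client.start();
        System.out.println("Client started");
    }

    @After
    public void close() {
        // Close the selectors first so they give up leadership, then the client
        selectors.forEach(LeaderSelector::close);
        client.close();
        System.out.println("Client closed");
    }

    @Test
    public void leaderSelectTest() throws InterruptedException {
        // Root path of the leader election
        String leaderPath = "/aaa";
        for (int i = 0; i < 3; i++) {
            // Register the listener; each LeaderSelector simulates one participating machine
            LeaderSelector selector = new LeaderSelector(client, leaderPath, listener);
            // Lets an instance that gave up leadership compete again; without this,
            // an instance that has relinquished leadership never gets another chance
            selector.autoRequeue();
            // start() does not block: the election runs on the selector's own executor thread
            selector.start();
            selectors.add(selector);
        }
        // Keep the test alive so leadership keeps rotating; stop it manually
        Thread.sleep(Integer.MAX_VALUE);
    }
}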
Source code analysis
The start() method registers the listener with the client's connection-state listenable and then calls requeue().
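The order of operations in start() can be illustrated with a small stand-alone model. This is a hypothetical sketch, not Curator's code: StartSketch, the Runnable listener and the connectionListeners list are invented stand-ins for LeaderSelector, its listener and client.getConnectionStateListenable(). The LATENT/STARTED guard is modeled on the assumption that the real start() rejects a second call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature of LeaderSelector.start(); not Curator's code.
class StartSketch {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    // Stands in for client.getConnectionStateListenable()
    final List<Runnable> connectionListeners = new ArrayList<>();
    private final Runnable listener;
    int requeueCalls = 0;

    StartSketch(Runnable listener) {
        this.listener = listener;
    }

    void start() {
        // Assumed guard: a selector may only be started once
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // 1. register the listener for connection-state changes
        connectionListeners.add(listener);
        // 2. enter the election
        requeue();
    }

    boolean requeue() {
        if (state.get() != State.STARTED) {
            throw new IllegalStateException("close() has already been called");
        }
        requeueCalls++;
        return true; // the real requeue() delegates to internalRequeue() here
    }

    void close() {
        state.set(State.CLOSED);
    }
}
```

Registering the listener before requeueing means the selector is already watching for SUSPENDED/LOST by the time it could win leadership.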
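requeue() in turn calls internalRequeue(), which:
1. creates a task that runs doWorkLoop() and submits it to the thread pool for asynchronous execution;
2. after the task finishes, checks the autoRequeue flag: if selector.autoRequeue() was called before start(), as in the example above, autoRequeue.get() returns true and the selector goes back to step 1.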
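The two steps of internalRequeue() can be modeled with a plain ExecutorService. In this hypothetical sketch (RequeueSketch, the body of doWorkLoop() and maxRounds are invented for illustration), a task is submitted to a single-thread pool and, in its finally block, requeues itself only when autoRequeue is set. maxRounds exists only so the demo terminates; the real selector keeps looping until it is closed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of internalRequeue(); not Curator's code.
class RequeueSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
    private final AtomicInteger rounds = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);
    private final int maxRounds;          // invented stop condition for the demo
    private boolean isQueued = false;     // guarded by "this"

    RequeueSketch(int maxRounds) {
        this.maxRounds = maxRounds;
    }

    void autoRequeue() {
        autoRequeue.set(true);
    }

    synchronized boolean internalRequeue() {
        if (isQueued) {
            return false;                 // a task is already waiting in the pool
        }
        isQueued = true;
        // Step 1: create a task and hand it to the thread pool (runs asynchronously)
        executor.submit(() -> {
            try {
                doWorkLoop();
            } finally {
                clearIsQueued();
                // Step 2: go back to step 1 only if autoRequeue is on
                if (autoRequeue.get() && rounds.get() < maxRounds) {
                    internalRequeue();
                } else {
                    done.countDown();
                }
            }
        });
        return true;
    }

    private synchronized void clearIsQueued() {
        isQueued = false;
    }

    private void doWorkLoop() {
        rounds.incrementAndGet();         // stands in for one leadership term
    }

    int runAndCount() {
        internalRequeue();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return rounds.get();
    }
}
```

Without autoRequeue() the model runs a single round and stops; with it, each finished task resubmits itself, which is why the example's three selectors keep taking turns as leader.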
doWorkLoop() calls doWork(); the core of doWork() is as follows:
void doWork() throws Exception
{
    hasLeadership = false;
    try
    {
        mutex.acquire();
        hasLeadership = true;
        try
        {
            // ......
            listener.takeLeadership(client);
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            throw e;
        }
        catch ( Throwable e )
        {
            log.error("The leader threw an exception", e);
        }
        finally
        {
            clearIsQueued();
        }
    }
    catch ( InterruptedException e )
    {
        Thread.currentThread().interrupt();
        throw e;
    }
    catch ( Exception e )
    {
        log.error("mutex.acquire() threw an exception", e);
        throw e;
    }
    finally
    {
        hasLeadership = false;
        try
        {
            mutex.release();
        }
        catch ( Exception ignore )
        {
            // ignore errors - this is just a safety
        }
    }
}
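The "holding the lock means being leader" pattern in doWork() can be reproduced locally. This is a sketch under stated assumptions: a java.util.concurrent ReentrantLock stands in for InterProcessMutex (so it only excludes threads within one JVM), a Runnable stands in for takeLeadership(), and only RuntimeException is caught because a Runnable cannot throw checked exceptions. Unlike the Curator code, lock() is called before the try, so unlock() runs only when the lock is actually held.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of doWork(); not Curator's code.
class DoWorkSketch {
    private final ReentrantLock mutex;            // stand-in for InterProcessMutex
    private volatile boolean hasLeadership = false;
    boolean sawLeadershipInsideAction = false;

    DoWorkSketch(ReentrantLock mutex) {
        this.mutex = mutex;
    }

    boolean hasLeadership() {
        return hasLeadership;
    }

    void doWork(Runnable takeLeadership) {
        hasLeadership = false;
        mutex.lock();                             // blocks until this participant wins the lock
        try {
            hasLeadership = true;                 // lock held == leader
            try {
                takeLeadership.run();
            } catch (RuntimeException e) {
                // as in doWork(), a failing leader action is logged, not rethrown
                System.err.println("The leader threw an exception: " + e);
            }
        } finally {
            hasLeadership = false;                // leadership ends when the action returns
            mutex.unlock();
        }
    }
}
```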
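Notes:
- Each LeaderSelector object holds a distributed lock of type InterProcessMutex on the leader path; selectors created on the same path compete for the same lock, so only one of them can hold it, i.e. be leader, at any time;
- mutex.acquire() tries to acquire that distributed lock (see the separate article on InterProcessMutex principles and source code analysis);
- once the lock is acquired, hasLeadership is set to true (the instance becomes leader) and the custom logic in takeLeadership() runs;
- when takeLeadership() returns, hasLeadership is set back to false (leadership is lost) and the distributed lock is released.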
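Taken together, these notes mean that at most one participant is leader at a time, and with requeueing each participant gets repeated terms. The simulation below is hypothetical (ElectionSimulation is not part of Curator): threads stand in for three LeaderSelectors on the same path, a fair ReentrantLock stands in for InterProcessMutex, on the assumption that first-come ordering roughly mirrors ZooKeeper's sequential lock nodes, and a fixed number of terms replaces autoRequeue so the run ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-JVM simulation of several selectors sharing one leader lock.
class ElectionSimulation {
    // Returns {maximum simultaneous leaders observed, total leadership terms}
    static int[] run(int participants, int termsEach) {
        ReentrantLock mutex = new ReentrantLock(true);   // fair: waiters served in arrival order
        AtomicInteger currentLeaders = new AtomicInteger();
        AtomicInteger maxLeaders = new AtomicInteger();
        AtomicInteger totalTerms = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int p = 0; p < participants; p++) {
            Thread t = new Thread(() -> {
                for (int term = 0; term < termsEach; term++) {   // "requeue" after each term
                    mutex.lock();
                    try {
                        int now = currentLeaders.incrementAndGet();
                        maxLeaders.accumulateAndGet(now, Math::max);
                        totalTerms.incrementAndGet();
                        Thread.sleep(5);                         // stands in for takeLeadership() work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        currentLeaders.decrementAndGet();
                        mutex.unlock();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { maxLeaders.get(), totalTerms.get() };
    }
}
```

With three participants and two terms each, the counters should show exactly one leader at any moment and six terms in total.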