Open Source, Open Future!

Zookeeper---Curator---Distributed Leader Election

Introduction

Curator's leader package (org.apache.curator.framework.recipes.leader) provides tools that support leader election in a distributed environment.

LeaderSelector

Simple example

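    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
    import org.apache.curator.framework.recipes.leader.LeaderSelector;
    import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
    import org.apache.curator.framework.state.ConnectionState;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    public class LeaderSelectorTest {

        private String connectString = "192.168.1.201:2181";

        // Session timeout; Curator's default is 60000 ms
        private int sessionTimeoutMs = 15000;
        // Connection timeout; Curator's default is 15000 ms
        private int connectionTimeoutMs = 15000;
        private static CuratorFramework client;

        private LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                // Leadership is held only while this method runs; returning gives it up
                System.out.println(Thread.currentThread().getName() + " became leader");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " released leadership");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
                // On a suspended or lost connection this instance can no longer be sure it is
                // leader; throwing CancelLeadershipException makes the selector interrupt takeLeadership()
                if (state == ConnectionState.SUSPENDED || state == ConnectionState.LOST) {
                    throw new CancelLeadershipException();
                }
            }
        };

        @Before
        public void init() {
            // Retry policy: exponential backoff, 1000 ms base sleep, at most 3 retries
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
            // Start the client (begins connecting to ZooKeeper)
            client.start();
            System.out.println("Client started");
        }

        @After
        public void close() {
            client.close();
            System.out.println("Client closed");
        }

        @Test
        public void leaderSelectTest() throws InterruptedException {
            // Three selectors compete on the same path; only one holds leadership at a time
            for (int i = 0; i < 3; i++) {
                new Thread(() -> {
                    try {
                        // Root znode path for the election
                        String leaderPath = "/aaa";
                        // Create the selector and register the listener
                        LeaderSelector selector = new LeaderSelector(client, leaderPath, listener);
                        // Let an instance that gave up leadership compete again; without this,
                        // an instance that relinquishes leadership never runs for leader again
                        selector.autoRequeue();
                        selector.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            // Keep the test thread alive so the rotation of leaders can be observed
            Thread.sleep(Integer.MAX_VALUE);
        }
    }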

Source code analysis

The start() method registers the listener and then calls requeue().
requeue() in turn calls internalRequeue().
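internalRequeue() does two things:
1. It creates a task that runs doWorkLoop() and submits it to the thread pool for asynchronous execution;
2. When the task finishes, it checks the autoRequeue flag. If selector.autoRequeue() was called before start(), autoRequeue.get() returns true and the process goes back to step 1 and repeats.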
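The task/requeue cycle in steps 1 and 2 can be imitated without ZooKeeper. The following is a minimal, simplified sketch, not Curator's code: the class MiniSelector, its fields, and the maxRounds stop condition are hypothetical; a single-thread ExecutorService stands in for the selector's thread pool, and a doWorkLoop() that only counts rounds stands in for the real election work. It shows that without autoRequeue the task runs once, while with autoRequeue each finished task submits the next one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of LeaderSelector's requeue cycle (not Curator source)
public class MiniSelector {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
    private final AtomicInteger rounds = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);
    private final int maxRounds;           // hypothetical stop condition so the demo ends
    private boolean isQueued = false;

    public MiniSelector(int maxRounds) { this.maxRounds = maxRounds; }

    public void autoRequeue() { autoRequeue.set(true); }

    public void start() { internalRequeue(); }

    // Step 1: create a task and hand it to the thread pool (only if not already queued)
    private synchronized boolean internalRequeue() {
        if (isQueued) {
            return false;
        }
        isQueued = true;
        executor.submit(() -> {
            try {
                doWorkLoop();
            } finally {
                clearIsQueued();
                // Step 2: if autoRequeue is set, go back to step 1
                if (autoRequeue.get() && rounds.get() < maxRounds) {
                    internalRequeue();
                } else {
                    done.countDown();
                }
            }
        });
        return true;
    }

    private synchronized void clearIsQueued() { isQueued = false; }

    // Stand-in for the real election work: just count one round
    private void doWorkLoop() { rounds.incrementAndGet(); }

    // Waits for the cycle to stop, shuts the pool down, and reports how many rounds ran
    public int awaitRounds() {
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return rounds.get();
    }

    public static void main(String[] args) {
        MiniSelector once = new MiniSelector(3);
        once.start();
        System.out.println(once.awaitRounds());       // without autoRequeue: 1

        MiniSelector requeued = new MiniSelector(3);
        requeued.autoRequeue();
        requeued.start();
        System.out.println(requeued.awaitRounds());   // with autoRequeue: 3
    }
}
```

In the real LeaderSelector there is no round limit: with autoRequeue the instance keeps rejoining the election until close() is called.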

doWorkLoop() calls doWork(). The core of doWork() is:

    void doWork() throws Exception
    {
        hasLeadership = false;
        try
        {
            mutex.acquire();
            hasLeadership = true;
            try
            {
                // ......
                listener.takeLeadership(client);
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                throw e;
            }
            catch ( Throwable e )
            {
                log.error("The leader threw an exception", e);
            }
            finally
            {
                clearIsQueued();
            }
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            throw e;
        }
        catch ( Exception e )
        {
            log.error("mutex.acquire() threw an exception", e);
            throw e;
        }
        finally
        {
            hasLeadership = false;
            try
            {
                mutex.release();
            }
            catch ( Exception ignore )
            {
                // ignore errors - this is just a safety
            }
        }
    }

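Explanation:

  1. Each LeaderSelector object holds a distributed lock of type InterProcessMutex
    (see: InterProcessMutex principles and source code analysis);
  2. It tries to acquire the distributed lock; mutex.acquire() blocks until the lock is obtained;
  3. Once the lock is acquired, hasLeadership is set to true (the instance becomes leader) and the custom logic in takeLeadership() is invoked;
  4. When takeLeadership() returns, hasLeadership is set back to false (leadership is given up) and the distributed lock is released, so another waiting LeaderSelector can acquire it and become leader.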
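Steps 2 to 4 can be illustrated in a single JVM. This is a minimal sketch by analogy only: a java.util.concurrent ReentrantLock stands in for InterProcessMutex (which coordinates separate processes through ephemeral sequential znodes), and the class LockLeaderDemo and its methods enterLeadership, exitLeadership, and runElection are hypothetical. Three contenders call doWork(); each is leader only while its takeLeadership() runs, and the finally block clears the leadership state before releasing the lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, single-JVM model of doWork(): a ReentrantLock stands in for InterProcessMutex
public class LockLeaderDemo {
    private static final ReentrantLock mutex = new ReentrantLock();
    // Order in which contenders held leadership
    static final List<String> leaders = Collections.synchronizedList(new ArrayList<>());
    // How many contenders are leader right now, and the maximum ever observed
    private static int currentLeaders = 0;
    static int maxConcurrentLeaders = 0;

    private static synchronized void enterLeadership() {
        currentLeaders++;
        maxConcurrentLeaders = Math.max(maxConcurrentLeaders, currentLeaders);
    }

    private static synchronized void exitLeadership() {
        currentLeaders--;
    }

    // User logic, like LeaderSelectorListener.takeLeadership(): leader only while it runs
    static void takeLeadership(String name) throws InterruptedException {
        leaders.add(name);
        Thread.sleep(50);
    }

    static void doWork(String name) throws InterruptedException {
        mutex.lockInterruptibly();          // step 2: block until the lock is acquired
        try {
            enterLeadership();              // step 3: this contender now has leadership
            takeLeadership(name);
        } finally {
            exitLeadership();               // step 4: give up leadership first...
            mutex.unlock();                 // ...then release the lock for the next contender
        }
    }

    // Starts one thread per contender and waits until each has been leader once
    static List<String> runElection(int contenders) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < contenders; i++) {
            String name = "worker-" + i;
            Thread t = new Thread(() -> {
                try {
                    doWork(name);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return leaders;
    }

    public static void main(String[] args) {
        System.out.println(runElection(3));         // order depends on thread scheduling
        System.out.println(maxConcurrentLeaders);   // 1: never two leaders at once
    }
}
```

Unlike this sketch, a contender here runs only once; in LeaderSelector it is autoRequeue that sends an instance back to step 2 after it gives up leadership.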