Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

Zookeeper---Curator---分布式计数器

介绍

Curator在atomic包中提供了一些工具,对在分布式环境中的原子操作提供了支持。
image.png

示例

public class CuratorAtomicTest {
   
    private String connectString = "192.168.1.201:2181";
    // 会话超时时间,默认60000ms
    private int sessionTimeoutMs = 15000;
    // 创建连接超时时间,默认15000ms
    private int connectionTimeoutMs = 15000;
    private static CuratorFramework client;

    @Before
    public void init() {
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        // 启动会话
        client.start();
        System.out.println("连接创建成功");
    }

    @After
    public void close() {
        client.close();
        System.out.println("连接关闭成功");
    }

    @Test
    public void atomicTest() throws Exception {
        String path = "/o";
        int num = 10;
        CountDownLatch latch = new CountDownLatch(num);
        for (int i = 0; i < num; i++) {
            new Thread(() -> {
                try {
                    // counterPath不存在时会自动创建
                    // retryPolicy:重试策略
                    DistributedAtomicInteger atomicInteger =
                            new DistributedAtomicInteger(client, path, new RetryNTimes(num, 1000));
                    AtomicValue<Integer> value = atomicInteger.increment();
                    System.out.println("-------------------");
                    if (value.succeeded()) {
                        System.out.println("更新成功");
                        System.out.println("旧值:" + value.preValue());
                        System.out.println("新值:" + value.postValue());
                    } else {
                        System.out.println("更新失败");
                    }
                    latch.countDown();
                } catch (Exception e) {
                }
            }).start();
        }
        latch.await();
        // 查询节点数据
        byte[] b = client.getData().forPath("/o");
        System.out.println("累加结果:" + ByteBuffer.wrap(b).getInt());
    }

}