介绍
Curator在atomic包中提供了一些工具,对在分布式环境中的原子操作提供了支持。
示例
public class CuratorAtomicTest {
private String connectString = "192.168.1.201:2181";
// 会话超时时间,默认60000ms
private int sessionTimeoutMs = 15000;
// 创建连接超时时间,默认15000ms
private int connectionTimeoutMs = 15000;
private static CuratorFramework client;
@Before
public void init() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
// 启动会话
client.start();
System.out.println("连接创建成功");
}
@After
public void close() {
client.close();
System.out.println("连接关闭成功");
}
@Test
public void atomicTest() throws Exception {
String path = "/o";
int num = 10;
CountDownLatch latch = new CountDownLatch(num);
for (int i = 0; i < num; i++) {
new Thread(() -> {
try {
// counterPath不存在时会自动创建
// retryPolicy:重试策略
DistributedAtomicInteger atomicInteger =
new DistributedAtomicInteger(client, path, new RetryNTimes(num, 1000));
AtomicValue<Integer> value = atomicInteger.increment();
System.out.println("-------------------");
if (value.succeeded()) {
System.out.println("更新成功");
System.out.println("旧值:" + value.preValue());
System.out.println("新值:" + value.postValue());
} else {
System.out.println("更新失败");
}
latch.countDown();
} catch (Exception e) {
}
}).start();
}
latch.await();
// 查询节点数据
byte[] b = client.getData().forPath("/o");
System.out.println("累加结果:" + ByteBuffer.wrap(b).getInt());
}
}