0%

DynamicConfigCenter中Zookeeper相关操作

重新温习一下操作 ZooKeeper 的一些常用方法。

下面的示例使用 Apache Curator 客户端,演示创建节点、删除节点、修改节点数据、读取节点数据以及监听事件等操作,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/**
 * Walk-through of basic ZooKeeper operations using the Apache Curator client:
 * registering connection/event listeners, watching a node and its siblings,
 * and creating, reading, checking, deleting and updating nodes under
 * {@code /my_root}.
 *
 * <p>NOTE(review): {@code /my_root/cxis_2} and {@code /my_root/cxis_3} are created as
 * persistent nodes and are not removed by this program, so a second run against the
 * same server is expected to fail in {@code create()} (node already exists) unless
 * {@code /my_root} is cleaned up first — confirm against your environment.
 */
public class CuratorBasicOperation {

    private static final String SERVER_ADDR = "127.0.0.1:2181";

    /**
     * Charset used for every byte[] &lt;-&gt; String conversion so the demo does not
     * depend on the platform default encoding.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Runs the demo against the ZooKeeper server at {@link #SERVER_ADDR}.
     *
     * @param args unused
     * @throws Exception if any Curator/ZooKeeper call fails or the final sleep is interrupted
     */
    public static void main(String[] args) throws Exception {

        // Session timeout 5s, connection timeout 5s,
        // retry policy: at most 3 retries, 1s between retries.
        // try-with-resources guarantees the client is closed even when an operation throws.
        try (CuratorFramework curatorClient = CuratorFrameworkFactory
                .builder()
                .connectString(SERVER_ADDR)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(new RetryNTimes(3, 1000))
                .build()) {

            // Connection state listener.
            curatorClient
                    .getConnectionStateListenable()
                    .addListener(new ConnectionStateListener() {
                        @Override
                        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                            System.out.println("连接状态监听ConnectionState: " + connectionState);
                            if (connectionState == ConnectionState.RECONNECTED) {
                                System.out.println("连接状态监听reconnect, redo something");
                            }
                        }
                    });

            // Per the original author's note: this listener mainly receives background
            // notifications and error notifications; plain node creation or modification
            // does not trigger it.
            curatorClient
                    .getCuratorListenable()
                    .addListener(new CuratorListener() {
                        @Override
                        public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("监听CuratorEvent: " + curatorEvent);
                            if (curatorEvent.getType() == CuratorEventType.WATCHED) {
                                System.out.println("监听watched, do something");
                            }
                        }
                    });

            curatorClient.start();

            // Both caches are Closeable; they are closed (in reverse order) before the client.
            try (NodeCache nodeCache = new NodeCache(curatorClient, "/my_root/cxis_2/config");
                 PathChildrenCache childrenCache = new PathChildrenCache(curatorClient, "/my_root", true)) {

                // Single-node listener. Registered BEFORE start() so no early event is missed.
                nodeCache
                        .getListenable()
                        .addListener(new NodeCacheListener() {
                            @Override
                            public void nodeChanged() throws Exception {
                                // Take one snapshot of the cache: getCurrentData() returns null
                                // when the node does not exist (e.g. after a delete), which
                                // previously caused a NullPointerException here.
                                byte[] data = java.util.Optional
                                        .ofNullable(nodeCache.getCurrentData())
                                        .map(current -> current.getData())
                                        .orElse(null);
                                if (data == null) {
                                    System.out.println("节点事件监听nodeChanged: node absent or has no data");
                                    return;
                                }
                                System.out.println("节点事件监听nodeChanged: " + new String(data, UTF_8));
                            }
                        });
                nodeCache.start();

                // Child-node listener on /my_root, also registered before start().
                childrenCache
                        .getListenable()
                        .addListener(new PathChildrenCacheListener() {
                            @Override
                            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                                System.out.println("子节点事件监听PathChildrenCacheEvent: " + pathChildrenCacheEvent);
                            }
                        });
                childrenCache.start();

                // Create nodes (missing parents are created as needed).
                String result1 = curatorClient
                        .create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/my_root/cxis_1/config", "this is value for cxis_1".getBytes(UTF_8));

                System.out.println("创建节点result1: " + result1);

                String result2 = curatorClient
                        .create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/my_root/cxis_2/config", "this is value for cxis_2".getBytes(UTF_8));

                System.out.println("创建节点result2: " + result2);

                String result3 = curatorClient
                        .create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/my_root/cxis_3/config", "this is value for cxis_3".getBytes(UTF_8));

                System.out.println("创建节点result3: " + result3);

                // List children of /my_root.
                List<String> children = curatorClient.getChildren().forPath("/my_root");
                System.out.println("获取子节点children: " + children);

                // Read node data.
                String value3 = new String(curatorClient.getData().forPath("/my_root/cxis_3/config"), UTF_8);
                System.out.println("获取节点数据value3: " + value3);

                // Read node data together with its Stat.
                Stat stat = new Stat();
                String value3WithStat = new String(
                        curatorClient.getData().storingStatIn(stat).forPath("/my_root/cxis_3/config"), UTF_8);
                System.out.println("获取节点数据和状态value3WithStat: " + value3WithStat + ", stat: " + stat);

                // Read node data.
                String value1 = new String(curatorClient.getData().forPath("/my_root/cxis_1/config"), UTF_8);
                System.out.println("获取节点数据value1: " + value1);

                // Check whether the node exists (before the delete below).
                Stat stat1 = curatorClient.checkExists().forPath("/my_root/cxis_1/config");
                System.out.println("判断结点是否存在stat1: " + stat1);

                // Delete the node and its children; version -1 is passed to withVersion as in the original.
                curatorClient.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/my_root/cxis_1");

                // Check again after the delete.
                Stat stat2 = curatorClient.checkExists().forPath("/my_root/cxis_1/config");
                System.out.println("判断结点是否存在stat2: " + stat2);

                // Update node data.
                Stat stat3 = curatorClient.setData()
                        .forPath("/my_root/cxis_2/config", "this is value for cxis_2_new".getBytes(UTF_8));
                System.out.println("修改节点数据stat3: " + stat3);

                // Read the updated data back.
                String value2AfterUpdate = new String(curatorClient.getData().forPath("/my_root/cxis_2/config"), UTF_8);
                System.out.println("获取节点数据value2AfterUpdate: " + value2AfterUpdate);

                // Presumably gives the asynchronous listener callbacks time to print
                // before the caches and the client are closed.
                Thread.sleep(1000);
            }
        }
    }

}
坚持原创技术分享,您的支持将鼓励我继续创作!
Fork me on GitHub