Watcher Type
operation
| create |
delete
|
child create
|
child delete
|
change
|
exists
|
NodeCreate
|
NodeDelete
|
NodeDataChanged
|
||
getData
|
NodeDelete
|
NodeDataChanged
|
|||
getChildren
|
NodeDelete
|
NodeChildrenChanged
|
NodeChildrenChanged
|
NODE_CREATED : 노드가 생성 됨을 감지
NODE_DELETE : 노드가 삭제 됨을 감지
NODE_DATA_CHANGED : 노드의 데이터가 변경됨을 감지
NODE_CHILDREN_CHANGED : 자식 노드가 변경 됨을 감지
Node 생성 종류
[zookeeper.create(nodepath, nodedata, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode, Callback, Object)]
CreateMode.PERSISTENT 영속성
CreateMode.EPHEMERAL 해당 세션이 살아 있을 경우에만 지속
[Executor]
public class Executor implements Runnable, AsyncCallback.ChildrenCallback {
ZooKeeper zk;
public Executor() throws IOException {
zk = new ZooKeeper("127.0.0.1:2181", 3000, null);
// default watcher 등록
Watcher w = new NodeMonitor(zk, TestNodeManager.getInstacne());
zk.register(w);
try {
/*
1. a node 의 children node 변경 감지 (a node delete, child delete, child create)
- zookeeper connection 생성시 등록한 default watcher or zk.register(w) watcher -
2. zk.getChildren 실행되면
- watcher 가 변경 감지하여 실행하는 것은 아님 변경감지하면 watcher 의 process method 실행됨 -
실행결과(children node List)를 등록한 callback -w callback (processResult)- 으로 후 처리
*/
// zk.getChildren("/a", true, (AsyncCallback.ChildrenCallback) w, null);
// 하위 노드에 watcher 를 모두 등록하려면 현재 class 에서 등록하는게 더 직관적
zk.getChildren("/a", true, this, null);
// 최초 시작시 a node 변경 감지를 위한 설정 - create, delete, data change - NodeDataChanged
zk.exists("/a", true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void processResult(int rc, String path, Object ctx, List children) {
KeeperException.Code code = KeeperException.Code.get(rc);
// 성공
if(code == KeeperException.Code.OK) {
if(children != null) {
// 조회된 child node 모두 watcher (getChildren, exists) 등록
for (String c : children) {
try {
zk.getChildren(path + "/" + c, true, this, null);
zk.exists(path + "/" + c, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws IOException {
Executor exec = new Executor();
exec.run();
}
@Override
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
}
}
}
[Watcher and Callback]
public class NodeMonitor implements Watcher, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback {
private Logger logger = LoggerFactory.getLogger(NodeMonitor.class);
private ZooKeeper zk;
private NodeManager manager;
public NodeMonitor(ZooKeeper zk, NodeManager manager) {
logger.debug("-------- Constructor --------");
this.zk = zk;
this.manager = manager;
}
/**
* watcher 는 node 감지에 소비되므로 감지시 재등록 해주어야 한다.
* 특정 node 하위의 모든 event 를 감시하려면 exists 와 getChildren 모두에 watcher 를 등록해야 한다.
* Watcher @Override node 변화 감지, operation 의 watcher 인자에 의해 실행
* @param event
*/
@Override
public void process(WatchedEvent event) {
String path = event.getPath();
logger.debug("event type {}, path {}", event.getType(), path);
String eventType = event.getType().name();
String eventState = event.getState().name();
System.out.println("## process: path : "+path+", eventType : "+ eventType + ", eventState: "+ eventState);
switch (event.getType()) {
case NodeChildrenChanged:
// 하위 노드 조회 메소드 - 하위 노드 조회가 성공하면 this.callback method - processResult - 후 처리
// getChildren watcher 재 등록
zk.getChildren(path, true, this, null);
break;
case NodeDataChanged:
logger.debug("node data path = {} ", path);
// watcher 재등록만을 위한 목적이면 callback = null
// NodeDataChanged 에 대한 후 처리가 필요한 경우 callback class 는 AsyncCallback.StatCallback 를 구현
// zk.exists(path, true, this, null);
try {
zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
// exists callback method 에서 처리 하지 않고 process method 에서 처리 할 수 있음
// 물론 아래 로직을 exists callback 에 기술할 수 있음
try {
byte[] b = zk.getData(path, this, null);
String data = new String(b);
boolean isTestNode = manager.isTestNode(path, data, this::selectNodeData);
// manager.getNode(data);
logger.debug("node data changed {} - is test node {} : ", data, isTestNode);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
break;
}
}
private Node selectNodeData(String path, String data) {
Node node = manager.getNode(data);
node.setPath(path);
return node;
}
/**
* getChildren Callback - operation 실행 후 수행되는 callback method, operation 의 callback 인자에 의해 실행
* 즉 watcher 에 의해 변화를 감지하고 변화에 맞는 operation 을 수행했을때
* operation 의 지정된 callback 에 의해 processResult 가 실행
* @param rc
* @param path
* @param ctx
* @param children
*/
@Override
public void processResult(int rc, String path, Object ctx, List children) {
KeeperException.Code code = KeeperException.Code.get(rc);
logger.debug("CALLBACK - getChildren, path : {}, code : {} ", path, code);
// 성공
if(code == KeeperException.Code.OK) {
if(children != null) {
// do something...
// NodeChildrenChanged event 발생시 등록한 callback 에 의해 수행 될 비즈니스 로직
logger.debug("children {} ", children);
for (String c : children) {
logger.debug("child node {}" + c);
try {
// 하위의 하위가 변경된게 아니기때문에 하위의 getChildren call back 필요없음
zk.getChildren(path + "/" + c, true);
zk.exists(path + "/" + c, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 실패
else {
}
}
/**
* exists callback
* implements AsyncCallback.StatCallback
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
}
※ operation 실행시 watcher parameter 의 true 와 watcher object 의 차이
true - default watcher 등록
watcher object - 인자로 전달되는 object 를 watcher 로 등록 (this 는 현재 object 를 watcher 로 등록)
댓글 없음:
댓글 쓰기