Watcher Type
operation
| create |
delete
|
child create
|
child delete
|
change
|
exists
|
NodeCreate
|
NodeDelete
|
NodeDataChanged
|
||
getData
|
NodeDelete
|
NodeDataChanged
|
|||
getChildren
|
NodeDelete
|
NodeChildrenChanged
|
NodeChildrenChanged
|
NODE_CREATED : 노드가 생성 됨을 감지
NODE_DELETE : 노드가 삭제 됨을 감지
NODE_DATA_CHANGED : 노드의 데이터가 변경됨을 감지
NODE_CHILDREN_CHANGED : 자식 노드가 변경 됨을 감지
Node 생성 종류
[zookeeper.create(nodepath, nodedata, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode, Callback, Object)]
CreateMode.PERSISTENT 영속성
CreateMode.EPHEMERAL 해당 세션이 살아 있을 경우에만 지속
[Executor]
public class Executor implements Runnable, AsyncCallback.ChildrenCallback { ZooKeeper zk; public Executor() throws IOException { zk = new ZooKeeper("127.0.0.1:2181", 3000, null); // default watcher 등록 Watcher w = new NodeMonitor(zk, TestNodeManager.getInstacne()); zk.register(w); try { /* 1. a node 의 children node 변경 감지 (a node delete, child delete, child create) - zookeeper connection 생성시 등록한 default watcher or zk.register(w) watcher - 2. zk.getChildren 실행되면 - watcher 가 변경 감지하여 실행하는 것은 아님 변경감지하면 watcher 의 process method 실행됨 - 실행결과(children node List)를 등록한 callback -w callback (processResult)- 으로 후 처리 */ // zk.getChildren("/a", true, (AsyncCallback.ChildrenCallback) w, null); // 하위 노드에 watcher 를 모두 등록하려면 현재 class 에서 등록하는게 더 직관적 zk.getChildren("/a", true, this, null); // 최초 시작시 a node 변경 감지를 위한 설정 - create, delete, data change - NodeDataChanged zk.exists("/a", true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } @Override public void processResult(int rc, String path, Object ctx, Listchildren) { KeeperException.Code code = KeeperException.Code.get(rc); // 성공 if(code == KeeperException.Code.OK) { if(children != null) { // 조회된 child node 모두 watcher (getChildren, exists) 등록 for (String c : children) { try { zk.getChildren(path + "/" + c, true, this, null); zk.exists(path + "/" + c, true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args) throws IOException { Executor exec = new Executor(); exec.run(); } @Override public void run() { try { synchronized (this) { while (true) { wait(); } } } catch (InterruptedException e) { } } }
[Watcher and Callback]
public class NodeMonitor implements Watcher, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback { private Logger logger = LoggerFactory.getLogger(NodeMonitor.class); private ZooKeeper zk; private NodeManager manager; public NodeMonitor(ZooKeeper zk, NodeManager manager) { logger.debug("-------- Constructor --------"); this.zk = zk; this.manager = manager; } /** * watcher 는 node 감지에 소비되므로 감지시 재등록 해주어야 한다. * 특정 node 하위의 모든 event 를 감시하려면 exists 와 getChildren 모두에 watcher 를 등록해야 한다. * Watcher @Override node 변화 감지, operation 의 watcher 인자에 의해 실행 * @param event */ @Override public void process(WatchedEvent event) { String path = event.getPath(); logger.debug("event type {}, path {}", event.getType(), path); String eventType = event.getType().name(); String eventState = event.getState().name(); System.out.println("## process: path : "+path+", eventType : "+ eventType + ", eventState: "+ eventState); switch (event.getType()) { case NodeChildrenChanged: // 하위 노드 조회 메소드 - 하위 노드 조회가 성공하면 this.callback method - processResult - 후 처리 // getChildren watcher 재 등록 zk.getChildren(path, true, this, null); break; case NodeDataChanged: logger.debug("node data path = {} ", path); // watcher 재등록만을 위한 목적이면 callback = null // NodeDataChanged 에 대한 후 처리가 필요한 경우 callback class 는 AsyncCallback.StatCallback 를 구현 // zk.exists(path, true, this, null); try { zk.exists(path, true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } // exists callback method 에서 처리 하지 않고 process method 에서 처리 할 수 있음 // 물론 아래 로직을 exists callback 에 기술할 수 있음 try { byte[] b = zk.getData(path, this, null); String data = new String(b); boolean isTestNode = manager.isTestNode(path, data, this::selectNodeData); // manager.getNode(data); logger.debug("node data changed {} - is test node {} : ", data, isTestNode); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } break; } } private Node selectNodeData(String path, String data) { Node node = manager.getNode(data); node.setPath(path); return node; } /** * getChildren Callback - operation 실행 후 수행되는 callback method, operation 의 callback 인자에 의해 실행 * 즉 watcher 에 의해 변화를 감지하고 변화에 맞는 operation 을 수행했을때 * operation 의 지정된 callback 에 의해 processResult 가 실행 * @param rc * @param path * @param ctx * @param children */ @Override public void processResult(int rc, String path, Object ctx, Listchildren) { KeeperException.Code code = KeeperException.Code.get(rc); logger.debug("CALLBACK - getChildren, path : {}, code : {} ", path, code); // 성공 if(code == KeeperException.Code.OK) { if(children != null) { // do something... // NodeChildrenChanged event 발생시 등록한 callback 에 의해 수행 될 비즈니스 로직 logger.debug("children {} ", children); for (String c : children) { logger.debug("child node {}" + c); try { // 하위의 하위가 변경된게 아니기때문에 하위의 getChildren call back 필요없음 zk.getChildren(path + "/" + c, true); zk.exists(path + "/" + c, true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } } } // 실패 else { } } /** * exists callback * implements AsyncCallback.StatCallback * @param rc * @param path * @param ctx * @param stat */ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { } }
※ operation 실행시 watcher parameter 의 true 와 watcher object 의 차이
true - default watcher 등록
watcher object - 인자로 전달되는 object 를 watcher 로 등록 (this 는 현재 object 를 watcher 로 등록)
댓글 없음:
댓글 쓰기