2017년 2월 5일 일요일

zookeeper node watcher 와 callback


Watcher Type

operation
create
delete
child create
child delete
change
exists
NodeCreate
NodeDelete


NodeDataChanged
getData

NodeDelete


NodeDataChanged
getChildren

NodeDelete
NodeChildrenChanged
NodeChildrenChanged



NODE_CREATED : 노드가 생성 됨을 감지
NODE_DELETE : 노드가 삭제 됨을 감지
NODE_DATA_CHANGED : 노드의 데이터가 변경됨을 감지
NODE_CHILDREN_CHANGED : 자식 노드가 변경 됨을 감지

Node 생성 종류
[zookeeper.create(nodepath, nodedata, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode, Callback, Object)]
CreateMode.PERSISTENT 영속성
CreateMode.EPHEMERAL 해당 세션이 살아 있을 경우에만 지속


[Executor]


public class Executor implements Runnable, AsyncCallback.ChildrenCallback {

    ZooKeeper zk;
    public Executor() throws IOException {
        zk = new ZooKeeper("127.0.0.1:2181", 3000, null);
        // default watcher 등록
        Watcher w = new NodeMonitor(zk, TestNodeManager.getInstacne());
        zk.register(w);

        try {

            /*
            1. a node 의 children node 변경 감지 (a node delete, child delete, child create)
                - zookeeper connection 생성시 등록한 default watcher or zk.register(w)  watcher -
            2. zk.getChildren 실행되면
                - watcher 가 변경 감지하여 실행하는 것은 아님 변경감지하면 watcher 의 process method 실행됨 -
                실행결과(children node List)를 등록한 callback -w callback (processResult)- 으로 후 처리
             */
            // zk.getChildren("/a", true, (AsyncCallback.ChildrenCallback) w, null);
            // 하위 노드에 watcher 를 모두 등록하려면 현재 class 에서 등록하는게 더 직관적
           zk.getChildren("/a", true, this, null);

            // 최초 시작시 a node 변경 감지를 위한 설정 - create, delete, data change - NodeDataChanged
            zk.exists("/a", true);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void processResult(int rc, String path, Object ctx, List children) {
        KeeperException.Code code = KeeperException.Code.get(rc);
        // 성공
        if(code == KeeperException.Code.OK) {
            if(children != null) {
                // 조회된 child node 모두 watcher (getChildren, exists) 등록
                for (String c : children) {
                    try {
                        zk.getChildren(path + "/" + c, true, this, null);
                        zk.exists(path + "/" + c, true);
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }
    }

    public static void main(String[] args) throws IOException {

        Executor exec = new Executor();
        exec.run();

    }

    @Override
    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }

    }

}


[Watcher and Callback]

public class NodeMonitor implements Watcher, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback {

    private Logger logger = LoggerFactory.getLogger(NodeMonitor.class);

    private ZooKeeper zk;
    private NodeManager manager;

    public NodeMonitor(ZooKeeper zk, NodeManager manager) {

        logger.debug("-------- Constructor --------");

        this.zk = zk;
        this.manager = manager;
    }

    /**
     * watcher 는 node 감지에 소비되므로 감지시 재등록 해주어야 한다.
     * 특정 node 하위의 모든 event 를 감시하려면 exists 와 getChildren 모두에 watcher 를 등록해야 한다.
     * Watcher @Override node 변화 감지, operation 의 watcher 인자에 의해 실행
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {

        String path = event.getPath();

        logger.debug("event type {}, path {}", event.getType(), path);

        String eventType = event.getType().name();
        String eventState = event.getState().name();
        System.out.println("## process: path : "+path+", eventType : "+ eventType + ", eventState: "+ eventState);



        switch (event.getType()) {
            case NodeChildrenChanged:
                // 하위 노드 조회 메소드 - 하위 노드 조회가 성공하면 this.callback method - processResult - 후 처리
                // getChildren watcher 재 등록
                zk.getChildren(path, true, this, null);

                break;
            case NodeDataChanged:

                logger.debug("node data path = {} ", path);
                // watcher 재등록만을 위한 목적이면 callback = null
                // NodeDataChanged 에 대한 후 처리가 필요한 경우 callback class 는 AsyncCallback.StatCallback 를 구현
                // zk.exists(path, true, this, null);
                try {
                    zk.exists(path, true);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }

                // exists callback method 에서 처리 하지 않고 process method 에서 처리 할 수 있음
                // 물론 아래 로직을 exists callback 에 기술할 수 있음
                try {
                    byte[] b = zk.getData(path, this, null);
                    String data = new String(b);
                    boolean isTestNode = manager.isTestNode(path, data, this::selectNodeData);
                    // manager.getNode(data);

                    logger.debug("node data changed {} - is test node {} : ", data, isTestNode);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }

                break;
        }
    }

    private Node selectNodeData(String path, String data) {

        Node node = manager.getNode(data);
        node.setPath(path);

        return node;
    }


    /**
     * getChildren Callback - operation 실행 후 수행되는 callback method, operation 의 callback 인자에 의해 실행
     * 즉 watcher 에 의해 변화를 감지하고 변화에 맞는 operation 을 수행했을때
     * operation 의 지정된 callback 에 의해 processResult 가 실행
     * @param rc
     * @param path
     * @param ctx
     * @param children
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List children) {

        KeeperException.Code code = KeeperException.Code.get(rc);
        logger.debug("CALLBACK - getChildren, path : {}, code : {} ", path, code);

        // 성공
        if(code == KeeperException.Code.OK) {
            if(children != null) {

                // do something...
                // NodeChildrenChanged event 발생시 등록한 callback 에 의해 수행 될 비즈니스 로직
                logger.debug("children {} ", children);
                for (String c : children) {
                    logger.debug("child node {}" + c);

                    try {
                        // 하위의 하위가 변경된게 아니기때문에 하위의 getChildren call back 필요없음
                        zk.getChildren(path + "/" + c, true);

                        zk.exists(path + "/" + c, true);
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }
        // 실패
        else {
        }

    }

    /**
     * exists callback
     * implements AsyncCallback.StatCallback
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }

}


※ operation 실행시 watcher parameter 의  true 와 watcher object 의 차이
true - default watcher 등록
watcher object - 인자로 전달되는 object 를 watcher 로 등록 (this 는 현재 object 를 watcher 로 등록)

댓글 없음:

댓글 쓰기

Intelij 설정 및 plugin

1. preferences... (settings...) Appearance & Behavior > Appearance - Window Options        ✓   Show memory indicator Editor ...