驽马十驾 驽马十驾

驽马十驾,功在不舍

目录
Zookeeper 的监听
/  

Zookeeper 的监听

开篇

zookeeper 的开源框架curator中关于listener的使用不算难,这个地方我整理了2篇比较好的文章,对齐用法作出说明。

参考文章如下所示:

此文不是原创,是参考后进行了2文的整理和总结,如果文章有说明的不清晰的地方,请参考原文出处。

三更灯火五更鸡,正是男儿读书时。

引入 Maven

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
            <!--此处需要排除 log4j2 的引导,因为当前项目已使用logback-->
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--对监听的操作封装到了此库中-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

常见错误

在进正文前,写将我在写这个博客过程中发现的一些旁枝末节的问题告诉大家:

  1. zookeeper在创建节点之前,请一定要判定其节点是否存放,否则可能会报错:org.apache.zookeeper.KeeperException$NodeExistsException:,核心错误是:NodeExistsException
  2. zookeeper中如果使用usingWatcher的时候,请注意你监听的节点必须是已经存在的,否则会抛出错误org.apache.zookeeper.KeeperException$NodeExistsException:,核心错误是NodeExists
  3. 临时节点下不能再挂节点,只有持久化节点下才能挂节点(临时和非临时都可)

监听一:usingWatcher

直接先上代码:

  1. 设置监听
        byte[] content = client.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听器watchedEvent:" + watchedEvent);
            }
        }).forPath(path);

通过源码查看可以发现 WatchedEvent中有如下事件:

  public enum EventType {
            None (-1),
            NodeCreated (1), //节点创建
            NodeDeleted (2), //节点删除
            NodeDataChanged (3), //节点数据改变
            NodeChildrenChanged (4); //子节点改变
  			//省略部分代码...
  }

由此可见其支持的事件还是很多的,假如我使用如下代码来改变节点数据:

        //设置获取修改节点
        client.setData().forPath(path, "Hicode".getBytes());
        ThreadUtil.sleep(2, TimeUnit.SECONDS);

        client.setData().forPath(path, "Club".getBytes());
        ThreadUtil.sleep(100, TimeUnit.SECONDS);

你可以发现,监听只会触发一次,只会输出一次。

这个也确实是该用法的一个问题,每个事件监听只会触发一次,如果需要反复触发,需要在每次进行操作前再添加一个监听,代码如下所示,通过这个案例你会发现使用 usingWatcher会比较繁琐:

       // 第一次注册监听
       client.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听器watchedEvent:" + watchedEvent);
            }
        }).forPath(path);

        //设置获取修改节点
        System.out.println("第一次改变");
        client.setData().forPath(path, "Hicode".getBytes());

		// 第二次注册监听
        client.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听器watchedEvent:" + watchedEvent);
            }
        }).forPath(path);
        ThreadUtil.sleep(2, TimeUnit.SECONDS);

        System.out.println("第二次改变");
        client.setData().forPath(path, "Club".getBytes());
        ThreadUtil.sleep(12, TimeUnit.SECONDS);

总结下:

  1. usingWatcher只会触发一次,每一次被触发后,需要再次注册监听才行。

监听二:CuratorListener

还是先上代码,如下代码表示注册监听

CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("监听事件触发,event事件为:" + event.getType());
            }
client.getCuratorListenable().addListener(listener);

假设我们有如下代码来改变状态:

//创建临时子节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path + "/hi");
// 异步获取节点数据
client.getData().inBackground().forPath(path);
// 变更节点内容inBackground 。
client.setData().forPath(path, "124443".getBytes());
//创建临时子节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path + "/hi");
client.delete().deletingChildrenIfNeeded().forPath(path);

仔细观察结果可以发现:

监听事件触发,event事件为WATCHED
监听事件触发,event事件为GET_DATA
监听事件触发,event事件为CREATE

通过输出,我们可以发现,监听一共触发了3次。

  1. 第一次是默认肯定会触发的,因为他的typewatched表示被监听。
  2. 第二次是GET_DATA获取数据,结合代码可以看出上client.getData()触发的。
  3. 第三次是CREATE创建事件,通过比对代代码可以看出是因为create触发的。

你会不会有疑问,为什么setData()delete()没有触发,是因为不支持嘛?

如果你仔细你可以发现,触发了事件监听的都是因为一句关键代码inBackground。你可以试试在没有触发的代码中加入这个试试。

那么他支持哪些事件了,通过查看源码可以分析出:

package org.apache.curator.framework.api;

public enum CuratorEventType {
    CREATE,
    DELETE,
    EXISTS,
    GET_DATA,
    SET_DATA,
    CHILDREN,
    SYNC,
    GET_ACL,
    SET_ACL,
    TRANSACTION,
    GET_CONFIG,
    RECONFIG,
    WATCHED,
    REMOVE_WATCHES,
    CLOSING;
}

几乎所有的事件都是支持的,前提是通过 inbackground在后台运行。

监听三:Cache 监听

在使用此方式的时候,是需要引入这个 jar 包的。

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>

他是通过缓存的节点数据的变更来完成监听的,在我的理解中是通过缓存数据的变更来触发事件的,并不是直接接听服务节点。

NodeCache

该缓存监听的是节点自身的变化,比如说创建、变更和删除。

    @Test
    public void nodeCacheTest() throws Exception {
        String path = "/hicode/club";
        CuratorFramework client = CuratorFrameworkFactory.builder().retryPolicy(new RetryNTimes(5, 2000)) .connectString(CONNECT_STR).build();
        client.start();
        //通过 client 创建 nodeCache,请注意一定要 start
        NodeCache nodeCache = new NodeCache(client, path);
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("nodeChanged...");
            }
        });
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path+"/"+"test");
        client.setData().forPath(path+"/test","cc".getBytes());
        client.setData().forPath(path+"/test","hh".getBytes());
        client.delete().forPath(path+"/test");
        ThreadUtil.sleep(5,TimeUnit.MINUTES);
        nodeCache.close();
        client.close();
    }

上述代码如下代码是构建缓存并创建监听:

        //通过 client 创建 nodeCache,请注意一定要 start
        NodeCache nodeCache = new NodeCache(client, path);
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("nodeChanged...");
            }
        });

最终输出的只有一句:

nodeChanged...

刚好和我前文提到的:NodeCache只会监听节点自身的变更。

PathCache

该缓存变相监听的是节点的路径的变更,比如说因为子节点创建导致路径变长,子节点删除导致路径变短,对节点自身的值改变不做监听。

    @Test
    public void pathCacheTest() throws Exception {
        String path = "/hicode/club/childPath";
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .retryPolicy(new RetryNTimes(5, 2000)) .connectString(CONNECT_STR).build();
        client.start();
        PathChildrenCache pathCache = new PathChildrenCache(client, path, false);
        pathCache.start();
        //默认会有事件:PathChildrenCacheEvent{type=CONNECTION_RECONNECTED, data=null}
        pathCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("事件类型是:" + event);
            }
        });
        //创建节点,不触发
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
            .forPath(path);
        //对节点赋值,不触发
        client.setData().forPath(path, "childPath".getBytes());
        String tempPath = path + "/temp";
        //创建节点触发
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
            .forPath(tempPath)
        client.setData().forPath(tempPath, "666".getBytes());
        ThreadUtil.sleep(5, TimeUnit.MINUTES);
        pathCache.close();
        client.close();
    }

输出结果为:

事件类型是:PathChildrenCacheEvent{type=CONNECTION_RECONNECTED, data=null}
事件类型是:PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/hicode/club/childPath/temp', stat=468,469,1527855307449,1527855307451,1,0,0,72057686640754862,3,0,468
, data=null}}

第一个事件:CONNECTION_RECONNECTED是属于必定触发的。

第二个事件:CHILD_ADDED是增加子节点的时候触发的。

其他的比如在设置节点数据等的时候,事件未触发,符合我们了解到的资料。

注意代码中最开始监听的是: String path = "/hicode/club/childPath"; 那么后续的事件触发都是该节点下的。

TreeCache

TreeCache 属于上述2者的结合体,既监听了节点自身的变化,又监听了节点路径的变化。

   @Test
    public void treeCacheTest() throws Exception {
        String path = "/hicode/club/tree";
        CuratorFramework client = CuratorFrameworkFactory.builder().retryPolicy(new RetryNTimes(5, 2000))
                .connectString(CONNECT_STR).build();
        client.start();
        TreeCache treeCache = new TreeCache(client, path);
        treeCache.start();
        //默认会有事件:PathChildrenCacheEvent{type=CONNECTION_RECONNECTED, data=null}
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("事件类型是:" + event);
            }
        });
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path + "/t1", "xxx".getBytes());
        //client.setData().forPath(path, "childPath".getBytes());
        ThreadUtil.sleep(1, TimeUnit.SECONDS);
        System.out.println("==============");
        String tempPath = path + "/t2";
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(tempPath, "xxx".getBytes());
        client.setData().forPath(tempPath, "666".getBytes());
        client.delete().forPath(tempPath);
        ThreadUtil.sleep(5, TimeUnit.MINUTES);
        treeCache.close();
        client.close();
    }

结果为:

事件类型是:TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/hicode/club/tree', stat=566,566,1527856851406,1527856851406,0,11,0,0,0,1,589
, data=[]}}
事件类型是:TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/hicode/club/tree/t1', stat=589,589,1527857005459,1527857005459,0,0,0,72057686640754891,3,0,589
, data=[120, 120, 120]}}
事件类型是:TreeCacheEvent{type=INITIALIZED, data=null}
==============
事件类型是:TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/hicode/club/tree/t2', stat=590,590,1527857006474,1527857006474,0,0,0,72057686640754891,3,0,590
, data=[120, 120, 120]}}
事件类型是:TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/hicode/club/tree/t2', stat=590,591,1527857006474,1527857006483,1,0,0,72057686640754891,3,0,590
, data=[54, 54, 54]}}
事件类型是:TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/hicode/club/tree/t2', stat=590,591,1527857006474,1527857006483,1,0,0,72057686640754891,3,0,590
, data=[54, 54, 54]}}

TreeCache可以理解为NodeCache+PathCache的集合。

总结

如上就是 Zookeeper常用的用于节点监听的方式,over。

骐骥一跃,不能十步。驽马十驾,功在不舍。