分布式系列七: zookeeper简单用法

发布日期:2019-02-13

zookeeper是分布式开源框架 是Google Chubby的一个实现 主要作为分布式系统的协调服务. Dobbo等框架使用了其功能.

zookeeper特性

顺序一致性: 事务请求最终会严格按顺序执行原子性:可靠性:实时性:单一视图:

安装

使用windows的linux子系统时: cd /mnt/e/chromedownload/转到windows下载路径拷贝 cp /mnt/e/chromedownload/zookeeper.tar.gz /program/zookeeper.tar.gz转到 cd /program 如果没有的先mkdir program解压 tar -zxvf zookeeper.tar.gz转到 cd ZK_HOME/conf拷贝 cp zoo_sample.cfg zoo.cfg

转到 cd ZK_HOME/bin启动 sh zkServer.sh start

集群搭建

zoo.cfg中配置集群 配置格式如 server.id=ip:port:port 有几台就配置几个.例如:server.1=192.168.1.145:2897:3181server.2=192.168.1.146:2897:3181server.3=192.168.1.147:2897:3181zoo.cfg中的dataDir所配置的目录下新增myid文件 值对应server.id的idzoo.cfg中 如果需要observer则添加peerType=observer 并且修改server.id=ip:port:port:observer

zoo.cfg 配置

tickTime=2000 zk的方法最小时间单位initTime=10 时长为10*tickTime follow节点和leader节点同步的时间syncLimit=5 时长为5*tickTime leader和follow几点进行心跳检测的最大延迟时间dataDir=/tmp/zookeeper 存储快照文件的目录dataLogDir = /log/zookeeper 事务日志的存储路径 默认在dataDir下clientPort=2181 连接zookeeper的默认端口

zookeeper几个概念

znode: zookeeper数据存储为树形结构 深度层级没有限制 znode是数据存储节点是zookeeper的最小存储单元. 分为1.持久化节点2.持久化有序节点3.临时节点4.临时有序节点临时节点 是会话时生成的节点 会话结束后节点会自动删除.

客户端命令操作

help 可以查看客户端支持的命令

create [-s] [-e] node : 创建节点 -s是有序 -e临时节点get path [watch] 获取节点的信息set path data [version] : 修改节点的值delete path [version] : 删除节点 当节点有子节点时无法删除

[version] 乐观锁

[Watcher] 提供了发布/订阅 允许客户端向服务器端注册一个监听 当服务端触发指定事件时会触发watcher服务端向客户端发送一个通知.Watcher是一次性的 触发一次后自动失效

信息节点

stat path 可以查看节点的信息

cversion=0 子节点的版本AclVersion=0 acl的版本号 权限控制相关dataVersion=1 数据的版本号cxid 创建的事务idmzxid 最后一次修改的事务idpzxid 子节点最后一次修改的事务idephemeralOwner 临时会话的iddataLength 数据长度numChidren 子节点数量

java开发

引用依赖

<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.4-beta</version></dependency>

java代码

/** * 定义Watcher */public class MyWatcher implements Watcher { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.SyncConnected){ System.out.println("---->>>>>"+event.getType()) System.out.println("---->>>>>"+event.getPath()) } }}//测试public class MySession { private static final String CONN = "localhost:2181" private static Stat stat = new Stat() public static void main(String[] args) throws IOException KeeperException InterruptedException { ZooKeeper zooKeeper = new ZooKeeper(CONN1000new MyWatcher()) // 创建 zooKeeper.create("/xlx""this is a string".getBytes() ZooDefs.Ids.OPEN_ACL_UNSAFE CreateMode.PERSISTENT) // 查询注册watcher byte[] rst = zooKeeper.getData("/xlx"truestat) System.out.println(new String(rst)) //删除(只能删除永久节点) zooKeeper.delete("/xlx"-1) // 创建 zooKeeper.create("/xlx""this is a string".getBytes() ZooDefs.Ids.OPEN_ACL_UNSAFE CreateMode.PERSISTENT) // 查询 byte[] rs = zooKeeper.getData("/xlx"truestat) System.out.println(new String(rs)) // 注册watcher zooKeeper.exists("/xlx/yy"true) // 创建 zooKeeper.create("/xlx/yy""this is a sub child string".getBytes() ZooDefs.Ids.OPEN_ACL_UNSAFE CreateMode.PERSISTENT) // 获取子节点 List<String> children = zooKeeper.getChildren("/xlx" true) System.out.println(children) //删除(只能删除永久节点) zooKeeper.delete("/xlx/yy"-1) // 修改 zooKeeper.setData("/xlx""this is a modified string".getBytes()-1) byte[] rss = zooKeeper.getData("/xlx"truestat) System.out.println(new String(rss)) //删除(只能删除永久节点) zooKeeper.delete("/xlx"-1) // watcher 异步的 这里停留段时间才可以查看到watcher打印的信息 Thread.sleep(2000) }}

三方API

zkClientcurator 这个用的较多Netflix开源

curator开发

特点:

抽象层次更高链式编程风格异步回调

public class CuratorSession { private static final String CONN = "localhost:2181" public static void main(String[] args) throws Exception { // 创建 CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONN 2000 5000 new ExponentialBackoffRetry(10003)) curatorFramework.start() System.out.println(curatorFramework.getState()) //另一种方式 //curatorFramework = CuratorFrameworkFactory.builder().build() // 新增节点 curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/curator/chd/dd""dfadfe".getBytes()) //读取 byte[] data = curatorFramework.getData().forPath("/curator/chd/dd") java.lang.String string = new java.lang.String(data) System.out.println(string) // 修改 Stat stat = curatorFramework.setData().forPath("/curator/chd/dd""fdaefv".getBytes()) System.out.println(stat) curatorFramework.setData().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client CuratorEvent event) throws Exception { System.out.println(event) } }).forPath("/curator/chd/dd""fafdae".getBytes()) //删除 curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/chd") Thread.sleep(2000) }}

这些代码写下来就算入门了 curator真正有用的使用场景还没接触到 比如分布式锁leader选举等 curator有示例程序 可以在github上查看Curator源码.