大数据之Zookeeper
大数据之Zookeeper
文章目录
1. Zookeeper 入门
1.1 概述
Zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。
Zookeeper 从设计模式角度来理解:是一个基于观案者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。
1.2 特点
1)Zookeeper:一个领导者(Leader) ,多个跟随者(Follower)组成的集群。
2)集群中只要有半数以上节点存活,Zookeeper 集群就能正常服务。
3)全局数据一致:每个 Server 保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的。
4)更新请求顺序进行,来自同一个 Client 的更新请求按其发送顺序依次执行。
5)数据更新原子性,一次数据更新要么成功,要么失败。
6)实时性,在一定时间范围内,Client 能读到最新数据。
1.3 数据结构
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1 MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
1.4 应用场景
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名 ,便于识别。例如:IP 不容易记住,而域名容易记住。
统一配置管理
(1)分布式环境下,配置文件同步非常常见。
① 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
② 对配置文件修改后,希望能够快速同步到各个节点上。
(2)配置管理可交由 ZooKeeper 实现。
① 可将配置信息写入 ZooKeeper 上的一个 Znode 。
② 各个客户端服务器监听这个 Znode。
③ 一旦 Znode 中的数据被修改,ZooKeeper 将通知各个客户端服务器。
统一集群管理
(1)分布式环境中,实时掌握每个节点的状态是必要的。
可根据节点实时状态做出一些调整。
(2)ZooKeeper 可以实现实时监控节点状态变化
① 可将节点信息写入Z ooKeeper 上的一个 ZNode。
② 监听这个 ZNode 可获取它的实时状态变化。
服务器动态上下线
软负载均衡
在 Zookeeper 中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。
2. Zookeeper 安装
2.1 下载地址
zookeeper 官网
2.2 本地模式安装部署
准备工作
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local/
mv apache-zookeeper-3.5.6-bin/ zookeeper
mv zoo_sample.cfg zoo.cfg
mkdir -p /usr/local/zookeeper/data
vim zoo.cfg
dataDir=/usr/local/zookeeper/data
vim /etc/profile
在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=
P A T H : PATH:
P
A
T
H
: ZOOKEEPER_HOME/bin
source /etc/profile
启动 Zookeeper
zkServer.sh start
启动客户端
zkCli.sh
退出客户端
quit
停止 Zookeeper
zkServer.sh stop
2.3 分布式安装部署
集群规划
在 master、slave1 和 slave2 三个节点上部署 Zookeeper。
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local
mv apache-zookeeper-3.5.6-bin/ zookeeper
同步 /usr/local/zookeeper 目录内容到 slave1、slave2
xsync zookeeper/
配置服务器编号
① 在 /usr/local/zookeeper/ 这个目录下创建 zkData
mkdir data
② /usr/local/zookeeper/data 目录下创建一个 myid 的文件
touch myid
③ 编辑 myid 文件
vim myid
在文件中添加与 server 对应的编号:
0
④ 分发到其他机器上
xsync myid
并分别在 slave1、slave2 上修改 myid 文件中内容为 1、2
配置 zoo.cfg 文件
① 将 /usr/local/zookeeper/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打开 zoo.cfg 文件,修改 dataDir 路径
dataDir=/usr/local/zookeeper/data
增加如下配置
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
同步 zoo.cfg 配置文件
xsync zoo.cfg
修改环境变量
① 打开配置文件
vim /etc/profile
② 在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=
P A T H : PATH:
P
A
T
H
: ZOOKEEPER_HOME/bin
③ 同步配置文件
xsync /etc/profile
④ 使配置文件生效(三台机器)
source /etc/profile
集群操作
① 三台机器分别启动 Zookeeper
zkServer.sh start
② 三台机器分别关闭 Zookeeper
zkServer.sh stop
编写 Zookeeper 的群起群关脚本
① 在 /usr/local/bin 目录下创建 zk 文件
vim zk.sh
#!/bin/bash
case $1 in
"start"){
for i in master slave1 slave2
do
echo "****************** $i *********************"
ssh $i "source /etc/profile && zkServer.sh start"
done
};;
"stop"){
for i in master slave1 slave2
do
echo "****************** $i *********************"
ssh $i "source /etc/profile && zkServer.sh stop"
done
};;
esac
修改脚本 zk 具有执行权限
chmod 777 zk.sh
调用脚本形式:zk start 或 zk stop
2.4 配置参数解读
Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下:
tickTime =2000:通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒
Zookeeper 使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime 时间就会发送一个心跳,时间单位为毫秒。它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。(session 的最小超时时间是 2*tickTime)
initLimit =10:LF 初始通信时限
集群中的 Follower 跟随者服务器与 Leader 领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的 Zookeeper 服务器连接到 Leader 的时限。
syncLimit =5:LF 同步通信时限
集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过 syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。
dataDir:数据文件目录+数据持久化路径
主要用于保存 Zookeeper 中的数据。
clientPort =2181:客户端连接端口
监听客户端连接的端口。
server.A=B:C:D
A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个server。
B 是这个服务器的 ip 地址;
C 是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
3. Zookeeper 内部原理
3.1 选举机制
半数机制
集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器。
Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。
选举过程例子
假设有五台服务器组成的 Zookeeper 集群,它们的 id 从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动。
① 服务器 1 启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。
② 服务器 2 启动,它与最开始启动的服务器 1 进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id 值较大的服务器 2 胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是 3),所以服务器 1、2 还是继续保持 LOOKING 状态。
③ 服务器 3 启动,根据前面的理论分析,服务器 3 成为服务器 1、2、3 中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的 Leader。
④ 服务器 4 启动,根据前面的分析,理论上服务器4应该是服务器 1、2、3、4 中最大的,但是由于前面已经有半数以上的服务器选举了服务器 3,所以它只能接收当小弟的命了。
⑤ 服务器 5 启动,同 4 一样当小弟。
3.2 节点类型
持久(Persistent)
客户端和服务器端断开连接后,创建的节点不删除
短暂(Ephemeral)
客户端和服务器端断开连接后,创建的节点自己删除
节点类型
① 持久化目录节点
客户端与 Zookeeper 断开连接后,该节点依旧存在。
② 持久化顺序编号目录节点
客户端与 Zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行顺序编号
③ 临时目录节点
客户端与 Zookeeper 断开连接后,该节点被删除
④ 临时顺序编号目录节点
客户端与 Zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺序编号。
说明: 创建 znode 时设置顺序标识,znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
注意: 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
3.3 Stat 结构体
czxid: 创建节点的事务 zxid
每次修改 ZooKeeper 状态都会收到一个 zxid 形式的时间戳,也就是 ZooKeepe r事务 ID。
事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的 zxid,若 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
ctime: znode 被创建的毫秒数(从 1970 年开始)
mzxid: znode 最后更新的事务 zxid
mtime: znode 最后修改的毫秒数(从 1970 年开始)
pZxid: znode 最后更新的子节点 zxid
cversion : znode 子节点变化号,znode 子节点修改次数
dataversion: znode 数据变化号
aclVersion: znode 访问控制列表的变化号
ephemeralOwner: 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
dataLength: znode 的数据长度
numChildren: znode 子节点数量
3.4 监听器原理
监听原理详解:
① 首先要有一个 main() 线程
② 在 main 线程中创建 Zokeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener) 。
③ 通过 connect 线程将注册的监听事件发送给 Zookeeper。
④ 在 Zookeeper 的注册监听器列表中将注册的监听事件添加到列表中。
⑤ Zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程。
⑥ listener 线程内部调用了 process() 方法。
常见的监听
① 监听节点数据的变化
get -w path
② 监听子节点增减的变化
ls -w path
3.5 写数据流程
4. Zookeeper 实战
4.1 客户端命令行操作
启动客户端
zkCli.sh
显示所有操作命令
help
查看当前 znode 中所包含的内容
ls /
ls2 /
查看当前节点详细数据
ls -s /
分别创建 2 个普通节点
create /animals “dog”
create /animals/small “ant”
获得节点的值
get /animals
get /animals/small
创建短暂节点
create -e /animals/big “elephant”
创建带序号的节点
create -s /animals/middle “hourse”
修改节点数据值
set /animals/small “bug”
节点的值变化监听
① 在 slave1 主机上注册监听 /animals 节点数据变化
get -w /animals
② 在 slave2 主机上修改 /animals 节点的数据
set /animals “cat”
③ 观察 slave1 主机收到子节点变化的监听
节点的子节点变化监听(路径变化)
① 在 slave1 主机上注册监听 /animals 节点的子节点变化
ls -w /animals
② 在 slave2 主机 /animals 节点上创建子节点
create /animals/mini “fly”
③ 观察 slave1 主机收到子节点变化的监听
删除节点
delete /animals/big
递归删除节点
deleteall /animals/mini
查看节点状态
stat /animals
4.2 API 操作
4.3.1 IDEA 环境搭建
创建一个 Maven 工程
在 pom 文件中添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>
在项目的 src/main/resources 目录下,新建一个文件,命名为 “log4j.properties”,在文件中填入:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4.3.2 创建 ZooKeeper 客户端
@SpringBootTest
public class ZookeeperTest {
private static String connectString = "localhost:2181";
private static int sessionTimeout = 2000;
private static ZooKeeper zkClient;
@Test
public static void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}
4.3.3 创建子节点
先将上面的 init() 方法前面的注解 @Test 改为 @BeforeAll
// 创建子节点
@SpringBootTest
public class ZookeeperTest {
private static String connectString = "localhost:2181";
private static int sessionTimeout = 2000;
private static ZooKeeper zkClient;
@BeforeAll
public static void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
@Test
public void createNode() throws Exception {
// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
String path = zkClient.create("/demo1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
}
4.3.4 获取子节点并监听节点变化
// 获取子节点并监听节点变化
@SpringBootTest
public class WatchTest {
private static String connectString = "localhost:2181";
private static int sessionTimeout = 2000;
private static ZooKeeper zkClient;
@Test
public void getChildrenAndWatch() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
@BeforeAll
public static void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
List<String> children = null;
try {
children = zkClient.getChildren("/", true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (String child : children) {
System.out.println(child);
}
}
});
}
}
4.3.5 判断 Znode 是否存在
// 判断znode是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists(“/animals”, false);
System.out.println(stat == null ? “not exist” : “exist”);
}
4.3 监听服务器节点动态上下线案例
需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求分析
代码实现
① 先在集群上创建 /servers 节点
create /servers “servers”
② 服务器端向 Zookeeper 注册代码
package zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "master:2181,slave1:2181,slave2:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
public static void main(String[] args) throws Exception {
args = new String[]{"slave1"};
DistributeServer server = new DistributeServer();
// 1.连接zookeeper集群
server.getConnect();
// 2.注册节点
server.register(args[0]);
// 3.业务逻辑处理
server.business();
}
private void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
}
});
}
private void register(String hostname) throws KeeperException, InterruptedException {
String path = zkClient.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online");
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
}
③ 客户端代码
package zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "master:2181,slave1:2181,slave2:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
public static void main(String[] args) throws Exception {
DistributeClient client = new DistributeClient();
// 1.连接zookeeper集群
client.getConnect();
// 2.注册监听
client.getChildren();
// 3.业务逻辑处理
client.business();
}
private void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
try {
getChildren();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
private void getChildren() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren("/servers", true);
// 存储服务器节点主机名称集合
ArrayList<String> hosts = new ArrayList<String>();
for (String child : children) {
byte[] data = zkClient.getData("/servers/" + child, false, null);
hosts.add(new String(data));
}
System.out.println(hosts);
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
}
5 zookeeper框架
5.1 org.apache.zookeeper
5.2 zkclient
5.2.1 简介
ZkClient 是由 Datameer 的工程师开发的开源客户端,对 Zookeeper 的原生 API 进行了包装,实现了超时重连、Watcher 反复注册等功能。
在使用 ZooKeeper 的 Java 客户端时,经常需要处理几个问题:重复注册 watcher、session失效重连、异常处理。
IZKConnection:是一个ZkClient与Zookeeper之间的一个适配器;在代码里直接使用的是ZKClient,实质上还是委托了zookeeper来处理了。
在ZKClient中,根据事件类型,分为
节点事件(数据事件),对应的事件处理器是IZKDataListener;
子节点事件,对应的事件处理器是IZKChildListener;
Session事件,对应的事件处理器是IZKStatusListener;
ZkEventThread:是专门用来处理事件的线程
目前已经运用到了很多项目中,知名的有 Dubbo、Kafka、Helix。
5.2.2 Maven依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
5.2.3 ZkClient 的设计
从上述结构上看,IZKConnection 是一个 ZkClient 与 ZooKeeper 之间的一个适配器。在代码里直接使用的是 ZKClient,其实质还是委托了 zookeeper 来处理了。
使用 ZooKeeper 客户端来注册 watcher 有几种方法: 1、创建 ZooKeeper 对象时指定默认的 Watcher,2、getData(),3、exists(),4、 getchildren。其中 getdata,exists 注册的是某个节点的事件处理器(watcher),getchildren 注册的是子节点的事件处理器(watcher)。而在 ZKClient 中,根据事件类型,分为了节点事件(数据事件)、子节点事件。对应的事件处理器则是 IZKDataListener 和 IZKChildListener。另外加入了 Session 相关的事件和事件处理器。
ZkEventThread 是专门用来处理事件的线程。
5.2.4 重要处理流程说明
启动 ZKClient
在创建 ZKClient 对象时,就完成了到 ZooKeeper 服务器连接的建立。具体过程是这样的:
启动时,指定好 connection string,连接超时时间,序列化工具等。
创建并启动 eventThread,用于接收事件,并调度事件监听器 Listener 的执行。
连接到 zookeeper 服务器,同时将 ZKClient 自身作为默认的 Watcher。
为节点注册Watcher:
ZooKeeper 的三个方法:getData、getChildren、exists.
ZKClient 都提供了相应的代理方法。就拿 exists 来看:
可以看到,是否注册 watcher,由 hasListeners(path)来决定的。
hasListeners 就是看有没有与该数据节点绑定的 listener。
所以,默认情况下,都会自动的为指定的 path 注册 watcher,并且是默认的 watcher (ZKClient)。怎么才能让 hasListeners 判定值为 true 呢,也就是怎么才能为 path 绑定 Listener 呢?
ZKClient提供了订阅功能:
一个新建的会话,只需要在取得响应的数据节点后,调用 subscribteXxx 就可以订阅上相应的事件了。
5.2.5 客户端处理变更(Watcher通知)
前面已经知道,ZKClient 是默认的 Watcher,并且在为各个数据节点注册的 Watcher 都是这个默认的 Watcher。那么该是如何将各种事件通知给相应的 Listener 呢?
处理过程大致可以概括为下面的步骤:
判断变更类型:变更类型分为 State 变更、ChildNode 变更(创建子节点、删除子节点、修改子节点数据)、NodeData 变更(创建指定 node,删除节点,节点数据变更)。
取出与 path 关联的 Listeners,并为每一个 Listener 创建一个 ZKEvent,将 ZkEvent 交给 ZkEventThread 处理。
ZkEventThread 线程,拿到 ZkEvent 后,只需要调用 ZkEvent 的 run 方法进行处理。 从这里也可以知道,具体的怎么如何调用 Listener,还要依赖于 ZkEvent 的 run()实现了。
注册监听 watcher:
接口类 | 注册监听方法 | 解除监听方法 |
---|---|---|
IZkChildListener(子节点) | ZkClient的subscribeChildChanges方法 | ZkClient 的unsubscribeChildChanges 方法 |
IZkDataListener(数据) | ZkClient 的subscribeDataChanges 方法 | ZkClient 的 unsubscribeDataChanges 方法 |
IZkStateListener(客户端状 态) | ZkClient 的 subscribeStateChanges 方 法 | ZkClient 的 unsubscribeStateChanges 方法 |
在 ZkClient 中客户端可以通过注册相关的事件监听来实现对 Zookeeper 服务端时间的订阅。
其中 ZkClient 提供的监听事件接口有以下几种:
其中 ZkClient 还提供了一个 unsubscribeAll 方法,来解除所有监听。
Zookeeper 中提供的变更操作有:节点的创建、删除,节点数据的修改:
创建操作,数据节点分为四种,ZKClient 分别为他们提供了相应的代理:
删除节点的操作:
修改节点数据的操作:
writeDataReturnStat():写数据并返回数据的状态。
updateDataSerialized():修改已序列化的数据。执行过程是:先读取数据,然后使用DataUpdater 对数据修改,最后调用 writeData 将修改后的数据发送给服务端。
5.2.6 序列化处理
ZooKeeper 中,会涉及到序列化、反序列化的操作有两种:getData、setData。在 ZKClient 中,分别用 readData、writeData 来替代了。
对于 readData:先调用 zookeeper 的 getData,然后进行使用 ZKSerializer 进行反序列化工 作。
对于 writeData:先使用 ZKSerializer 将对象序列化后,再调用 zookeeper 的 setData。
5.2.7 ZkClient如何解决使用ZooKeeper客户端遇到的问题的呢?
Watcher 自动重注册:这个要是依赖于 hasListeners()的判断,来决定是否再次注册。如果对此有不清晰的,可以看上面的流程处理的说明。
Session 失效重连:如果发现会话过期,就先关闭已有连接,再重新建立连接。
异常处理:对比 ZooKeeper 和 ZKClient,就可以发现 ZooKeeper 的所有操作都是抛异常 的,而 ZKClient 的所有操作,都不会抛异常的。在发生异常时,它或做日志,或返回空, 或做相应的 Listener 调用。
相比于 ZooKeeper 官方客户端,使用 ZKClient 时,只需要关注实际的 Listener 实现即可。所 以这个客户端,还是推荐大家使用的。
5.2.8 API介绍
启动ZKClient:在创建ZKClient对象时,就完成了到ZooKeeper服务器连接的建立
1、启动时,制定好connection string,连接超时时间,序列化工具等
2、创建并启动eventThread,用于接收事件,并调度事件监听器Listener的执行
3、连接到Zookeeper服务器,同时将ZKClient自身作为默认的Watcher
为节点注册Watcher
Zookeeper 原始API的三个方法:getData,getChildren、exists,ZKClient都提供了相应的代理方法,比如exists,
hasListeners是看有没有与该数据节点绑定的listener
所以,默认情况下,都会自动的为指定的path注册watcher,并且是默认的watcher(ZKClient),那么怎样才能让hasListeners值为true呢,也就是怎么才能为path绑定Listener呢?
ZKClient提供了订阅功能,一个新建的会话,只需要在取得响应的数据节点后,调用subscribeXXX就可以订阅上相应的事件了。
5.2.9 demo
- createParents可以递归创建节点(public void createPersistent(String path, boolean createParents))
- 无需注册watcher(前面也说了,ZKClient帮我们做好了)
- 节点内容可以传任意类型数据
- 可以自定义内容的序列化和反序列化
- 在没指定zkSerializer时,默认使用java自动的序列化和反序列化
public class ZkClientCrud<T> {
ZkClient zkClient ;
final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);
public ZkClientCrud(ZkSerializer zkSerializer) {
logger.info("链接zk开始");
// zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);
zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);
}
public void createEphemeral(String path,Object data){
zkClient.createEphemeral(path,data);
}
/***
* 支持创建递归方式
* @param path
* @param createParents
*/
public void createPersistent(String path,boolean createParents){
zkClient.createPersistent(path,createParents);
}
/***
* 创建节点 跟上data数据
* @param path
* @param data
*/
public void createPersistent(String path,Object data){
zkClient.createPersistent(path,data);
}
/***
* 子节点
* @param path
* @return
*/
public List<String> getChildren(String path){
return zkClient.getChildren(path);
}
public T readData(String path){
return zkClient.readData(path);
}
public void writeData(String path,Object data){
zkClient.writeData(path,data);
}
//递归删除
public void deleteRecursive(String path){
zkClient.deleteRecursive(path);
}
}
public class ZkClientCrudTest {
final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);
public static void main(String[] args) {
ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());
String path="/root";
zkClientCrud.deleteRecursive(path);
zkClientCrud.createPersistent(path,"hi");
/* zkClientCrud.createPersistent(path+"/a/b/c",true);//递归创建 但是不能设在value
//zkClientCrud.createPersistent(path,"hi");
logger.info(zkClientCrud.readData(path));
//更新
zkClientCrud.writeData(path,"hello");
logger.info(zkClientCrud.readData(path));
logger.info(String.valueOf(zkClientCrud.getChildren(path)));
//子节点
List<String> list=zkClientCrud.getChildren(path);
for(String child:list){
logger.info("子节点:"+child);
}*/
User user=new User();
user.setId(1);
user.setName("张三");
zkClientCrud.writeData(path,user);
System.out.println(zkClientCrud.readData(path).getName());;
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
private Integer id;
private String name;
}
Watcher
public class ZkClientWatcher {
ZkClient zkClient;
public ZkClientWatcher() {
zkClient = new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);
}
public void createPersistent(String path, Object data) {
zkClient.createPersistent(path, data);
}
public void writeData(String path, Object object) {
zkClient.writeData(path, object);
}
public void delete(String path) {
zkClient.delete(path);
}
public boolean exists(String path) {
return zkClient.exists(path);
}
public void deleteRecursive(String path) {
zkClient.deleteRecursive(path);
}
//对父节点添加监听数据变化。
public void subscribe(String path) {
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath, data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.printf("删除的节点为:%s\r\n", dataPath);
}
});
}
//对父节点添加监听子节点变化。
public void subscribe2(String path) {
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("父节点: " + parentPath + ",子节点:" + currentChilds + "\r\n");
}
});
}
//客户端状态
public void subscribe3(String path) {
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
if (state == Watcher.Event.KeeperState.SyncConnected) {
//当我重新启动后start,监听触发
System.out.println("连接成功");
} else if (state == Watcher.Event.KeeperState.Disconnected) {
System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
} else
System.out.println("其他状态" + state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println("重建session");
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
}
});
}
/* @Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
}*/
}
public class ZkClientWatcherTest {
public static void main(String[] args) throws InterruptedException {
ZkClientWatcher zkClientWatche=new ZkClientWatcher();
String path="/root";
zkClientWatche.deleteRecursive(path);
zkClientWatche.createPersistent(path,"hello");
zkClientWatche.subscribe(path);
zkClientWatche.subscribe2(path);
// zkClientWatche.subscribe3(path);//需要启服务
// Thread.sleep(Integer.MAX_VALUE);
zkClientWatche.createPersistent(path+"/root2","word");
TimeUnit.SECONDS.sleep(1);
zkClientWatche.writeData(path,"hi");
TimeUnit.SECONDS.sleep(1);
//zkClientWatche.delete(path);//如果目录下有内容 不能删除 会报 Directory not empty for /root的异常
zkClientWatche.deleteRecursive(path);
TimeUnit.SECONDS.sleep(1); //这个main线程就结束
}
}
public class ZookeeperUtil {
/** zookeeper服务器地址 */
// public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";
public static final String connectString = "localhost:2181";
/** 定义session失效时间 */
public static final int sessionTimeout = 5000;
public static final String path = "/root";
}
5.3 Curator
5.3.1 简介
zookeeper不是为高可用性设计的,但它使用ZAB协议达到了极高的一致性。所以它经常被选作注册中心、配置中心、分布式锁等场景。
它的性能是非常有限的,而且API并不是那么好用。xjjdog倾向于使用基于Raft协议的Etcd或者Consul,它们更加轻量级一些。
Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。
Zookeeper 原生API问题:
1.超时重连,不支持自动,需要手动操作
2.Watch注册一次后会失效
3.不支持递归创建节点
Zookeeper API 升级版 Curator:
1.解决watcher的注册一次就失效
2.提供更多解决方案并且实现简单
3.提供常用的ZooKeeper工具类
4.编程风格更爽,点点点就可以了
5.可以递归创建节点等
Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes。
5.3.2 版本问题
Curator2.x.x版本兼容Zookeeper的3.4.x和3.5.x。
Curator3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性。
Curator4 统一对 ZooKeeper 3.4.x 和 3.5.x 的支持
5.3.3 CuratorFramework
Curator-Framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自动连接管理:
当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明
监控节点数据变化事件NodeDataChanged,需要时调用updateServerList()方法
Curator recipes自动移除监控
更加清晰的API
简化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes实现 : 选举,共享锁, 路径cache, 分布式队列,分布式优先队列等。
5.3.4 curator-recipes
curator-recipes:封装了一些高级特性,如:Cache事件监听、 Elections选举、分布式锁、分布式计数器、分布式Barrier、Queues队列等
5.3.5 知识点
1.使用curator建立与zk的连接
2.使用curator添加/递归添加节点
3.使用curator删除/递归删除节点
4.使用curator创建/验证 ACL(访问权限列表)
5.使用curator监听 单个/父 节点的变化(watch事件)
6.基于curator实现Zookeeper分布式锁(需要掌握基本的多线程知识)
7.基于curator实现分布式计数器
5.3.6 Maven依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!--建议和本地安装版本保持一致-->
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
5.3.7 api
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkConnectCuratorUtil {
final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class);
public CuratorFramework zkClient = null; //zk的客户端工具Curator(在本类通过new实例化的是,自动start)
private static final int MAX_RETRY_TIMES = 3; //定义失败重试次数
private static final int BASE_SLEEP_TIME_MS = 5000; //连接失败后,再次重试的间隔时间 单位:毫秒
private static final int SESSION_TIME_OUT = 1000000; //会话存活时间,根据业务灵活指定 单位:毫秒
private static final String ZK_SERVER_IP_PORT = "localhost:2181";//Zookeeper服务所在的IP和客户端端口
private static final String NAMESPACE = "workspace";//指定后,默认操作的所有的节点都会在该工作空间下进行
//本类通过new ZkCuratorUtil()时,自动连通zkClient
public ZkConnectCuratorUtil() {
RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS);//首次连接失败后,重试策略
zkClient = CuratorFrameworkFactory.builder()
//.authorization("digest", "root:root".getBytes())//登录超级管理(需单独配)
.connectString(ZK_SERVER_IP_PORT)
.sessionTimeoutMs(SESSION_TIME_OUT)
.retryPolicy(retryPolicy)
.namespace(NAMESPACE).build();
zkClient.start();
}
public void closeZKClient() {
if (zkClient != null) {
this.zkClient.close();
}
}
public static void main(String[] args) {
ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil();
boolean ifStarted=zkUtil.zkClient.isStarted();
System.out.println("当前客户的状态:" + (ifStarted ? "连接中" : "已关闭"));
zkUtil.closeZKClient();
boolean ifClose = zkUtil.zkClient.isStarted();
System.out.println("当前客户的状态:" + (ifClose ? "连接成功" : "已关闭"));
}
}
public class CuratorDao {
//使用curator(递归)添加节点
//级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)
public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {
zkClient.create()
.creatingParentContainersIfNeeded()//创建父节点,如果需要的话
.withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限
.forPath(nodePath, nodeData.getBytes());
System.out.println(nodePath + "节点已成功创建…");
}
//使用curator(递归)删除节点
//删除node节点及其子节点
public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {
zkClient.delete()
.guaranteed() //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功
.deletingChildrenIfNeeded() //级联删除子节点
//.withVersion(1)//版本号可以据需使用
.forPath(nodePath);
System.out.println(nodePath + "节点已删除成功…");
}
//使用curator更新节点数据
//更新节点data数据
public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {
zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本号据需使用,默认可以不带
System.out.println(nodePath + "节点数据已修改成功…");
}
//使用curator查询节点数据
//查询node节点数据
public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {
Stat stat = new Stat();
byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为" + new String(data));
System.out.println("节点的版本号为:" + stat.getVersion());
}
//使用curator查询节点的子节点
//打印node子节点
public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {
List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);
System.out.println("开始打印子节点");
for (String str : childNodes) {
System.out.println(str);
}
}
//使用curator判断节点是否存在
//判断node节点是否存在
public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {
Stat stat = zkClient.checkExists().forPath(nodePath);
System.out.println(null == stat ? "节点不存在" : "节点存在");
}
/**************使用Curator高级API特性之Cache缓存监控节点变化*************/
@Test
public void test() throws Exception {
ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();
CuratorFramework zkClient = zkUtil.zkClient;
// CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui");
// CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test");
// CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi");
// CuratorDao.getNodeData(zkClient,"/xiaosi/test");
// CuratorDao.printChildNodes(zkClient, "/xiaosi");
CuratorDao.checkNodeExists(zkClient, "/xiaosi");
}
}
5.3.8 使用Curator高级API特性之Cache缓存监控节点变化
cache是一种缓存机制,可以借助cache实现监听。
简单来说,cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。
curator支持的cache种类有4种Path Cache,Node Cache,Tree Cache,Curator Cache
1)Path Cache
Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
它是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。
2)Node Cache
Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
它是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。
3)Tree Cache
Tree Cache是上两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。
它是通过TreeCache类来实现的,监听器对应的接口为TreeCacheListener。
4)Curator Cache ( requires ZooKeeper 3.6+)
Curator Cache,是在zk3.6新版本添加的特性,该版本的出现是为了逐步淘汰上面3监听。
它是通过CuratorCache类来实现的,监听器对应的接口为CuratorCacheListener。
Curator一次性的watch
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;
public class MyCuratorWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("触发watcher,节点路径为:" + event.getPath());
switch (event.getType()) {
case NodeCreated:
break;
default:
break;
}
}
}
//一次性的watch
public static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception {
zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
}
NodeCache监听当前节点变化,通过NodeCacheListener接口持续监听节点的变化来实现
//持续监听的watch
public static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception {
final NodeCache nodeCache=new NodeCache(zkClient, nodePath);//把监听节点,转换为nodeCache
nodeCache.start(false);//默认为false 设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空
ChildData cacheData=nodeCache.getCurrentData();
if(null==cacheData) {
System.out.println("NodeCache节点的初始化数据为空……");
}else {
System.out.println("NodeCache节点的初始化数据为"+new String(cacheData.getData()));
}
//设置循环监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData cdata=nodeCache.getCurrentData();
if(null==cdata) {
System.out.println("节点发生了变化,可能刚刚被删除!");
nodeCache.close();//关闭监听
}else {
String data=new String(cdata.getData());
String path=nodeCache.getCurrentData().getPath();
System.out.println("节点路径"+path+"数据发生了变化,最新数据为:"+data);
}
}
});
}
PathChildrenCache只监听子节点变化
通过PathChildrenCacheListener接口持续监听子节点来实现
//持续监听watch子节点的任何变化
public static void watchForeverByPathChildrenCache(CuratorFramework zkClient,String nodePath) throws Exception {
final PathChildrenCache childrenCache=new PathChildrenCache(zkClient, nodePath,true);//把监听节点,转换为childrenCache
/**
* StartMode:初始化方式
* POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case)
* NORMAL:异步初始化 (不会进入下面的第一个case)
* BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中)
*/
childrenCache.start(StartMode.NORMAL);
List<ChildData> childDataList=childrenCache.getCurrentData();
System.out.println("当前节点所有子节点的数据列表如下:");
for (ChildData childData : childDataList) {
System.out.println(new String(childData.getData()));
}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case INITIALIZED:
System.out.println("子节点初始化OK…");
break;
case CHILD_ADDED:
System.out.println("子节点"+event.getData().getPath()+"已被成功添加,数据data="+new String(event.getData().getData()));
break;
case CHILD_UPDATED:
System.out.println("子节点"+event.getData().getPath()+"数据发生变化,新数据data="+new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("子节点"+event.getData().getPath()+"已被移除~");
break;
case CONNECTION_RECONNECTED:
System.out.println("正在尝试重新建立连接…");
break;
case CONNECTION_SUSPENDED:
System.out.println("连接状态被暂时停止…");
break;
default:
break;
}
}
});
}
TreeCache是上两者的合体,既监听自身,也监听所有子节点变化
通过TreeCacheListener接口来实现
public static void treeCache(CuratorFramework zkClient) throws Exception {
final String path = "/treeChildrenCache";
final TreeCache treeCache = new TreeCache(zkClient, path);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
switch (event.getType()){
case NODE_ADDED:
System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath());
break;
case NODE_REMOVED:
System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath());
break;
case NODE_UPDATED:
System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath());
break;
case CONNECTION_LOST:
System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath());
break;
case CONNECTION_RECONNECTED:
System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath());
break;
case CONNECTION_SUSPENDED:
System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath());
break;
case INITIALIZED:
System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath());
break;
default:
break;
}
}
});
//据需可以继续做一些其他的增删改操作
zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000);
zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
Thread.sleep(1000);
zkClient.delete().forPath(path + "/c1");
Thread.sleep(1000);
zkClient.delete().forPath(path);
Thread.sleep(1000);
zkClient.close();
}
Curator Cache,是在zk3.6新版本添加的特性,Curator需5. *+
它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache),它通过CuratorCacheListener.builder().for* **来选择对应的监听。最后再通过curatorCache.listenable().addListener(listener);注册监听。
public static void curatorCache1(CuratorFramework zkClient) {
final String path = "/curatorCache";
CuratorCache curatorCache = CuratorCache.build(zkClient, path);
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData oldData, ChildData newdata) {
switch (type) {
case NODE_CREATED:
//各种判断
break;
default:
break;
}
}
});
}
public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException {
final String path = "/curatorCache";
CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build();
//构建监听器
//新旧对照:
//1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );
//2.path cache--> CuratorCacheListener.builder().forPathChildrenCache();
//3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache();
CuratorCacheListener listener = CuratorCacheListener.builder()
.forNodeCache(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点改变了...");
}
})
.build();
//添加监听
curatorCache.listenable().addListener(listener);
//开启监听
curatorCache.start();
//让线程休眠30s(为了方便测试)
Thread.sleep(1000 * 30);
}
package org.example.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.testng.annotations.Test;
import java.util.List;
/**
* @ClassName: CuratorDao
* @Description:
* @Author: 88578
* @Date: 2022/5/1 14:17
*/
public class CuratorDao {
//使用curator(递归)添加节点
//级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)
public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {
zkClient.create()
.creatingParentContainersIfNeeded()//创建父节点,如果需要的话
.withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限
.forPath(nodePath, nodeData.getBytes());
System.out.println(nodePath + "节点已成功创建…");
}
//使用curator(递归)删除节点
//删除node节点及其子节点
public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {
zkClient.delete()
.guaranteed() //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功
.deletingChildrenIfNeeded() //级联删除子节点
//.withVersion(1)//版本号可以据需使用
.forPath(nodePath);
System.out.println(nodePath + "节点已删除成功…");
}
//使用curator更新节点数据
//更新节点data数据
public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {
zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本号据需使用,默认可以不带
System.out.println(nodePath + "节点数据已修改成功…");
}
//使用curator查询节点数据
//查询node节点数据
public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {
Stat stat = new Stat();
byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为" + new String(data));
System.out.println("节点的版本号为:" + stat.getVersion());
}
//使用curator查询节点的子节点
//打印node子节点
public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {
List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);
System.out.println("开始打印子节点");
for (String str : childNodes) {
System.out.println(str);
}
}
//使用curator判断节点是否存在
//判断node节点是否存在
public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {
Stat stat = zkClient.checkExists().forPath(nodePath);
System.out.println(null == stat ? "节点不存在" : "节点存在");
}
/**************使用Curator高级API特性之Cache缓存监控节点变化*************/
//一次性的watch
public static void watchOnce(CuratorFramework zkClient, String nodePath) throws Exception {
zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
}
//NodeCache监听当前节点变化
//通过NodeCacheListener接口持续监听节点的变化来实现
//持续监听的watch
public static void watchForeverByNodeCache(CuratorFramework zkClient, String nodePath) throws Exception {
final NodeCache nodeCache = new NodeCache(zkClient, nodePath);//把监听节点,转换为nodeCache
nodeCache.start(false);//默认为false 设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空
ChildData cacheData = nodeCache.getCurrentData();
if (null == cacheData) {
System.out.println("NodeCache节点的初始化数据为空……");
} else {
System.out.println("NodeCache节点的初始化数据为" + new String(cacheData.getData()));
}
//设置循环监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData cdata = nodeCache.getCurrentData();
if (null == cdata) {
System.out.println("节点发生了变化,可能刚刚被删除!");
nodeCache.close();//关闭监听
} else {
String data = new String(cdata.getData());
String path = nodeCache.getCurrentData().getPath();
System.out.println("节点路径" + path + "数据发生了变化,最新数据为:" + data);
}
}
});
}
//PathChildrenCache只监听子节点变化
//通过PathChildrenCacheListener接口持续监听子节点来实现
//持续监听watch子节点的任何变化
public static void watchForeverByPathChildrenCache(CuratorFramework zkClient, String nodePath) throws Exception {
final PathChildrenCache childrenCache = new PathChildrenCache(zkClient, nodePath, true);//把监听节点,转换为childrenCache
/**
* StartMode:初始化方式
* POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case)
* NORMAL:异步初始化 (不会进入下面的第一个case)
* BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中)
*/
childrenCache.start(PathChildrenCache.StartMode.NORMAL);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前节点所有子节点的数据列表如下:");
for (ChildData childData : childDataList) {
System.out.println(new String(childData.getData()));
}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case INITIALIZED:
System.out.println("子节点初始化OK…");
break;
case CHILD_ADDED:
System.out.println("子节点" + event.getData().getPath() + "已被成功添加,数据data=" + new String(event.getData().getData()));
break;
case CHILD_UPDATED:
System.out.println("子节点" + event.getData().getPath() + "数据发生变化,新数据data=" + new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("子节点" + event.getData().getPath() + "已被移除~");
break;
case CONNECTION_RECONNECTED:
System.out.println("正在尝试重新建立连接…");
break;
case CONNECTION_SUSPENDED:
System.out.println("连接状态被暂时停止…");
break;
default:
break;
}
}
});
}
//TreeCache是上两者的合体,既监听自身,也监听所有子节点变化
//通过TreeCacheListener接口来实现
public static void treeCache(CuratorFramework zkClient, String nodePath) throws Exception {
// final String path = "/treeChildrenCache";
final TreeCache treeCache = new TreeCache(zkClient, nodePath);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
switch (event.getType()) {
case NODE_ADDED:
System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath());
break;
case NODE_REMOVED:
System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath());
break;
case NODE_UPDATED:
System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath());
break;
case CONNECTION_LOST:
System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath());
break;
case CONNECTION_RECONNECTED:
System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath());
break;
case CONNECTION_SUSPENDED:
System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath());
break;
case INITIALIZED:
System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath());
break;
default:
break;
}
}
});
//据需可以继续做一些其他的增删改操作
zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);
Thread.sleep(1000);
zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath + "/c1");
Thread.sleep(1000);
zkClient.delete().forPath(nodePath + "/c1");
Thread.sleep(1000);
zkClient.delete().forPath(nodePath);
Thread.sleep(1000);
zkClient.close();
}
/*Curator Cache,是在zk3.6新版本添加的特性,Curator需5.*+
它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache),
它通过CuratorCacheListener.builder().for***来选择对应的监听。
最后再通过curatorCache.listenable().addListener(listener);注册监听。*/
public static void curatorCache1(CuratorFramework zkClient) {
final String path = "/curatorCache";
CuratorCache curatorCache = CuratorCache.build(zkClient, path);
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData oldData, ChildData newdata) {
switch (type) {
case NODE_CREATED:
//各种判断
break;
default:
break;
}
}
});
}
public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException {
final String path = "/curatorCache";
CuratorCache curatorCache = CuratorCache.builder(zkClient, path).build();
//构建监听器
//新旧对照:
//1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );
//2.path cache--> CuratorCacheListener.builder().forPathChildrenCache();
//3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache();
CuratorCacheListener listener = CuratorCacheListener.builder()
.forNodeCache(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点改变了...");
}
})
.build();
//添加监听
curatorCache.listenable().addListener(listener);
//开启监听
curatorCache.start();
//让线程休眠30s(为了方便测试)
Thread.sleep(1000 * 30);
}
@Test
public void test() throws Exception {
ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同时,zk也被启动
CuratorFramework zkClient = zkUtil.zkClient;
// CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui");
// CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test");
// CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi");
// CuratorDao.getNodeData(zkClient,"/xiaosi/test");
// CuratorDao.printChildNodes(zkClient, "/xiaosi");
CuratorDao.checkNodeExists(zkClient, "/xiaosi");
}
public static void main(String[] args) throws Exception {
ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同时,zk也被启动
CuratorFramework zkClient = zkUtil.zkClient;
// CuratorDao.watchOnce(zkClient, "/xiaosi/test");
// CuratorDao.watchForeverByNodeCache(zkClient, "/xiaosi/test");
// CuratorDao.watchForeverByPathChildrenCache(zkClient, "/xiaosi/test");
CuratorDao.treeCache(zkClient, "/xiaosi/test4");
CuratorDao dao = new CuratorDao();
String nodePath = "/super/succ";
dao.createNodes(zkClient, nodePath, "super");//创建节点
// dao.updateNodeData(zkClient, nodePath, "hello");//更新节点数据
// dao.deleteNodeWithChild(zkClient, nodePath);
// dao.getNodeData(zkClient, nodePath);
// dao.printChildNodes(zkClient, nodePath);
// dao.checkNodeExists(zkClient, nodePath);
// dao.watchOnce(zkClient, nodePath);
// dao.watchForeverByNodeCache(zkClient, nodePath);
// dao.watchForeverByPathChildrenCache(zkClient, nodePath);
Thread.sleep(300000); //延迟sleep时间,便于后才修改节点,看前台是否会继续触发watch
cto.closeZKClient();
}
}
5.4 使用Curator创建/验证ACL(访问权限列表)
5.4.1 连通Zk时,就指定登录权限
//本类代码,只涉及ACL操作
public class CuratorAcl {
public CuratorFramework client = null;
public static final String workspace="workspace";
public static final String zkServerPath = "192.168.31.216:2181";
public CuratorAcl() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情况下,登录账号、密码可以通过构造参数传入,暂时固定,据需修改
.connectString(zkServerPath)
.sessionTimeoutMs(20000).retryPolicy(retryPolicy)
.namespace(workspace).build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
}
5.4.2写一个把明文的账号密码转换为加密后的密文的工具类
//把明文的账号密码转换为加密后的密文
public class AclUtils {
public static String getDigestUserPwd(String loginId_Username_Passwd) {
String digest = "";
try {
digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd);
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
return digest;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception {
String id = "mayun:mayun";
String idDigested = getDigestUserPwd(id);
System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk=
}
}
5.4.3使用自定义工具类AclUtils,一次性给多个用户赋Acl权限
public static List<ACL> getAcls() throws NoSuchAlgorithmException{
List<ACL> acls=new ArrayList<ACL>();
Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun"));
Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei"));
acls.add(new ACL(Perms.ALL, mayun));//给mayun一次性赋值所有权限
acls.add(new ACL(Perms.READ, lilei));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei));//给lilei分两次赋权限(目的:看不同的赋权方式)
return acls;
}
5.4.4级联创建节点,并赋予节点操作权限
public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List<ACL> acls) throws Exception {
String result=cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls, true)//给节点赋权限
.forPath(nodePath, nodeData.getBytes());
System.out.println("创建成功,result="+result);
}
5.4.5读取节点数据
public void getNodeData(CuratorAcl cto,String nodePath) throws Exception {
Stat stat = new Stat();
byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
if(null!=stat) {
System.out.println("节点" + nodePath + "的数据为: " + new String(data));
System.out.println("该节点的版本号为: " + stat.getVersion());
}
}
5.4.6修改具有ACL权限节点的data数据
public void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception {
cto.getNodeData(cto, nodePath);
System.out.println("节点修改后的数据为:"+nodeNewData);
cto.client.setData().forPath(nodePath, nodeNewData.getBytes());
System.out.println("修改成功");
}
5.4.7两种方法判断node节点是否存(优先使用第一种)
public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception {
cto.getNodeData(cto, nodePath);
System.out.println("-----------=================-------------");
//判断节点是否存在,方法一(路径前面会自动添加workspace)
Stat stat=cto.client.checkExists().forPath(nodePath);
System.out.println("======="+stat==null?"不存在":"存在");
//判断节点是否存在,方法二(路径前面需手动添加workspace)
Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false);
System.out.println("======="+stat2==null?"不存在":"存在");
}
ACL权限的main方法测试
通过java代码给某个节点添加ACL权限后,后台登陆zk客户端时,是无法直接操作该节点被ACL控制的权限的操作的,要想操作具有ACL权限的节点,方法只有两个。
1、知道该节点输入用户都有哪些,用这些用户的账号密码登录
2、使用超级用户登录
#getAcl /succ/testDigest 查看都有哪些用户对该节点有操作权限
#addauth digest succ:succ 登录
public static void main(String[] args) throws Exception {
CuratorAcl cto = new CuratorAcl();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接成功" : "已关闭"));
String nodePath1 = "/acl/tom/bin";
String nodePath2 = "/acl/father/child/sub";
// cto.createNodesCascade(cto, nodePath1, "aclTest", getAcls());//首次创建,报错,只能创建父节点,子节点无法创建
// cto.client.setACL().withACL(getAcls()).forPath("/curatorNode");//给节点创建权限
// cto.getNodeData(cto, "/super");
// cto.getNodeData(cto, "/acl");
cto.checkNodeExists(cto, nodePath2);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接成功" : "已关闭"));
}
7 分布式锁
Curator的5种分布式锁及其对应的核心类:
1.重入式排它锁 Shared Reentrant Lock,实现类:InterProcessMutex
2.不可重入排它锁 Shared Lock ,实现类:InterProcessSemaphoreMutex
3.可重入读写锁 Shared Reentrant Read Write Lock,实现类: InterProcessReadWriteLock 、InterProcessLock
4.多锁对象容器(多共享锁) Multi Shared Lock,将多个锁作为单个实体管理的容器,实现类:InterProcessMultiLock、InterProcessLock
5.共享信号锁Shared Semaphore ,实现类:InterProcessSemaphoreV2
跨 JVM 工作的计数信号量。使用相同锁路径的所有 JVM 中的所有进程将实现进程间有限的租用集。此外,这个信号量大多是“公平的”——每个用户将按照请求的顺序获得租用(从 ZK 的角度来看)。
有两种模式可用于确定信号量的最大租用。在第一种模式中,最大租用是由给定路径的用户维护的约定。在第二种模式中,SharedCountReader 用作给定路径的信号量的方法,以确定最大租用。
7.1.重入式排它锁InterProcessMutex
public InterProcessMutex(CuratorFramework client, String path)
获取/释放锁的API
public void acquire() throws Exception;//获取锁,获取不到锁一直阻塞,zk连接中断则抛异常
public boolean acquire(long time, TimeUnit unit) throws Exception;//获取锁,超过该时间后,直接返回false,zk连接中断则抛异常
public void release() throws Exception;//释放锁
通过release()方法释放锁。InterProcessMutex 实例可以重用。Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。为了撤销mutex, 调用下面的方法
/**
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listener
*/
public void makeRevocable(RevocationListener listener)
7.2.不可重入排它锁InterProcessSemaphoreMutex
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入
7.3.可重入读写锁InterProcessReadWriteLock 、InterProcessLock
一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。从读锁升级成写锁是不成的。
7.4.多锁对象容器(多共享锁) ,将多个锁作为单个实体管理,InterProcessMultiLock、InterProcessLock
Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire(上锁),如果请求失败,所有的锁都会被release (释放锁)。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。主要涉及两个类:InterProcessMultiLock、InterProcessLock
它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。
public InterProcessMultiLock(List locks)
public InterProcessMultiLock(CuratorFramework client, List paths)
7.5.代码
public class ZkLock {
final static Logger log = LoggerFactory.getLogger(ZkLock.class);
public CuratorFramework zkClient = null; // zk的客户端工具Curator(在本类通过new实例化的是,自动start)
private static final int BASE_SLEEP_TIME_MS = 1000; // 连接失败后,再次重试的间隔时间 单位:毫秒
private static final int MAX_RETRY_TIMES = 10; // 定义失败重试次数
private static final int SESSION_TIME_OUT = 1000000; // 会话存活时间,根据业务灵活指定 单位:毫秒
private static final String ZK_SERVER_IP_PORT = "localhost:2181";// Zookeeper服务所在的IP和客户端端口
private static final String NAMESPACE = "workspace";// 指定后,默认操作的所有的节点都会在该工作空间下进行
static int j = 10;
//初始化zk客户端
public ZkLock() {
// 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES);
// 通过工厂建立连接
zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 连接地址
.sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重试策略
.build();
zkClient.start();
}
public static void lockTest(CuratorFramework zkClient) throws InterruptedException {
// 使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的
final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test");
final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {//启动10个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();// 线程等待一起执行
lock.acquire();// 分布式锁,数据同步
// 处理业务
j--;
System.out.println(j);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {// 释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "t" + i).start();
}
Thread.sleep(1000);
countDownLatch.countDown();// 模拟十个线程一起并发.指定一起执行
}
public static void main(String[] args) throws InterruptedException {
ZkLock zkl = new ZkLock();
ZkLock.lockTest(zkl.zkClient);
}
}
8.分布式计数器
利用Zookeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围(int、long)不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。
increment() //加1
decrement() //减1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //获取当前值
add():增加特定的值
subtract(): 减去特定的值
trySet(): 尝试设置计数值
使用的时候,必须检查返回结果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
public static void count(CuratorFramework zkClient) throws Exception {
//分布式计数器
DistributedAtomicInteger counter=new DistributedAtomicInteger(zkClient,"/super",new RetryNTimes(3,100));
//初始化
counter.forceSet(0);
AtomicValue<Integer> value = counter.increment();//原子自增
System.out.println("原值为"+value.preValue());
System.out.println("更改后的值为"+value.postValue());
System.out.println("状态"+value.succeeded());
}
public static void main(String[] args) throws Exception {
ZkLock zkl=new ZkLock();
//ZkLock.lockTest(zkl.zkClient);
ZkLock.count(zkl.zkClient);
}
另外Curator还有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式队列DistributedQueueDistributed Queue