Zookeeper — 分布式协调服务
什么是 Zookeeper
Apache Zookeeper 是一个分布式协调服务,为分布式应用提供一致性服务,包括:配置管理、命名服务、分布式锁、集群选主等。它是 Hadoop、Kafka、HBase 等大数据组件的基础依赖。
核心特性:
- 强一致性(顺序一致性)
- 高可用(奇数节点集群,过半存活即可服务)
- 数据模型简单(类文件系统的树形结构)
- Watch 机制(数据变化时通知客户端)
数据模型
Zookeeper 的数据存储在内存中,以树形结构(ZNode 树)组织:
/
├── /hadoop
│ ├── /hadoop/namenode ← 存储 NameNode 地址
│ └── /hadoop/datanodes
│ ├── /hadoop/datanodes/dn1
│ └── /hadoop/datanodes/dn2
├── /kafka
│ ├── /kafka/brokers
│ │ ├── /kafka/brokers/ids
│ │ │ ├── /kafka/brokers/ids/1 ← Broker 1 信息
│ │ │ └── /kafka/brokers/ids/2
│ │ └── /kafka/brokers/topics
│ └── /kafka/controller ← 当前 Controller Broker
└── /hbase
└── /hbase/master ← HMaster 地址ZNode 类型
| 类型 | 说明 |
|---|---|
| 持久节点(Persistent) | 默认类型,客户端断开后节点依然存在 |
| 临时节点(Ephemeral) | 客户端断开后自动删除,用于服务注册、选主 |
| 持久顺序节点(Persistent Sequential) | 节点名自动追加递增序号 |
| 临时顺序节点(Ephemeral Sequential) | 临时 + 顺序,用于分布式锁 |
核心机制
Watch 机制
客户端可以在 ZNode 上注册 Watch,当节点发生变化时收到通知:
客户端 A 监听 /kafka/controller
│
▼
/kafka/controller 节点被删除(Controller 宕机)
│
▼
Zookeeper 通知客户端 A(一次性通知)
│
▼
客户端 A 重新注册 Watch,参与新一轮选主注意:Watch 是一次性的,触发后需要重新注册。
ZAB 协议(Zookeeper Atomic Broadcast)
ZAB 是 Zookeeper 的核心一致性协议,类似 Raft:
Leader 选举:
1. 所有节点投票给自己
2. 比较 zxid(事务 ID),zxid 大的优先
3. zxid 相同则比较 myid,myid 大的优先
4. 获得过半票数的节点成为 Leader
数据同步:
1. 客户端写请求发给任意节点
2. Follower 转发给 Leader
3. Leader 生成 Proposal,广播给所有 Follower
4. 过半 Follower 确认后,Leader 提交
5. Leader 通知所有 Follower 提交Java API 使用
java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
// 创建连接
ZooKeeper zk = new ZooKeeper(
"zk1:2181,zk2:2181,zk3:2181",
3000, // session timeout
event -> {
// 全局 Watcher
System.out.println("Event: " + event.getType() + " " + event.getPath());
}
);
// 创建节点
String path = zk.create(
"/myapp/config",
"value".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT
);
// 读取节点
Stat stat = new Stat();
byte[] data = zk.getData("/myapp/config", false, stat);
System.out.println("Data: " + new String(data));
System.out.println("Version: " + stat.getVersion());
// 更新节点(带版本号,乐观锁)
zk.setData("/myapp/config", "new_value".getBytes(), stat.getVersion());
// 删除节点
zk.delete("/myapp/config", -1); // -1 表示不检查版本
// 检查节点是否存在(带 Watch)
Stat exists = zk.exists("/myapp/config", event -> {
System.out.println("Node changed: " + event.getType());
});
// 获取子节点列表
List<String> children = zk.getChildren("/myapp", false);
zk.close();Curator 框架(推荐)
Curator 是 Netflix 开源的 Zookeeper 客户端封装,简化了复杂操作:
java
import org.apache.curator.framework.*;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.framework.recipes.leader.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("zk1:2181,zk2:2181,zk3:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("myapp") // 所有操作都在 /myapp 下
.build();
client.start();
// CRUD 操作
// 创建(自动创建父节点)
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/config/db", "jdbc:mysql://...".getBytes());
// 读取
byte[] data = client.getData().forPath("/config/db");
// 更新
client.setData().forPath("/config/db", "new_value".getBytes());
// 删除(递归删除)
client.delete().deletingChildrenIfNeeded().forPath("/config");
// 监听节点变化(NodeCache)
NodeCache nodeCache = new NodeCache(client, "/config/db");
nodeCache.getListenable().addListener(() -> {
ChildData currentData = nodeCache.getCurrentData();
System.out.println("Node changed: " + new String(currentData.getData()));
});
nodeCache.start();
// 监听子节点变化(PathChildrenCache)
PathChildrenCache pathCache = new PathChildrenCache(client, "/config", true);
pathCache.getListenable().addListener((c, event) -> {
System.out.println("Event: " + event.getType() + " " + event.getData().getPath());
});
pathCache.start();分布式锁
java
// 可重入互斥锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/order-lock");
try {
if (lock.acquire(10, TimeUnit.SECONDS)) {
try {
// 临界区代码
processOrder();
} finally {
lock.release();
}
}
} catch (Exception e) {
e.printStackTrace();
}
// 读写锁
InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks/rw-lock");
// 读锁(多个读可以并发)
InterProcessMutex readLock = rwLock.readLock();
readLock.acquire();
try {
readData();
} finally {
readLock.release();
}
// 写锁(独占)
InterProcessMutex writeLock = rwLock.writeLock();
writeLock.acquire();
try {
writeData();
} finally {
writeLock.release();
}Leader 选举
java
LeaderSelector leaderSelector = new LeaderSelector(
client,
"/leader-election",
new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("I am the leader now!");
// 执行 Leader 逻辑
// 此方法返回后,自动放弃 Leader
Thread.sleep(Long.MAX_VALUE);
}
}
);
leaderSelector.autoRequeue(); // 放弃 Leader 后自动重新参与选举
leaderSelector.start();常用命令
bash
# 连接 Zookeeper
zkCli.sh -server zk1:2181,zk2:2181,zk3:2181
# 查看节点
ls /
ls /kafka/brokers/ids
# 查看节点数据
get /kafka/controller
# 创建节点
create /myapp "hello"
create -e /myapp/temp "ephemeral" # 临时节点
create -s /myapp/seq "sequential" # 顺序节点
# 更新节点
set /myapp "new_value"
# 删除节点
delete /myapp
deleteall /myapp # 递归删除
# 查看节点状态
stat /myapp
# 查看集群状态
echo stat | nc zk1 2181
echo mntr | nc zk1 2181集群配置
properties
# zoo.cfg
tickTime=2000 # 基本时间单位(毫秒)
initLimit=10 # Follower 初始化连接 Leader 的超时(10 * tickTime)
syncLimit=5 # Follower 与 Leader 同步的超时(5 * tickTime)
dataDir=/data/zookeeper
clientPort=2181
# 集群节点配置(server.id=host:leader_port:election_port)
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
# 每个节点的 myid 文件
# echo 1 > /data/zookeeper/myid (在 zk1 上)
# echo 2 > /data/zookeeper/myid (在 zk2 上)
# echo 3 > /data/zookeeper/myid (在 zk3 上)节点数量
Zookeeper 集群节点数必须是奇数(3、5、7),过半节点存活才能提供服务。3 节点可容忍 1 个故障,5 节点可容忍 2 个故障。