Skip to content

Zookeeper — 分布式协调服务

什么是 Zookeeper

Apache Zookeeper 是一个分布式协调服务,为分布式应用提供一致性服务,包括:配置管理、命名服务、分布式锁、集群选主等。它是 Hadoop、Kafka、HBase 等大数据组件的基础依赖。

核心特性

  • 强一致性(顺序一致性)
  • 高可用(奇数节点集群,过半存活即可服务)
  • 数据模型简单(类文件系统的树形结构)
  • Watch 机制(数据变化时通知客户端)

数据模型

Zookeeper 的数据存储在内存中,以树形结构(ZNode 树)组织:

/
├── /hadoop
│   ├── /hadoop/namenode        ← 存储 NameNode 地址
│   └── /hadoop/datanodes
│       ├── /hadoop/datanodes/dn1
│       └── /hadoop/datanodes/dn2
├── /kafka
│   ├── /kafka/brokers
│   │   ├── /kafka/brokers/ids
│   │   │   ├── /kafka/brokers/ids/1  ← Broker 1 信息
│   │   │   └── /kafka/brokers/ids/2
│   │   └── /kafka/brokers/topics
│   └── /kafka/controller          ← 当前 Controller Broker
└── /hbase
    └── /hbase/master              ← HMaster 地址

ZNode 类型

类型说明
持久节点(Persistent)默认类型,客户端断开后节点依然存在
临时节点(Ephemeral)客户端断开后自动删除,用于服务注册、选主
持久顺序节点(Persistent Sequential)节点名自动追加递增序号
临时顺序节点(Ephemeral Sequential)临时 + 顺序,用于分布式锁

核心机制

Watch 机制

客户端可以在 ZNode 上注册 Watch,当节点发生变化时收到通知:

客户端 A 监听 /kafka/controller


/kafka/controller 节点被删除(Controller 宕机)


Zookeeper 通知客户端 A(一次性通知)


客户端 A 重新注册 Watch,参与新一轮选主

注意:Watch 是一次性的,触发后需要重新注册。

ZAB 协议(Zookeeper Atomic Broadcast)

ZAB 是 Zookeeper 的核心一致性协议,类似 Raft:

Leader 选举:
  1. 所有节点投票给自己
  2. 比较 zxid(事务 ID),zxid 大的优先
  3. zxid 相同则比较 myid,myid 大的优先
  4. 获得过半票数的节点成为 Leader

数据同步:
  1. 客户端写请求发给任意节点
  2. Follower 转发给 Leader
  3. Leader 生成 Proposal,广播给所有 Follower
  4. 过半 Follower 确认后,Leader 提交
  5. Leader 通知所有 Follower 提交

Java API 使用

java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

// 创建连接
ZooKeeper zk = new ZooKeeper(
    "zk1:2181,zk2:2181,zk3:2181",
    3000,  // session timeout
    event -> {
        // 全局 Watcher
        System.out.println("Event: " + event.getType() + " " + event.getPath());
    }
);

// 创建节点
String path = zk.create(
    "/myapp/config",
    "value".getBytes(),
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT
);

// 读取节点
Stat stat = new Stat();
byte[] data = zk.getData("/myapp/config", false, stat);
System.out.println("Data: " + new String(data));
System.out.println("Version: " + stat.getVersion());

// 更新节点(带版本号,乐观锁)
zk.setData("/myapp/config", "new_value".getBytes(), stat.getVersion());

// 删除节点
zk.delete("/myapp/config", -1);  // -1 表示不检查版本

// 检查节点是否存在(带 Watch)
Stat exists = zk.exists("/myapp/config", event -> {
    System.out.println("Node changed: " + event.getType());
});

// 获取子节点列表
List<String> children = zk.getChildren("/myapp", false);

zk.close();

Curator 框架(推荐)

Curator 是 Netflix 开源的 Zookeeper 客户端封装,简化了复杂操作:

java
import org.apache.curator.framework.*;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.framework.recipes.leader.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("zk1:2181,zk2:2181,zk3:2181")
    .sessionTimeoutMs(5000)
    .connectionTimeoutMs(3000)
    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    .namespace("myapp")  // 所有操作都在 /myapp 下
    .build();

client.start();

// CRUD 操作
// 创建(自动创建父节点)
client.create().creatingParentsIfNeeded()
      .withMode(CreateMode.PERSISTENT)
      .forPath("/config/db", "jdbc:mysql://...".getBytes());

// 读取
byte[] data = client.getData().forPath("/config/db");

// 更新
client.setData().forPath("/config/db", "new_value".getBytes());

// 删除(递归删除)
client.delete().deletingChildrenIfNeeded().forPath("/config");

// 监听节点变化(NodeCache)
NodeCache nodeCache = new NodeCache(client, "/config/db");
nodeCache.getListenable().addListener(() -> {
    ChildData currentData = nodeCache.getCurrentData();
    System.out.println("Node changed: " + new String(currentData.getData()));
});
nodeCache.start();

// 监听子节点变化(PathChildrenCache)
PathChildrenCache pathCache = new PathChildrenCache(client, "/config", true);
pathCache.getListenable().addListener((c, event) -> {
    System.out.println("Event: " + event.getType() + " " + event.getData().getPath());
});
pathCache.start();

分布式锁

java
// 可重入互斥锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/order-lock");

try {
    if (lock.acquire(10, TimeUnit.SECONDS)) {
        try {
            // 临界区代码
            processOrder();
        } finally {
            lock.release();
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

// 读写锁
InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks/rw-lock");

// 读锁(多个读可以并发)
InterProcessMutex readLock = rwLock.readLock();
readLock.acquire();
try {
    readData();
} finally {
    readLock.release();
}

// 写锁(独占)
InterProcessMutex writeLock = rwLock.writeLock();
writeLock.acquire();
try {
    writeData();
} finally {
    writeLock.release();
}

Leader 选举

java
LeaderSelector leaderSelector = new LeaderSelector(
    client,
    "/leader-election",
    new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            System.out.println("I am the leader now!");
            // 执行 Leader 逻辑
            // 此方法返回后,自动放弃 Leader
            Thread.sleep(Long.MAX_VALUE);
        }
    }
);

leaderSelector.autoRequeue();  // 放弃 Leader 后自动重新参与选举
leaderSelector.start();

常用命令

bash
# 连接 Zookeeper
zkCli.sh -server zk1:2181,zk2:2181,zk3:2181

# 查看节点
ls /
ls /kafka/brokers/ids

# 查看节点数据
get /kafka/controller

# 创建节点
create /myapp "hello"
create -e /myapp/temp "ephemeral"  # 临时节点
create -s /myapp/seq "sequential"  # 顺序节点

# 更新节点
set /myapp "new_value"

# 删除节点
delete /myapp
deleteall /myapp  # 递归删除

# 查看节点状态
stat /myapp

# 查看集群状态
echo stat | nc zk1 2181
echo mntr | nc zk1 2181

集群配置

properties
# zoo.cfg
tickTime=2000          # 基本时间单位(毫秒)
initLimit=10           # Follower 初始化连接 Leader 的超时(10 * tickTime)
syncLimit=5            # Follower 与 Leader 同步的超时(5 * tickTime)
dataDir=/data/zookeeper
clientPort=2181

# 集群节点配置(server.id=host:leader_port:election_port)
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

# 每个节点的 myid 文件
# echo 1 > /data/zookeeper/myid  (在 zk1 上)
# echo 2 > /data/zookeeper/myid  (在 zk2 上)
# echo 3 > /data/zookeeper/myid  (在 zk3 上)

节点数量

Zookeeper 集群节点数必须是奇数(3、5、7),过半节点存活才能提供服务。3 节点可容忍 1 个故障,5 节点可容忍 2 个故障。

本站内容由 褚成志 整理编写,仅供学习参考