Kafka — 分布式消息队列
什么是 Kafka
Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,后捐献给 Apache。它以高吞吐、低延迟、持久化、可回放著称,是大数据生态中消息队列的事实标准。
核心能力:
- 消息队列:解耦生产者和消费者
- 流存储:消息持久化,支持历史数据回放
- 流处理:Kafka Streams 直接处理流数据
核心概念
┌─────────────────────────────────────────────────────────────┐
│ Kafka 集群 │
│ │
│ Topic: user_events │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Partition 0: [msg0][msg1][msg2][msg3]... │ │
│ │ Partition 1: [msg0][msg1][msg2]... │ │
│ │ Partition 2: [msg0][msg1][msg2][msg3][msg4]... │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ Broker 1 (Leader: P0, Follower: P1) │
│ Broker 2 (Leader: P1, Follower: P2) │
│ Broker 3 (Leader: P2, Follower: P0) │
└─────────────────────────────────────────────────────────────┘
Producer → 写入 Leader Partition
Consumer Group → 每个 Partition 只被组内一个 Consumer 消费| 概念 | 说明 |
|---|---|
| Topic | 消息的逻辑分类,类似数据库的表 |
| Partition | Topic 的物理分片,实现并行读写 |
| Offset | 消息在 Partition 中的唯一位置标识 |
| Broker | Kafka 服务节点 |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Consumer Group | 消费者组,组内消费者共同消费一个 Topic |
| Replica | 分区副本,保证高可用 |
| Leader/Follower | 每个分区有一个 Leader 负责读写,Follower 同步备份 |
消息存储原理
Partition 目录结构:
/kafka-logs/user_events-0/
├── 00000000000000000000.log ← 消息数据文件
├── 00000000000000000000.index ← 稀疏索引(offset → 文件位置)
├── 00000000000000000000.timeindex ← 时间索引
├── 00000000000001000000.log ← 下一个 Segment
└── ...Segment 机制:
- 每个 Partition 由多个 Segment 文件组成
- 默认每个 Segment 1GB 或 7 天滚动
- 消息追加写入,顺序 I/O,性能极高
消息保留策略:
- 按时间:默认保留 7 天(
log.retention.hours=168) - 按大小:超过指定大小删除旧 Segment
- 按 Key 压缩(Log Compaction):保留每个 Key 的最新值
生产者
Java 生产者
java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 可靠性配置
props.put("acks", "all"); // 等待所有副本确认
props.put("retries", 3); // 重试次数
props.put("retry.backoff.ms", 100); // 重试间隔
// 性能配置
props.put("batch.size", 16384); // 批次大小(16KB)
props.put("linger.ms", 5); // 等待时间(凑批)
props.put("buffer.memory", 33554432); // 缓冲区大小(32MB)
props.put("compression.type", "snappy"); // 压缩
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 异步发送(带回调)
ProducerRecord<String, String> record = new ProducerRecord<>(
"user_events", // topic
"user_001", // key(决定分区)
"{\"event\":\"click\",\"page\":\"home\"}" // value
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Sent to partition %d, offset %d%n",
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
// 同步发送
RecordMetadata metadata = producer.send(record).get();
// 事务性发送(精确一次)
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
producer.close();Python 生产者
python
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8'),
acks='all',
retries=3,
compression_type='snappy'
)
# 发送消息
future = producer.send(
'user_events',
key='user_001',
value={'event': 'click', 'page': 'home', 'timestamp': 1704067200}
)
# 等待确认
record_metadata = future.get(timeout=10)
print(f"Partition: {record_metadata.partition}, Offset: {record_metadata.offset}")
producer.flush()
producer.close()消费者
Java 消费者
java
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Offset 提交策略
props.put("enable.auto.commit", "false"); // 手动提交(推荐)
props.put("auto.offset.reset", "earliest"); // 从最早开始消费
// 性能配置
props.put("max.poll.records", 500); // 每次 poll 最多拉取条数
props.put("fetch.min.bytes", 1024); // 最小拉取字节数
props.put("fetch.max.wait.ms", 500); // 最大等待时间
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user_events"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
// 处理消息...
}
// 手动提交 offset(同步)
consumer.commitSync();
// 或异步提交
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception);
}
});
}
} finally {
consumer.close();
}手动分配分区
java
// 不使用 Consumer Group,手动指定分区
TopicPartition partition0 = new TopicPartition("user_events", 0);
consumer.assign(Arrays.asList(partition0));
// 从指定 offset 开始消费
consumer.seek(partition0, 1000);
// 从最早开始
consumer.seekToBeginning(Arrays.asList(partition0));
// 从最新开始
consumer.seekToEnd(Arrays.asList(partition0));常用命令
bash
# 创建 Topic
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic user_events \
--partitions 6 \
--replication-factor 3
# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server kafka:9092
# 查看 Topic 详情
kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic user_events
# 修改分区数(只能增加)
kafka-topics.sh --alter --bootstrap-server kafka:9092 \
--topic user_events --partitions 12
# 删除 Topic
kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic user_events
# 生产消息(命令行测试)
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic user_events
# 消费消息(从头开始)
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic user_events --from-beginning
# 查看消费者组
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# 查看消费者组 Lag(消费延迟)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-consumer-group
# 重置 offset
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group my-consumer-group \
--topic user_events \
--reset-offsets --to-earliest --execute性能调优
生产者调优
properties
# 提高吞吐量
batch.size=65536 # 增大批次(64KB)
linger.ms=20 # 等待更多消息凑批
compression.type=lz4 # 压缩(lz4 速度最快)
buffer.memory=67108864 # 增大缓冲区(64MB)
# 提高可靠性
acks=all
retries=Integer.MAX_VALUE
max.in.flight.requests.per.connection=1 # 保证顺序
enable.idempotence=true # 幂等性(精确一次)消费者调优
properties
# 提高吞吐量
fetch.min.bytes=65536 # 最小拉取 64KB
fetch.max.bytes=52428800 # 最大拉取 50MB
max.poll.records=1000 # 每次拉取更多消息
# 避免 Rebalance
session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.interval.ms=300000 # 处理时间不超过 5 分钟Broker 调优
properties
# 网络和 I/O
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# 日志
log.segment.bytes=1073741824 # 1GB
log.retention.hours=168 # 7天
log.cleanup.policy=delete
# 副本同步
num.replica.fetchers=4
replica.fetch.max.bytes=1048576