Skip to content

Kafka — 分布式消息队列

什么是 Kafka

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,后捐献给 Apache。它以高吞吐、低延迟、持久化、可回放著称,是大数据生态中消息队列的事实标准。

核心能力

  • 消息队列:解耦生产者和消费者
  • 流存储:消息持久化,支持历史数据回放
  • 流处理:Kafka Streams 直接处理流数据

核心概念

┌─────────────────────────────────────────────────────────────┐
│                      Kafka 集群                              │
│                                                             │
│  Topic: user_events                                         │
│  ┌──────────────────────────────────────────────────────┐  │
│  │  Partition 0: [msg0][msg1][msg2][msg3]...            │  │
│  │  Partition 1: [msg0][msg1][msg2]...                  │  │
│  │  Partition 2: [msg0][msg1][msg2][msg3][msg4]...      │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
│  Broker 1 (Leader: P0, Follower: P1)                       │
│  Broker 2 (Leader: P1, Follower: P2)                       │
│  Broker 3 (Leader: P2, Follower: P0)                       │
└─────────────────────────────────────────────────────────────┘

Producer → 写入 Leader Partition
Consumer Group → 每个 Partition 只被组内一个 Consumer 消费
概念说明
Topic消息的逻辑分类,类似数据库的表
PartitionTopic 的物理分片,实现并行读写
Offset消息在 Partition 中的唯一位置标识
BrokerKafka 服务节点
Producer消息生产者
Consumer消息消费者
Consumer Group消费者组,组内消费者共同消费一个 Topic
Replica分区副本,保证高可用
Leader/Follower每个分区有一个 Leader 负责读写,Follower 同步备份

消息存储原理

Partition 目录结构:
/kafka-logs/user_events-0/
├── 00000000000000000000.log      ← 消息数据文件
├── 00000000000000000000.index    ← 稀疏索引(offset → 文件位置)
├── 00000000000000000000.timeindex ← 时间索引
├── 00000000000001000000.log      ← 下一个 Segment
└── ...

Segment 机制

  • 每个 Partition 由多个 Segment 文件组成
  • 默认每个 Segment 1GB 或 7 天滚动
  • 消息追加写入,顺序 I/O,性能极高

消息保留策略

  • 按时间:默认保留 7 天(log.retention.hours=168
  • 按大小:超过指定大小删除旧 Segment
  • 按 Key 压缩(Log Compaction):保留每个 Key 的最新值

生产者

Java 生产者

java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 可靠性配置
props.put("acks", "all");           // 等待所有副本确认
props.put("retries", 3);            // 重试次数
props.put("retry.backoff.ms", 100); // 重试间隔

// 性能配置
props.put("batch.size", 16384);          // 批次大小(16KB)
props.put("linger.ms", 5);               // 等待时间(凑批)
props.put("buffer.memory", 33554432);    // 缓冲区大小(32MB)
props.put("compression.type", "snappy"); // 压缩

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 异步发送(带回调)
ProducerRecord<String, String> record = new ProducerRecord<>(
    "user_events",    // topic
    "user_001",       // key(决定分区)
    "{\"event\":\"click\",\"page\":\"home\"}"  // value
);

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.printf("Sent to partition %d, offset %d%n",
            metadata.partition(), metadata.offset());
    } else {
        exception.printStackTrace();
    }
});

// 同步发送
RecordMetadata metadata = producer.send(record).get();

// 事务性发送(精确一次)
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

producer.close();

Python 生产者

python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8'),
    acks='all',
    retries=3,
    compression_type='snappy'
)

# 发送消息
future = producer.send(
    'user_events',
    key='user_001',
    value={'event': 'click', 'page': 'home', 'timestamp': 1704067200}
)

# 等待确认
record_metadata = future.get(timeout=10)
print(f"Partition: {record_metadata.partition}, Offset: {record_metadata.offset}")

producer.flush()
producer.close()

消费者

Java 消费者

java
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Offset 提交策略
props.put("enable.auto.commit", "false");  // 手动提交(推荐)
props.put("auto.offset.reset", "earliest"); // 从最早开始消费

// 性能配置
props.put("max.poll.records", 500);         // 每次 poll 最多拉取条数
props.put("fetch.min.bytes", 1024);         // 最小拉取字节数
props.put("fetch.max.wait.ms", 500);        // 最大等待时间

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user_events"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                record.partition(), record.offset(), record.key(), record.value());

            // 处理消息...
        }

        // 手动提交 offset(同步)
        consumer.commitSync();

        // 或异步提交
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Commit failed: " + exception);
            }
        });
    }
} finally {
    consumer.close();
}

手动分配分区

java
// 不使用 Consumer Group,手动指定分区
TopicPartition partition0 = new TopicPartition("user_events", 0);
consumer.assign(Arrays.asList(partition0));

// 从指定 offset 开始消费
consumer.seek(partition0, 1000);

// 从最早开始
consumer.seekToBeginning(Arrays.asList(partition0));

// 从最新开始
consumer.seekToEnd(Arrays.asList(partition0));

常用命令

bash
# 创建 Topic
kafka-topics.sh --create \
  --bootstrap-server kafka:9092 \
  --topic user_events \
  --partitions 6 \
  --replication-factor 3

# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server kafka:9092

# 查看 Topic 详情
kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic user_events

# 修改分区数(只能增加)
kafka-topics.sh --alter --bootstrap-server kafka:9092 \
  --topic user_events --partitions 12

# 删除 Topic
kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic user_events

# 生产消息(命令行测试)
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic user_events

# 消费消息(从头开始)
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic user_events --from-beginning

# 查看消费者组
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list

# 查看消费者组 Lag(消费延迟)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-consumer-group

# 重置 offset
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group my-consumer-group \
  --topic user_events \
  --reset-offsets --to-earliest --execute

性能调优

生产者调优

properties
# 提高吞吐量
batch.size=65536          # 增大批次(64KB)
linger.ms=20              # 等待更多消息凑批
compression.type=lz4      # 压缩(lz4 速度最快)
buffer.memory=67108864    # 增大缓冲区(64MB)

# 提高可靠性
acks=all
retries=Integer.MAX_VALUE
max.in.flight.requests.per.connection=1  # 保证顺序
enable.idempotence=true   # 幂等性(精确一次)

消费者调优

properties
# 提高吞吐量
fetch.min.bytes=65536     # 最小拉取 64KB
fetch.max.bytes=52428800  # 最大拉取 50MB
max.poll.records=1000     # 每次拉取更多消息

# 避免 Rebalance
session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.interval.ms=300000  # 处理时间不超过 5 分钟

Broker 调优

properties
# 网络和 I/O
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# 日志
log.segment.bytes=1073741824  # 1GB
log.retention.hours=168       # 7天
log.cleanup.policy=delete

# 副本同步
num.replica.fetchers=4
replica.fetch.max.bytes=1048576

本站内容由 褚成志 整理编写,仅供学习参考