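Kafka Streams: Stream Processing
What is Kafka Streams
Kafka Streams is a lightweight stream-processing library that ships with Apache Kafka. It needs no separate processing cluster: you embed it in your application as an ordinary Java library. It is built on Kafka's consumer and producer APIs and supports exactly-once semantics, which must be enabled explicitly (the default guarantee is at-least-once).
Typical use cases:
- Lightweight stream processing when you do not want to operate a Flink or Spark cluster
- Real-time data processing inside a microservice architecture
- Real-time transformation, filtering, and aggregation of data already in Kafka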
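Core concepts
| Concept | Description |
|---|---|
| KStream | An unbounded stream of events; each record is an independent event |
| KTable | A changelog stream; each record is an update to the latest value of its key (similar to a database table) |
| GlobalKTable | A KTable that is fully replicated to every application instance; used to broadcast dimension (lookup) tables |
| Topology | The directed acyclic graph (DAG) of processing steps |
| State Store | Local state storage (RocksDB by default) that backs aggregations and joins |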
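The difference between the KStream and KTable rows can be illustrated without Kafka at all. The following is a minimal plain-Java sketch, not the Kafka Streams API: the class `StreamVsTableModel` and its methods are hypothetical names introduced only for this illustration. It feeds the same records through both interpretations, treating a null value as a delete (tombstone) in the table view.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Kafka dependency) of record-stream vs. changelog semantics. */
final class StreamVsTableModel {

    /** KStream-like view: every record is an independent event, so all are kept. */
    static List<String> asStream(List<String[]> records) {
        List<String> events = new ArrayList<>();
        for (String[] r : records) {
            events.add(r[0] + "=" + r[1]);
        }
        return events;
    }

    /** KTable-like view: each record upserts its key; a null value removes the key. */
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] r : records) {
            if (r[1] == null) {
                table.remove(r[0]);
            } else {
                table.put(r[0], r[1]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[] {"alice", "Berlin"},
            new String[] {"bob", "Lima"},
            new String[] {"alice", "Paris"});
        System.out.println(asStream(records)); // [alice=Berlin, bob=Lima, alice=Paris]
        System.out.println(asTable(records));  // {alice=Paris, bob=Lima}
    }
}
```

The stream view keeps three events, while the table view holds only the latest value per key. This is why counting a KStream counts events, whereas a KTable answers "what is the current value for this key".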
Basic usage

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build the topology
        StreamsBuilder builder = new StreamsBuilder();

        // Read from a Kafka topic
        KStream<String, String> source = builder.stream("user_events");

        // Split each value into words, regroup by word, and count occurrences
        KStream<String, Long> wordCounts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
            .groupBy((key, word) -> word)
            .count()
            .toStream();

        // Write to the output topic (counts are Long, so override the default value serde)
        wordCounts.to("word_counts", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Graceful shutdown: register the hook before starting
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

KStream operations
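Of the operations in this section, branching is the easiest to misread: each record is routed to the first predicate that accepts it, never to several branches. The following plain-Java sketch models that rule under the assumption that a default branch collects everything unmatched. `FirstMatchRouter` and `route` are hypothetical names for illustration and are not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of first-match branching with a trailing default bucket. */
final class FirstMatchRouter {

    /**
     * Returns predicates.size() + 1 buckets. Bucket i holds the values whose first
     * matching predicate is predicate i; the last bucket holds unmatched values.
     */
    static List<List<String>> route(List<String> values, List<Predicate<String>> predicates) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i <= predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (String value : values) {
            int target = predicates.size(); // default bucket unless a predicate matches
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(value)) {
                    target = i;
                    break; // first match wins; later predicates are not consulted
                }
            }
            buckets.get(target).add(value);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Predicate<String>> predicates = List.of(
            v -> v.contains("ERROR"),
            v -> v.contains("WARN"));
        List<String> values = List.of("ERROR disk full", "WARN slow", "INFO ok", "ERROR after WARN");
        // [[ERROR disk full, ERROR after WARN], [WARN slow], [INFO ok]]
        System.out.println(route(values, predicates));
    }
}
```

Note that "ERROR after WARN" satisfies both predicates but lands only in the first bucket, so predicate order matters when conditions overlap.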
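```java
// Assumes imports of org.apache.kafka.streams.KeyValue, org.apache.kafka.streams.kstream.*,
// java.util.Arrays, java.util.Map, and java.util.stream.Collectors.
// Every operation returns a new stream; the original stream is not modified.
KStream<String, String> stream = builder.stream("input");

// filter: keep only matching records
KStream<String, String> errorLines = stream.filter((key, value) -> value.contains("ERROR"));

// map: transform both key and value
KStream<String, Integer> lengths =
    stream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value.length()));

// mapValues: transform the value only (the key, and therefore the partitioning, is unchanged)
KStream<String, String> upper = stream.mapValues(value -> value.toUpperCase());

// flatMap: one input record to zero or more output records
KStream<String, String> parts = stream.flatMap((key, value) ->
    Arrays.stream(value.split(","))
        .map(v -> KeyValue.pair(key, v))
        .collect(Collectors.toList()));

// selectKey: replace the key (extractUserId is an application-defined helper)
KStream<String, String> byUser = stream.selectKey((key, value) -> extractUserId(value));

// split: route each record to the first matching branch.
// The array-returning branch(...) is deprecated since Kafka 2.8 in favor of split().
Map<String, KStream<String, String>> branches = stream
    .split(Named.as("level-"))
    .branch((key, value) -> value.startsWith("ERROR"), Branched.as("error"))
    .branch((key, value) -> value.startsWith("WARN"), Branched.as("warn"))
    .defaultBranch(Branched.as("other")); // everything else
KStream<String, String> errors = branches.get("level-error");
KStream<String, String> warnings = branches.get("level-warn");

// merge: combine two streams (stream1 and stream2 are streams defined elsewhere)
KStream<String, String> merged = stream1.merge(stream2);

// peek: side effect for debugging; records pass through unchanged
KStream<String, String> traced =
    stream.peek((key, value) -> System.out.println("Key: " + key + ", Value: " + value));
```

KTable operations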
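```java
// Create a KTable from a topic (only the latest value per key is retained)
KTable<String, String> table = builder.table("user_profiles");

// Filter a KTable; the result is again a KTable
KTable<String, String> nonEmpty = table.filter((key, value) -> value != null && !value.isEmpty());

// Transform values (parseUser and the User type are application-defined)
KTable<String, User> users = table.mapValues(value -> parseUser(value));

// KTable to KStream: emit the table's updates as a stream
KStream<String, String> changes = table.toStream();

// KStream to KTable: interpret each record as an upsert for its key
KTable<String, String> table2 = changes.toTable();
```

Aggregations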
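The time-windowed count in this section, built with `TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))`, assigns each record to a fixed, non-overlapping window aligned to the epoch rather than to the first record seen. The following plain-Java sketch models that assignment for non-negative timestamps. `TumblingWindowModel` and its methods are hypothetical names for illustration; they are not Kafka Streams classes.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of epoch-aligned tumbling windows. */
final class TumblingWindowModel {

    /**
     * Start of the window containing timestampMs. The window covers
     * [start, start + sizeMs). Assumes timestampMs >= 0 and sizeMs > 0.
     */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Counts records per window start, sorted by window start. */
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60 * 1000L;
        long[] timestamps = {0L, 299_999L, 300_000L, 610_000L};
        // {0=2, 300000=1, 600000=1}
        System.out.println(countPerWindow(timestamps, fiveMinutesMs));
    }
}
```

A record at 299,999 ms still belongs to the first window, while one at exactly 300,000 ms opens the next, because the upper bound of each window is exclusive.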
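```java
// Assumes imports of java.time.Duration, org.apache.kafka.common.serialization.Serdes,
// and org.apache.kafka.streams.kstream.*; stream is a KStream<String, String>.

// Count records per key
KTable<String, Long> counts = stream
    .groupByKey()
    .count();

// Sum per key. The values are now Long, so the grouping must declare a Long serde;
// the default String value serde would fail at runtime.
KTable<String, Long> sums = stream
    .mapValues(v -> Long.parseLong(v))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
    .reduce(Long::sum);

// Custom aggregation (UserStats and userStatsSerde are application-defined;
// add(...) must return the updated UserStats)
KTable<String, UserStats> stats = stream
    .groupByKey()
    .aggregate(
        UserStats::new,                                   // initializer
        (key, value, aggregate) -> aggregate.add(value),  // aggregator
        Materialized.with(Serdes.String(), userStatsSerde)
    );

// Tumbling window: one count per key per 5-minute window
KTable<Windowed<String>, Long> windowedCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Sliding window: records within 10 minutes of each other share a window
KTable<Windowed<String>, Long> slidingCounts = stream
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
    .count();

// Session window: a session closes after 30 minutes of inactivity
KTable<Windowed<String>, Long> sessionCounts = stream
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();
```

Join operations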
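The stream-stream join in this section pairs records that share a key and whose timestamps lie within the join window of each other. The following plain-Java sketch models an inner join under that rule. `WindowedJoinModel`, `Event`, and `innerJoin` are hypothetical names, the bounds are treated as inclusive (an assumption of this sketch), and result ordering in a real application depends on arrival order rather than on the nested loop used here.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a windowed inner join between two keyed event lists. */
final class WindowedJoinModel {

    /** Minimal keyed, timestamped event. */
    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Joins every left/right pair with equal keys and |tsLeft - tsRight| <= maxDiffMs. */
    static List<String> innerJoin(List<Event> left, List<Event> right, long maxDiffMs) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= maxDiffMs;
                if (sameKey && inWindow) {
                    joined.add(l.value + " paid with " + r.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> orders = List.of(new Event("o1", "order-1", 1_000L));
        List<Event> payments = List.of(
            new Event("o1", "card", 200_000L),   // 199 s after the order: inside 5 minutes
            new Event("o1", "cash", 400_000L));  // 399 s after the order: outside 5 minutes
        // [order-1 paid with card]
        System.out.println(innerJoin(orders, payments, 5 * 60 * 1000L));
    }
}
```

Because one order can match several payments inside the window, a stream-stream join may emit more than one result per input record.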
```java
// KStream-KStream join: matches records with the same key within a time window
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");
KStream<String, String> joined = orders.join(
    payments,
    (order, payment) -> order + " paid with " + payment,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

// KStream-KTable join: look up each stream record in a table (dimension lookup).
// Both topics must be co-partitioned on the join key.
// enrichOrder is an application-defined helper.
KTable<String, String> userProfiles = builder.table("user_profiles");
KStream<String, String> enriched = orders.join(
    userProfiles,
    (order, profile) -> enrichOrder(order, profile)
);

// KStream-GlobalKTable join: the table is replicated to every instance,
// so no repartitioning is needed and the join key can be derived from the record.
// extractProductId and enrichWithProduct are application-defined helpers.
GlobalKTable<String, String> productCatalog = builder.globalTable("products");
KStream<String, String> enrichedOrders = orders.join(
    productCatalog,
    (key, order) -> extractProductId(order),               // derive the join key
    (order, product) -> enrichWithProduct(order, product)
);

// KTable-KTable join (table1 and table2 are KTable<String, String> defined elsewhere)
KTable<String, String> result = table1.join(
    table2,
    (v1, v2) -> v1 + "," + v2
);
```

State stores and interactive queries
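Range queries such as the `store.range("user_001", "user_100")` call in this section return keys in lexicographic order, not numeric order, which can produce surprising results when key lengths differ. The following plain-Java sketch models this with a `TreeMap`, assuming ASCII string keys, for which `String` ordering matches the byte order of the serialized keys, and treating both bounds as inclusive. `RangeQueryModel` and `keysInRange` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java model of an inclusive, lexicographically ordered range scan. */
final class RangeQueryModel {

    /** Returns the keys k with from <= k <= to in lexicographic order. */
    static List<String> keysInRange(TreeMap<String, Long> store, String from, String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).keySet());
    }

    public static void main(String[] args) {
        TreeMap<String, Long> store = new TreeMap<>();
        for (String key : new String[] {
                "user_001", "user_01", "user_050", "user_100", "user_1000", "user_2"}) {
            store.put(key, 1L);
        }
        // [user_001, user_01, user_050, user_100]
        System.out.println(keysInRange(store, "user_001", "user_100"));
    }
}
```

Here `user_01` falls inside the range while `user_2` and `user_1000` do not. Zero-padding keys to a fixed width is one way to make lexicographic and numeric order agree.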
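```java
// Give the state store a name so it can be queried
KTable<String, Long> counts = stream
    .groupByKey()
    .count(Materialized.as("counts-store"));

// Query local state (for example, to serve a REST API).
// The KafkaStreams instance must be RUNNING before store(...) is called.
ReadOnlyKeyValueStore<String, Long> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "counts-store",
        QueryableStoreTypes.keyValueStore()
    ));

// Point lookup (returns null if the key is absent from this instance's local state)
Long count = store.get("user_001");

// Range query; iterators hold resources and must be closed
try (KeyValueIterator<String, Long> range = store.range("user_001", "user_100")) {
    while (range.hasNext()) {
        KeyValue<String, Long> kv = range.next();
        System.out.println(kv.key + ": " + kv.value);
    }
}

// Full scan
try (KeyValueIterator<String, Long> all = store.all()) {
    all.forEachRemaining(kv -> System.out.println(kv.key + ": " + kv.value));
}
```

Exactly-once semantics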
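```java
// Option 1: enable exactly-once processing.
// Requires brokers on Kafka 2.5+; the EXACTLY_ONCE_V2 constant is available
// from Kafka Streams 3.0.
// Note: exactly-once reduces throughput, so choose based on business requirements.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);

// Option 2: at-least-once (the default; better performance).
// Set only one of the two; a second put on the same key overrides the first.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.AT_LEAST_ONCE);
```

Monitoring and operations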
```java
// Register listeners and handlers before calling streams.start()

// Listen for state transitions
streams.setStateListener((newState, oldState) -> {
    System.out.println("State changed from " + oldState + " to " + newState);
    if (newState == KafkaStreams.State.ERROR) {
        // raise an alert here
    }
});

// Handle exceptions that escape a stream thread by replacing the failed thread
streams.setUncaughtExceptionHandler(exception -> {
    System.err.println("Uncaught exception: " + exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

// Read the client's metrics
Map<MetricName, ? extends Metric> metrics = streams.metrics();
```

Kafka Streams vs Flink
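| Dimension | Kafka Streams | Flink |
|---|---|---|
| Deployment | Embedded in the application; no separate cluster | Requires a separate cluster |
| Scalability | Scale out by adding application instances (parallelism is bounded by the number of input partitions) | Dynamic scaling |
| Latency | Milliseconds | Milliseconds |
| State management | RocksDB (default) or in-memory stores | RocksDB or in-memory |
| SQL support | ksqlDB (separate product) | Flink SQL (built in) |
| Learning curve | Low | Moderate |
| Typical scale | Small to medium | Large |
| Dependencies | Kafka only | A Flink cluster |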