
Kafka Streams: Stream Processing

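What Is Kafka Streams

Kafka Streams is a lightweight stream processing library that ships with Apache Kafka. It needs no separate processing cluster and is embedded in an application as an ordinary Java library. It is built on Kafka's consumer and producer APIs and supports exactly-once semantics, which is enabled through configuration (the default is at-least-once).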

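Use Cases

  • Lightweight stream processing when you do not want to maintain a Flink or Spark cluster
  • Real-time data processing in a microservice architecture
  • Real-time transformation, filtering, and aggregation of Kafka data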

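Core Concepts

| Concept | Description |
| --- | --- |
| KStream | An unbounded stream of events; each record is an independent event |
| KTable | A changelog stream; each record is an update to the latest value of a key (similar to a database table) |
| GlobalKTable | A KTable that is fully replicated to every instance; used to broadcast dimension tables |
| Topology | The directed acyclic graph of processing logic |
| State Store | Local state storage (RocksDB by default); backs aggregations and joins |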
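
The difference between the KStream and KTable views is easier to see on a tiny input. The following is a minimal plain-Java sketch, not the Kafka Streams API: `StreamTableSketch`, `asStream`, and `asTable` are hypothetical names, and the records are modeled as simple key/value entries. It reads the same three records once as a stream (every record counts) and once as a table (the latest value per key wins).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two views; this is not the Kafka Streams API.
final class StreamTableSketch {

    // Stream view: every record is kept as an independent event.
    static List<Map.Entry<String, String>> asStream(List<Map.Entry<String, String>> log) {
        return new ArrayList<>(log);
    }

    // Table view: a later record for a key overwrites the earlier value.
    static Map<String, String> asTable(List<Map.Entry<String, String>> log) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
            Map.entry("alice", "Berlin"),
            Map.entry("bob", "Paris"),
            Map.entry("alice", "Rome"));

        System.out.println(asStream(log).size()); // prints 3
        System.out.println(asTable(log));         // prints {alice=Rome, bob=Paris}
    }
}
```

The stream view sees three events, while the table view holds two rows because the second record for `alice` replaces the first.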

Basic Usage

java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Build the topology
StreamsBuilder builder = new StreamsBuilder();

// Read from a Kafka topic
KStream<String, String> source = builder.stream("user_events");

// Split each value into lowercase words, re-key by word, and count occurrences
KStream<String, Long> wordCounts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
    .groupBy((key, word) -> word)
    .count()
    .toStream();

// Write to the output topic; the counts are Long, so override the default String value serde
wordCounts.to("word_counts",
    Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);

// Close gracefully on JVM shutdown (register the hook before starting)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

// Start processing
streams.start();

KStream Operations

java
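KStream<String, String> stream = builder.stream("input");

// filter: keep only the records that match the predicate
stream.filter((key, value) -> value.contains("ERROR"));

// map: transform both key and value (changing the key marks the stream for repartitioning)
stream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value.length()));

// mapValues: transform the value only; the key stays unchanged
stream.mapValues(value -> value.toUpperCase());

// flatMap: one input record produces zero or more output records
stream.flatMap((key, value) ->
    Arrays.stream(value.split(","))
          .map(v -> KeyValue.pair(key, v))
          .collect(Collectors.toList())
);

// selectKey: replace the key (extractUserId is a user-defined helper)
stream.selectKey((key, value) -> extractUserId(value));

// split: route each record to the first branch whose predicate matches
// (replaces KStream#branch, which is deprecated since Kafka 2.8)
Map<String, KStream<String, String>> branches = stream
    .split(Named.as("level-"))
    .branch((key, value) -> value.startsWith("ERROR"), Branched.as("error"))
    .branch((key, value) -> value.startsWith("WARN"), Branched.as("warn"))
    .defaultBranch(Branched.as("other"));  // everything else
KStream<String, String> errors = branches.get("level-error");
KStream<String, String> warnings = branches.get("level-warn");

// merge: combine two streams into one (stream1 and stream2 are defined elsewhere)
KStream<String, String> merged = stream1.merge(stream2);

// peek: side effect for debugging; records pass through unchanged
stream.peek((key, value) -> System.out.println("Key: " + key + ", Value: " + value));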

KTable Operations

java
// Create a KTable from a topic (the latest value is kept for each key)
KTable<String, String> table = builder.table("user_profiles");

// Filter a KTable
table.filter((key, value) -> value != null && !value.isEmpty());

// Transform KTable values (parseUser is a user-defined helper)
table.mapValues(value -> parseUser(value));

// Convert a KTable to a KStream
KStream<String, String> stream = table.toStream();

// Convert a KStream to a KTable (the latest value per key wins)
KTable<String, String> table2 = stream.toTable();

Aggregations

java
// Count records per key
KTable<String, Long> counts = stream
    .groupByKey()
    .count();

// Sum values per key; the values are now Long, so the grouping must not fall back
// to the default String value serde
KTable<String, Long> sums = stream
    .mapValues(v -> Long.parseLong(v))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
    .reduce(Long::sum);

// Custom aggregation (UserStats and userStatsSerde are user-defined)
KTable<String, UserStats> stats = stream
    .groupByKey()
    .aggregate(
        UserStats::new,  // initializer
        (key, value, aggregate) -> aggregate.add(value),  // aggregation logic
        Materialized.with(Serdes.String(), userStatsSerde)
    );

// Tumbling-window aggregation (one count per 5-minute window)
KTable<Windowed<String>, Long> windowedCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Sliding window
KTable<Windowed<String>, Long> slidingCounts = stream
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
    .count();

// Session window (a session closes after 30 minutes of inactivity)
KTable<Windowed<String>, Long> sessionCounts = stream
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();
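
To illustrate how a 5-minute tumbling window assigns records to buckets, here is a minimal plain-Java sketch. It is not the Kafka Streams implementation: `TumblingWindowSketch`, `windowStart`, and `countPerWindow` are hypothetical names, and the sketch assumes non-negative epoch-millisecond timestamps and windows aligned to the epoch, each covering the start time inclusive up to the start plus the size exclusive.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of tumbling-window assignment; not the Kafka Streams API.
final class TumblingWindowSketch {

    // Start of the epoch-aligned window that contains the timestamp.
    // Assumes timestampMs >= 0.
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts how many timestamps fall into each window, keyed by window start.
    static Map<Long, Long> countPerWindow(long[] timestampsMs, Duration size) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Duration fiveMinutes = Duration.ofMinutes(5); // 300000 ms
        long[] timestamps = {0L, 299_999L, 300_000L, 610_000L};
        // prints {0=2, 300000=1, 600000=1}
        System.out.println(countPerWindow(timestamps, fiveMinutes));
    }
}
```

A record at 299999 ms still belongs to the first window, while one at exactly 300000 ms opens the next, because the upper bound of each window is exclusive.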

Join Operations

java
// KStream-KStream join (records match when their timestamps fall within the time window)
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");

KStream<String, String> joined = orders.join(
    payments,
    (order, payment) -> order + " paid with " + payment,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

// KStream-KTable join (stream-table join for dimension lookups; both topics must be co-partitioned)
KTable<String, String> userProfiles = builder.table("user_profiles");

KStream<String, String> enriched = orders.join(
    userProfiles,
    (order, profile) -> enrichOrder(order, profile)  // enrichOrder is a user-defined helper
);

// KStream-GlobalKTable join (broadcast dimension table; no repartitioning needed)
GlobalKTable<String, String> productCatalog = builder.globalTable("products");

KStream<String, String> enrichedOrders = orders.join(
    productCatalog,
    (key, order) -> extractProductId(order),  // derive the join key from the stream record
    (order, product) -> enrichWithProduct(order, product)
);

// KTable-KTable join (table1 and table2 are defined elsewhere)
KTable<String, String> result = table1.join(
    table2,
    (v1, v2) -> v1 + "," + v2
);
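
The matching rule behind a windowed stream-stream join can be sketched in plain Java. This is a simplified batch model, not the Kafka Streams implementation (which joins incrementally as records arrive): `WindowedJoinSketch`, `Rec`, and `join` are hypothetical names. Two records are paired when they share a key and their timestamps differ by at most the window's time difference.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the windowed join condition; not the Kafka Streams API.
final class WindowedJoinSketch {

    // A record with a key, a value, and an event-time timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Pairs records with equal keys whose timestamps are at most maxDiffMs apart.
    static List<String> join(List<Rec> left, List<Rec> right, long maxDiffMs) {
        List<String> joined = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= maxDiffMs;
                if (sameKey && inWindow) {
                    joined.add(l.value + " paid with " + r.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Rec> orders = List.of(new Rec("o1", "order-1", 0L), new Rec("o2", "order-2", 0L));
        List<Rec> payments = List.of(new Rec("o1", "card", 240_000L), new Rec("o2", "cash", 400_000L));
        // 5-minute window = 300000 ms; only o1 is paid within the window
        System.out.println(join(orders, payments, 300_000L)); // prints [order-1 paid with card]
    }
}
```

The payment for `o2` arrives 400 seconds after its order, which is outside the 5-minute difference, so an inner join emits nothing for that key.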

State Stores and Interactive Queries

java
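// Create a named state store so that it can be queried
KTable<String, Long> counts = stream
    .groupByKey()
    .count(Materialized.as("counts-store"));

// Query local state (for example, from a REST endpoint) once the instance is running
ReadOnlyKeyValueStore<String, Long> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "counts-store",
        QueryableStoreTypes.keyValueStore()
    ));

// Point lookup (returns null if the key is not in this instance's store)
Long count = store.get("user_001");

// Range query; try-with-resources closes the iterator even if processing fails
try (KeyValueIterator<String, Long> range = store.range("user_001", "user_100")) {
    while (range.hasNext()) {
        KeyValue<String, Long> kv = range.next();
        System.out.println(kv.key + ": " + kv.value);
    }
}

// Full scan; the iterator must be closed here as well
try (KeyValueIterator<String, Long> all = store.all()) {
    all.forEachRemaining(kv -> System.out.println(kv.key + ": " + kv.value));
}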
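
Range queries over string keys follow the order of the serialized keys, not numeric order, which can surprise when key suffixes have different lengths. The sketch below models this with a `TreeMap` in plain Java; it is not the state store API. `RangeQuerySketch` and `range` are hypothetical names, and the sketch assumes ASCII keys with a UTF-8 string serde, for which `String` ordering matches byte ordering, and a range that includes both bounds.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model of an ordered key-value range scan; not the Kafka Streams API.
final class RangeQuerySketch {

    // Returns the entries whose keys lie between from and to, both inclusive.
    static NavigableMap<String, Long> range(NavigableMap<String, Long> store, String from, String to) {
        return store.subMap(from, true, to, true);
    }

    public static void main(String[] args) {
        NavigableMap<String, Long> store = new TreeMap<>();
        store.put("user_001", 3L);
        store.put("user_050", 7L);
        store.put("user_100", 1L);
        store.put("user_1000", 4L); // sorts after "user_100", so it is outside the range
        store.put("user_2", 9L);    // sorts after "user_1000", also outside the range

        // prints {user_001=3, user_050=7, user_100=1}
        System.out.println(range(store, "user_001", "user_100"));
    }
}
```

Zero-padding identifiers to a fixed width, as in `user_001`, keeps lexicographic order and numeric order aligned.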

Exactly-Once Semantics

java
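// Enable exactly-once processing
// (EXACTLY_ONCE_V2 is available from Kafka Streams 3.0 and requires brokers 2.5+)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.EXACTLY_ONCE_V2);

// Note: exactly-once reduces throughput; set only one of the two values, based on business needs
// Alternative: at-least-once (the default, with better performance)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.AT_LEAST_ONCE);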
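
When configuration is loaded from a file or environment rather than set through constants, the same choice is expressed with raw string keys. The following is a minimal sketch using only `java.util.Properties`: `GuaranteeConfigSketch` and `streamsConfig` are hypothetical names, and the application id and broker address are the illustrative values used earlier in this page. The keys and values are the strings behind the `StreamsConfig` constants.

```java
import java.util.Properties;

// Builds Kafka Streams settings with raw string keys; uses only the JDK.
final class GuaranteeConfigSketch {

    // Returns a config that selects one processing guarantee, never both.
    static Properties streamsConfig(boolean exactlyOnce) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // illustrative value
        props.put("bootstrap.servers", "kafka:9092");    // illustrative value
        props.put("processing.guarantee", exactlyOnce ? "exactly_once_v2" : "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig(true).getProperty("processing.guarantee"));  // prints exactly_once_v2
        System.out.println(streamsConfig(false).getProperty("processing.guarantee")); // prints at_least_once
    }
}
```

Selecting the guarantee in one place avoids the case where a later `put` silently overrides an earlier one.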

Monitoring and Operations

java
// Listen for state transitions (register before calling streams.start())
streams.setStateListener((newState, oldState) -> {
    System.out.println("State changed from " + oldState + " to " + newState);
    if (newState == KafkaStreams.State.ERROR) {
        // Raise an alert here
    }
});

// Handle uncaught exceptions from stream threads; REPLACE_THREAD starts a replacement thread
streams.setUncaughtExceptionHandler(exception -> {
    System.err.println("Uncaught exception: " + exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

// Read the client metrics
Map<MetricName, ? extends Metric> metrics = streams.metrics();

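Kafka Streams vs. Flink

| Dimension | Kafka Streams | Flink |
| --- | --- | --- |
| Deployment | Embedded in the application; no separate cluster | Requires a separate cluster |
| Scalability | Scales by adding application instances | Dynamic scaling |
| Latency | Milliseconds | Milliseconds |
| State management | RocksDB | RocksDB / in-memory |
| SQL support | ksqlDB (separate product) | Flink SQL (built in) |
| Learning curve | Moderate | |
| Suitable scale | Small to medium | Large |
| Dependencies | Kafka only | Requires a Flink cluster |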

Content on this site is compiled and written by 褚成志 and is provided for learning and reference only.