
Spark Streaming: Stream Processing

Overview

Spark provides two stream processing APIs:

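API                            │ Model                                   │ Latency                 │ Recommendation
───────────────────────────────┼─────────────────────────────────────────┼─────────────────────────┼─────────────────────────────────
DStream (legacy)               │ Micro-batch, RDD-based                  │ Seconds                 │ Not recommended for new projects
Structured Streaming (current) │ Micro-batch/continuous, DataFrame-based │ Milliseconds to seconds │ ✅ Recommended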

This article focuses on Structured Streaming.


Structured Streaming Core Concepts

Structured Streaming treats a data stream as an unbounded table. Each batch of new data is appended to the end of the table, and the query result is updated continuously.

Input stream (unbounded table):
Time  │ id │ value
──────┼────┼──────
t=1   │  1 │  10
t=2   │  2 │  20
t=3   │  1 │  30   ← new data keeps being appended
...

Query: SELECT id, SUM(value) FROM stream GROUP BY id

Result table (continuously updated):
id │ sum
───┼────
 1 │  40
 2 │  20
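To make the model concrete, the following plain-Python sketch (no Spark involved) replays the three input rows from the diagram. A list stands in for the unbounded input table, and a dictionary of running sums plays the role of the result table. The names are illustrative only. Spark does not actually keep the whole input table. It stores only the aggregation state it needs to update the result.

```python
from collections import defaultdict

# (time, id, value) rows arriving over time, as in the diagram above
arrivals = [(1, 1, 10), (2, 2, 20), (3, 1, 30)]

unbounded_table = []              # input table that only ever grows
result_table = defaultdict(int)   # running SUM(value) per id

for t, row_id, value in arrivals:
    unbounded_table.append((t, row_id, value))  # new row appended to the input table
    result_table[row_id] += value               # result updated incrementally
    print(f"t={t}: {dict(result_table)}")
# t=1: {1: 10}
# t=2: {1: 10, 2: 20}
# t=3: {1: 40, 2: 20}
```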

Basic Usage

python
from pyspark.sql import SparkSession
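# Explicit imports used throughout this article (note: sum shadows Python's built-in sum)
from pyspark.sql.functions import col, count, expr, from_json, sum, window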

spark = SparkSession.builder \
    .appName("StructuredStreaming") \
    .getOrCreate()

# Read a stream from Kafka (requires the spark-sql-kafka-0-10 connector package)
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user_events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the Kafka message: value is binary, so cast it to a string before from_json
events = df.select(
    col("timestamp"),
    from_json(
        col("value").cast("string"),
        "user_id STRING, event_type STRING, amount DOUBLE, event_time TIMESTAMP"  # event_time is used by the watermark examples
    ).alias("data")
).select("timestamp", "data.*")

# Streaming aggregation: count events and sum amounts per event type
result = events \
    .groupBy("event_type") \
    .agg(
        count("*").alias("cnt"),
        sum("amount").alias("total_amount")
    )

# Write to the console (for testing); complete mode re-emits the whole result table on every trigger
query = result.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()

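Output Modes

Mode     │ Output per trigger                            │ When to use
─────────┼───────────────────────────────────────────────┼──────────────────────────────────────────────────────
append   │ Only new rows; earlier rows are never updated │ No aggregation, or aggregation with a watermark
complete │ The entire result table on every trigger      │ Aggregations only; the result table must stay small
update   │ Only rows changed since the last trigger      │ Aggregations, to reduce output volume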
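The difference between complete and update can be simulated in plain Python for a grouped count over two micro-batches. `run_batches` is a hypothetical helper, not a Spark API. Append mode is left out because Spark rejects append mode for an aggregation that has no watermark.

```python
from collections import Counter


def run_batches(batches, mode):
    """Return what a grouped count emits per trigger in 'complete' or 'update' mode."""
    state = Counter()                      # running count per key (the result table)
    emitted = []
    for batch in batches:
        changed = dict.fromkeys(batch)     # keys touched by this batch, in arrival order
        state.update(batch)
        if mode == "complete":
            emitted.append(dict(state))                      # whole result table
        elif mode == "update":
            emitted.append({k: state[k] for k in changed})   # only rows that changed
        else:
            raise ValueError(f"unsupported mode: {mode}")
    return emitted


batches = [["click", "view"], ["click"]]
print(run_batches(batches, "complete"))  # [{'click': 1, 'view': 1}, {'click': 2, 'view': 1}]
print(run_batches(batches, "update"))    # [{'click': 1, 'view': 1}, {'click': 2}]
```

In the second batch only `click` changes, so update mode emits one row while complete mode re-emits both.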

Handling Late Data with Watermarks

python
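# Scenario: events can arrive late and out of order
# (assumes `events` has an event-time column named event_time of type TIMESTAMP)

events_with_watermark = events \
    .withWatermark("event_time", "10 minutes")  # tolerate up to 10 minutes of lateness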

# Windowed aggregation on event time
result = events_with_watermark \
    .groupBy(
        window(col("event_time"), "5 minutes", "1 minute"),  # 5-minute windows, sliding every 1 minute
        col("event_type")
    ) \
    .agg(
        count("*").alias("cnt"),
        sum("amount").alias("total")
    )

# append mode: each window is written once, after the watermark passes its end, so results are final
query = result.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/output/streaming/") \
    .option("checkpointLocation", "/checkpoints/streaming/") \
    .start()
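The window is 5 minutes long but slides every minute, so each event falls into 5 overlapping windows (duration / slide). That also multiplies the aggregation state Spark has to keep. The helper below is a plain-Python sketch of this window assignment using integer minutes. It assumes windows are aligned to 0, which matches `window()` when no `startTime` offset is given.

```python
def sliding_windows(event_minute, duration=5, slide=1):
    """Return the [start, end) windows, in minutes, that contain event_minute."""
    start = (event_minute // slide) * slide    # latest window start <= the event
    windows = []
    while start > event_minute - duration:     # window [start, start + duration) still covers it
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)


print(sliding_windows(7))  # [(3, 8), (4, 9), (5, 10), (6, 11), (7, 12)]
```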

How Watermarks Work

Event-time axis (minutes):
─────────────────────────────────────────────────────►
  t=10  t=12  t=15  t=18  t=20  t=25  t=30

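Current max event time = 25
Watermark = 25 - 10 minutes = 15

Late data with t < 15 may be dropped (Spark allows this but does not guarantee it)
Data with t >= 15 is guaranteed to be processed
The watermark is recomputed at the end of each micro-batch and never moves backwards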
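The rule above can be traced in a few lines of plain Python. This is a simplified model, not Spark code. Event times are integer minutes as on the axis, the initial watermark is assumed to be 0, and the helper names are hypothetical. In Spark the new watermark takes effect in the following micro-batch. Rows older than the watermark only become eligible to be dropped.

```python
def advance_watermark(watermark, batch_event_times, delay):
    """Watermark after a micro-batch: max event time seen minus delay, never moving backwards."""
    if not batch_event_times:
        return watermark                  # no new data: watermark unchanged
    return max(watermark, max(batch_event_times) - delay)


def may_be_dropped(event_time, watermark):
    """Rows older than the watermark are eligible to be dropped as too late."""
    return event_time < watermark


watermark = 0  # assumed initial watermark
for batch in [[10, 12, 15], [18, 20, 25]]:   # event times in minutes, as on the axis above
    watermark = advance_watermark(watermark, batch, delay=10)
    print("watermark after batch:", watermark)
# watermark after batch: 5
# watermark after batch: 15

print(may_be_dropped(14, watermark), may_be_dropped(15, watermark))  # True False
```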

Triggers

python
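# Pass at most one of these to a DataStreamWriter, e.g. result.writeStream.trigger(...)
# PySpark uses keyword arguments; Trigger.Once() and Trigger.Continuous() are the Scala/Java API

# Default (no .trigger call): start the next micro-batch as soon as the previous one finishes

# Process all available data in a single batch, then stop (deprecated in favor of availableNow since Spark 3.4)
.trigger(once=True)

# Fixed-interval micro-batches
.trigger(processingTime="30 seconds")

# Continuous processing (millisecond latency, experimental, map-like queries only);
# "1 second" is the checkpoint interval
.trigger(continuous="1 second")

# Process all available data, possibly in several batches, then stop (Spark 3.3+; batch backfill)
.trigger(availableNow=True)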

Writing to Different Sinks

python
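# These examples assume `result` is the watermarked window aggregation from the Watermark section;
# an aggregation without a watermark needs outputMode("update") or outputMode("complete")

# Write to Kafka: the sink expects a "value" column (string or binary) and optionally "key"
query = result \
    .selectExpr("CAST(event_type AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "/checkpoints/kafka/") \
    .start()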

# Write to Delta Lake (requires the delta-spark package)
query = result.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/delta/") \
    .start("/data/delta/streaming_output/")

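# Write to JDBC via foreachBatch: the function runs on the driver once per micro-batch
# (requires the MySQL JDBC driver; foreachBatch on its own gives at-least-once delivery)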
def write_to_mysql(batch_df, batch_id):
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/db") \
        .option("dbtable", "streaming_result") \
        .option("user", "root") \
        .option("password", "password") \
        .mode("append") \
        .save()

query = result.writeStream \
    .foreachBatch(write_to_mysql) \
    .option("checkpointLocation", "/checkpoints/jdbc/") \
    .start()

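# Custom per-row writer with foreach (open/process/close run on the executors)
class MyWriter:
    def open(self, partition_id, epoch_id):
        # Open a connection; return False to skip this partition for this epoch
        return True

    def process(self, row):
        # Handle a single row
        pass

    def close(self, error):
        # Close the connection; error is None if processing succeeded
        pass

query = result.writeStream \
    .foreach(MyWriter()) \
    .option("checkpointLocation", "/checkpoints/foreach/") \
    .start()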
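As a more concrete `foreach` writer, the sketch below writes each partition's rows for an epoch to its own JSON-lines file. `JsonLinesWriter`, its file naming, and the output directory are hypothetical. The directory must be reachable from every executor, for example a shared mount. `open` returns False when the file for that (partition_id, epoch_id) already exists, so a replayed epoch is skipped. Rows go to a `.tmp` file first, so only partitions that close without error become visible. `close` also copes with `open` having returned False.

```python
import json
import os


class JsonLinesWriter:
    """Hypothetical foreach writer: one JSON-lines file per (partition_id, epoch_id)."""

    def __init__(self, out_dir):
        self.out_dir = out_dir   # assumed to be visible from every executor
        self.path = None
        self.fh = None

    def open(self, partition_id, epoch_id):
        self.path = os.path.join(self.out_dir, f"part-{partition_id}-epoch-{epoch_id}.jsonl")
        if os.path.exists(self.path):
            return False         # this epoch was already written: skip the replay
        self.fh = open(self.path + ".tmp", "w", encoding="utf-8")
        return True

    def process(self, row):
        # pyspark Rows provide asDict(); plain dicts are accepted for local testing
        record = row.asDict(recursive=True) if hasattr(row, "asDict") else dict(row)
        self.fh.write(json.dumps(record, default=str) + "\n")  # default=str handles timestamps

    def close(self, error):
        if self.fh is None:      # open() returned False, nothing to finish
            return
        self.fh.close()
        self.fh = None
        if error is None:
            os.replace(self.path + ".tmp", self.path)   # publish only on success
        else:
            os.remove(self.path + ".tmp")               # discard partial output
```

Usage follows the `MyWriter` pattern, for example `result.writeStream.foreach(JsonLinesWriter("/shared/output"))` with a `checkpointLocation` option set before `.start()`.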

Checkpoints and Fault Tolerance

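Checkpoints are the core of fault tolerance in Structured Streaming. They record the input offsets of each batch, which batches have completed, and any operator state, so a restarted query can resume where it left off: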

python
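# Always set checkpointLocation; a restarted query resumes from it
query = df.writeStream \
    .format("parquet") \
    .option("path", "hdfs://namenode:9000/output/app1/") \
    .option("checkpointLocation", "hdfs://namenode:9000/checkpoints/app1/") \
    .start()
# (the output path above is a placeholder; file sinks need a path)

# Checkpoint directory layout:
# checkpoints/app1/
# ├── commits/          ← IDs of batches that finished successfully
# ├── offsets/          ← input offsets planned for each batch (write-ahead log)
# ├── metadata          ← query metadata (the query ID)
# ├── sources/          ← source-specific metadata
# └── state/            ← state store data for stateful operators (aggregations, joins)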
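To see how the offsets/ and commits/ entries relate, the sketch below inspects a checkpoint on a local file system. It assumes that entries in both directories are files named by batch ID (0, 1, 2, ...). That layout is an internal Spark detail that may change between versions, so treat the helper as a debugging aid, not an API. An HDFS checkpoint would need an HDFS client instead of `os`.

```python
import os


def batch_ids(directory):
    """Batch IDs recorded in a checkpoint sub-directory, whose files are named 0, 1, 2, ..."""
    if not os.path.isdir(directory):
        return []
    # Skip helper files such as .0.crc that local file systems may create
    return sorted(int(name) for name in os.listdir(directory) if name.isdigit())


def checkpoint_summary(checkpoint_dir):
    """Compare the last planned batch (offsets/) with the last finished batch (commits/)."""
    offsets = batch_ids(os.path.join(checkpoint_dir, "offsets"))
    commits = batch_ids(os.path.join(checkpoint_dir, "commits"))
    last_planned = offsets[-1] if offsets else None
    last_committed = commits[-1] if commits else None
    return {
        "last_planned_batch": last_planned,
        "last_committed_batch": last_committed,
        # A planned batch without a commit is re-run with the same offsets on restart
        "batch_to_rerun": last_planned if last_planned != last_committed else None,
    }
```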

Exactly-Once Semantics

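  • A replayable source (such as Kafka) + checkpointing + an idempotent sink → end-to-end exactly-once
  • foreachBatch (at-least-once on its own) + idempotent write logic, e.g. keyed on batch_id → exactly-once
  • The built-in Kafka sink is at-least-once, so consumers of the output topic may see duplicates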
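The batch_id approach can be sketched without Spark. `IdempotentSink` below is a hypothetical in-memory stand-in for a target table plus a record of committed batch IDs. When Spark re-runs a micro-batch after a failure, it passes the same batch_id again. Skipping IDs that were already committed therefore keeps duplicates out of the output. In a real database, the rows and the batch ID would have to be written in one transaction. That part is assumed here, not shown.

```python
class IdempotentSink:
    """Hypothetical stand-in for a target table plus a record of committed batch IDs."""

    def __init__(self):
        self.rows = []
        self.committed_batches = set()

    def write_batch(self, rows, batch_id):
        # A micro-batch re-run after a failure arrives with the same batch_id
        if batch_id in self.committed_batches:
            return False                        # already written: skip the replay
        # In a real store, these two steps must commit atomically (one transaction)
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True


sink = IdempotentSink()
batch = [{"event_type": "click", "cnt": 2}]
print(sink.write_batch(batch, batch_id=7))  # True
print(sink.write_batch(batch, batch_id=7))  # False (replayed batch ignored)
print(len(sink.rows))                       # 1
```

In a streaming job, `write_batch` would be called from the function passed to `foreachBatch`, with `batch_id` taken from its second argument. The committed IDs must live in durable storage, because an in-memory set does not survive a driver restart.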

Stream-Stream Joins

python
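# Join two streams. Watermarks plus an event-time range condition let Spark bound the join
# state, and both are required for outer joins. The payload schemas below are assumptions.
impressions = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "impressions").load() \
    .select(from_json(col("value").cast("string"),
                      "impression_id STRING, impression_time TIMESTAMP").alias("i")) \
    .select("i.*") \
    .withWatermark("impression_time", "2 hours")

clicks = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "clicks").load() \
    .select(from_json(col("value").cast("string"),
                      "click_impression_id STRING, click_time TIMESTAMP").alias("c")) \
    .select("c.*") \
    .withWatermark("click_time", "3 hours")

# Match clicks that occur within 1 hour after the impression
joined = impressions.join(
    clicks,
    expr("""
        impression_id = click_impression_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 1 hour
    """),
    "leftOuter"  # unmatched impressions get NULL click columns once no match can still arrive
)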
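The join condition above is an interval join. The sketch below evaluates the same condition on small in-memory lists with Python's `datetime`: the same id, and a click between impression_time and impression_time + 1 hour inclusive. It also pads unmatched impressions with NULLs for the left outer case. Field names follow the expression above, and the sample values are invented. Unlike Spark, the sketch sees all data at once. In the streaming join, an unmatched impression is emitted only after the watermarks show that no matching click can still arrive.

```python
from datetime import datetime, timedelta


def interval_left_outer_join(impressions, clicks, max_delay=timedelta(hours=1)):
    """Left outer join: same id and impression_time <= click_time <= impression_time + max_delay."""
    joined = []
    for imp in impressions:
        matches = [
            c for c in clicks
            if c["click_impression_id"] == imp["impression_id"]
            and imp["impression_time"] <= c["click_time"] <= imp["impression_time"] + max_delay
        ]
        if matches:
            joined.extend({**imp, **c} for c in matches)
        else:
            # No click in range: keep the impression with NULL (None) click columns
            joined.append({**imp, "click_impression_id": None, "click_time": None})
    return joined


t0 = datetime(2024, 1, 1, 10, 0)
impressions = [
    {"impression_id": "a", "impression_time": t0},
    {"impression_id": "b", "impression_time": t0},
]
clicks = [
    {"click_impression_id": "a", "click_time": t0 + timedelta(minutes=30)},  # within 1 hour
    {"click_impression_id": "b", "click_time": t0 + timedelta(minutes=90)},  # too late
]
for row in interval_left_outer_join(impressions, clicks):
    print(row["impression_id"], row["click_time"])
# a 2024-01-01 10:30:00
# b None
```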

Monitoring Streaming Queries

python
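# Current status of the query
query.status
# {'message': 'Processing new data', 'isDataAvailable': True, ...}

# Progress of the most recent micro-batch (None until the first batch completes)
query.lastProgress
# {'id': '...', 'batchId': 5, 'numInputRows': 1000, 'processedRowsPerSecond': 500, ...}

# Block until the query stops or fails (pass a timeout in seconds to return earlier)
query.awaitTermination()

# Stop the query
query.stop()

# List all active streaming queries in this session (name is None unless queryName() was set)
for q in spark.streams.active:
    print(q.name, q.status)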
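A common health check compares two rates in the progress report. If rows keep arriving (`inputRowsPerSecond`) faster than they are processed (`processedRowsPerSecond`), the query is falling behind. The helper below is a hypothetical sketch that assumes `lastProgress` is a dict, as in the output shown above. A single batch can be noisy, so a real check would look at several entries, for example from `query.recentProgress`.

```python
def falling_behind(progress):
    """True if the last micro-batch received rows faster than it processed them."""
    if not progress:
        return False   # lastProgress is None until the first batch completes
    input_rate = progress.get("inputRowsPerSecond", 0.0)
    processed_rate = progress.get("processedRowsPerSecond", 0.0)
    return input_rate > processed_rate


print(falling_behind({"inputRowsPerSecond": 800.0, "processedRowsPerSecond": 500.0}))  # True

# Possible use in a driver loop (assumes `query` is a running StreamingQuery):
# while query.isActive:
#     if falling_behind(query.lastProgress):
#         print("input rate exceeds processing rate")
#     query.awaitTermination(60)  # returns after at most 60 seconds
```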

Site content compiled and written by 褚成志, for study and reference only.