Spark Streaming — 流处理
概述
Spark 提供两套流处理 API:
| API | 模型 | 延迟 | 推荐程度 |
|---|---|---|---|
| DStream(旧) | 微批,基于 RDD | 秒级 | 不推荐新项目使用 |
| Structured Streaming(新) | 微批/连续,基于 DataFrame | 毫秒~秒级 | ✅ 推荐 |
本文重点介绍 Structured Streaming。
Structured Streaming 核心概念
Structured Streaming 将流数据视为一张无界表(Unbounded Table),每批新数据追加到表的末尾,查询结果持续更新。
输入流(无界表):
时间 │ id │ value
──────┼────┼──────
t=1 │ 1 │ 10
t=2 │ 2 │ 20
t=3 │ 1 │ 30 ← 新数据不断追加
...
查询:SELECT id, SUM(value) FROM stream GROUP BY id
结果表(持续更新):
id │ sum
───┼────
1 │ 40
2 │ 20基本使用
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("StructuredStreaming") \
.getOrCreate()
# 从 Kafka 读取流数据
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user_events") \
.option("startingOffsets", "latest") \
.load()
# 解析 Kafka 消息(value 是 bytes)
events = df.select(
col("timestamp"),
from_json(
col("value").cast("string"),
"user_id STRING, event_type STRING, amount DOUBLE"
).alias("data")
).select("timestamp", "data.*")
# 流式聚合
result = events \
.groupBy("event_type") \
.agg(
count("*").alias("cnt"),
sum("amount").alias("total_amount")
)
# 写入控制台(测试用)
query = result.writeStream \
.outputMode("complete") \
.format("console") \
.trigger(processingTime="10 seconds") \
.start()
query.awaitTermination()输出模式
| 模式 | 说明 | 适用场景 |
|---|---|---|
append | 只输出新增行,不更新旧行 | 无聚合操作,或带 Watermark 的聚合 |
complete | 每次输出完整结果表 | 有聚合操作(结果表不会太大) |
update | 只输出有变化的行 | 有聚合操作,减少输出量 |
水位线(Watermark)处理延迟数据
python
# 场景:事件可能延迟到达,需要处理乱序数据
events_with_watermark = events \
.withWatermark("event_time", "10 minutes") # 允许最多 10 分钟延迟
# 基于事件时间的窗口聚合
result = events_with_watermark \
.groupBy(
window(col("event_time"), "5 minutes", "1 minute"), # 5分钟窗口,1分钟滑动
col("event_type")
) \
.agg(
count("*").alias("cnt"),
sum("amount").alias("total")
)
# 使用 append 模式(Watermark 后才输出,保证结果完整)
query = result.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/output/streaming/") \
.option("checkpointLocation", "/checkpoints/streaming/") \
.start()Watermark 工作原理:
事件时间轴:
─────────────────────────────────────────────────────►
t=10 t=12 t=15 t=18 t=20 t=25 t=30
当前最大事件时间 = 25
Watermark = 25 - 10分钟 = 15
t < 15 的延迟数据会被丢弃
t >= 15 的数据仍然被处理触发器(Trigger)
python
from pyspark.sql.streaming import Trigger
# 默认:尽快处理(有数据就处理)
.trigger(Trigger.Once()) # 只处理一次,处理完退出
# 固定间隔微批
.trigger(processingTime="30 seconds")
# 连续处理(毫秒级延迟,实验性)
.trigger(Trigger.Continuous("1 second"))
# 一次性处理所有可用数据(批量回填)
.trigger(availableNow=True)写入各种 Sink
python
# 写入 Kafka
query = result.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "output_topic") \
.option("checkpointLocation", "/checkpoints/kafka/") \
.start()
# 写入 Delta Lake
query = result.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/delta/") \
.start("/data/delta/streaming_output/")
# 写入 JDBC(foreachBatch)
def write_to_mysql(batch_df, batch_id):
batch_df.write \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/db") \
.option("dbtable", "streaming_result") \
.option("user", "root") \
.option("password", "password") \
.mode("append") \
.save()
query = result.writeStream \
.foreachBatch(write_to_mysql) \
.option("checkpointLocation", "/checkpoints/jdbc/") \
.start()
# 自定义 foreach
class MyWriter:
def open(self, partition_id, epoch_id):
# 建立连接
return True
def process(self, row):
# 处理每一行
pass
def close(self, error):
# 关闭连接
pass
query = result.writeStream \
.foreach(MyWriter()) \
.start()Checkpoint 与容错
Checkpoint 是 Structured Streaming 容错的核心机制:
python
# 必须设置 checkpointLocation
query = df.writeStream \
.option("checkpointLocation", "hdfs://namenode:9000/checkpoints/app1/") \
.start()
# Checkpoint 目录内容:
# checkpoints/app1/
# ├── commits/ ← 已提交的批次 ID
# ├── offsets/ ← 每批次的输入偏移量
# ├── metadata ← 流查询元数据
# └── sources/ ← 数据源状态精确一次语义:
- Kafka Source + 支持幂等写入的 Sink → 精确一次
- 使用
foreachBatch+ 幂等写入逻辑 → 精确一次
流流 Join
python
# 两个流之间的 Join(需要 Watermark)
impressions = spark.readStream.format("kafka") \
.option("subscribe", "impressions").load() \
.withWatermark("impression_time", "2 hours")
clicks = spark.readStream.format("kafka") \
.option("subscribe", "clicks").load() \
.withWatermark("click_time", "3 hours")
# 在时间范围内 Join
joined = impressions.join(
clicks,
expr("""
impression_id = click_impression_id AND
click_time >= impression_time AND
click_time <= impression_time + interval 1 hour
"""),
"leftOuter"
)监控流查询
python
# 获取查询状态
query.status
# {'message': 'Processing new data', 'isDataAvailable': True, ...}
# 获取最近进度
query.lastProgress
# {'id': '...', 'batchId': 5, 'numInputRows': 1000, 'processedRowsPerSecond': 500, ...}
# 等待终止
query.awaitTermination()
# 停止查询
query.stop()
# 监听所有流查询
for q in spark.streams.active:
print(q.name, q.status)