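Flink Core: Unified Stream and Batch Processing
What is Flink
Apache Flink is a stream-first processing framework: streaming is the core model, and batch processing is treated as a special case of it (a bounded stream). Unlike the micro-batch model of Spark Streaming, Flink processes records one at a time, so latency can reach the millisecond range.
Key strengths:
- True stream processing: record-at-a-time execution with millisecond-level latency
- Exactly-once semantics
- Robust state management with pluggable state backends
- Event-time processing
- A unified API for streaming and batch
Architecture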
┌────────────────────────────────────────────────────────────┐
│                 Flink cluster architecture                 │
│                                                            │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                      JobManager                      │  │
│  │  - Accepts jobs and builds the execution graph       │  │
│  │  - Schedules tasks onto TaskManagers                 │  │
│  │  - Coordinates checkpoints                           │  │
│  │  - Handles failure recovery                          │  │
│  └──────────────────────────┬───────────────────────────┘  │
│                             │                              │
│  ┌──────────────────────────▼───────────────────────────┐  │
│  │                 TaskManager cluster                  │  │
│  │  ┌─────────────────┐  ┌─────────────────┐            │  │
│  │  │  TaskManager 1  │  │  TaskManager 2  │            │  │
│  │  │  Task Slot      │  │  Task Slot      │            │  │
│  │  │  [State]        │  │  [State]        │            │  │
│  │  └─────────────────┘  └─────────────────┘            │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────┘
DataStream API
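Both word-count pipelines in this section apply keyBy followed by sum. On an unbounded stream that combination emits an updated running total for every incoming element, not one final count per word. The sketch below illustrates this behavior in plain Java with no Flink dependency; `RunningWordCount` and its `process` method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for keyBy(word).sum(1); not part of Flink. */
final class RunningWordCount {
    // One running total per key, analogous to per-key state in a keyed stream.
    private final Map<String, Integer> totals = new HashMap<>();

    /** Processes one line and returns the (word, total) updates emitted for it, in order. */
    List<Map.Entry<String, Integer>> process(String line) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // a blank line produces no output
            }
            // Add 1 to the key's total and emit the new total immediately.
            int total = totals.merge(word, 1, Integer::sum);
            emitted.add(Map.entry(word, total));
        }
        return emitted;
    }
}
```

Feeding the line `a b a` yields three updates, and the second `a` carries the total 2. A downstream consumer therefore has to treat each record as the latest value for its key.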
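Basic usage
java
import java.util.Properties;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

// Kafka connection settings (values assumed; they mirror the Python example)
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
kafkaProps.setProperty("group.id", "flink-group");

// Read from Kafka. FlinkKafkaConsumer/FlinkKafkaProducer are the legacy connector
// classes; recent Flink releases replace them with KafkaSource/KafkaSink.
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>(
        "user_events",
        new SimpleStringSchema(),
        kafkaProps
    )
);

// Transform: split each line into words, then keep a running count per word
DataStream<Tuple2<String, Integer>> result = stream
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    })
    .keyBy(t -> t.f0)
    .sum(1);

// Write the results out
result.addSink(new FlinkKafkaProducer<>(...));

// Nothing runs until execute() is called
env.execute("WordCount");
Python DataStream API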
python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Read from Kafka
ds = env.add_source(
    FlinkKafkaConsumer(
        topics="user_events",
        deserialization_schema=SimpleStringSchema(),
        properties={"bootstrap.servers": "kafka:9092", "group.id": "flink-group"}
    )
)

# Transform: split lines into (word, 1) pairs and keep a running count per word
result = ds \
    .flat_map(lambda line: [(word, 1) for word in line.split()]) \
    .key_by(lambda t: t[0]) \
    .sum(1)

result.print()
env.execute("WordCount")
Time semantics
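Flink supports three notions of time:
- Event time: the time at which the event actually occurred (recommended)
  - Pros: results are accurate and do not depend on processing delays
  - Cons: out-of-order data must be handled
- Processing time: the system time of the machine when the record is processed
  - Pros: simple; no watermarks required
  - Cons: results are nondeterministic and depend on processing speed
- Ingestion time: the time at which the record enters Flink
  - Sits between the other two; rarely used
java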
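import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Select the time characteristic. Only needed before Flink 1.12: from 1.12 on,
// event time is the default and this method is deprecated.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Extract the event timestamp and generate watermarks
DataStream<Event> withTimestamps = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // tolerate up to 5 s of disorder
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
    );
Window operations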
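Event-time windows are driven by the watermark produced by `forBoundedOutOfOrderness`: a window can only emit its result once the watermark says no earlier events are expected. The plain-Java sketch below models that rule without any Flink dependency. `BoundedWatermark` and its methods are hypothetical names; the arithmetic (largest timestamp seen, minus the bound, minus 1 ms) follows how the bounded-out-of-orderness generator is commonly described and should be read as an illustration, not as Flink's implementation.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark; names are hypothetical. */
final class BoundedWatermark {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    BoundedWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that current() does not underflow before the first event.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Tracks the largest event timestamp; late or out-of-order events never lower it. */
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Current watermark: no event with a timestamp at or below this value is expected. */
    long current() {
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    /** A window [start, end) may fire once the watermark reaches end - 1. */
    boolean windowCanFire(long windowEndMs) {
        return current() >= windowEndMs - 1;
    }
}
```

With a 5-second bound, a 5-minute window ending at 300 000 ms fires only after an event with timestamp 305 000 ms or later has been seen, which is the price paid in latency for tolerating disorder.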
Window types
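Tumbling and sliding assigners come down to arithmetic on the event timestamp. The following plain-Java sketch shows that arithmetic under two assumptions: timestamps are in milliseconds and the window offset is zero. `WindowAssignment` is a hypothetical helper, not a Flink class, and session windows are left out because they depend on merging rather than on fixed boundaries.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of window assignment; not Flink's assigner classes. */
final class WindowAssignment {
    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Starts of every sliding window [start, start + size) containing the timestamp, latest first. */
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

An event at minute 7 falls into exactly one 5-minute tumbling window (starting at minute 5) but into two 10-minute windows sliding every 5 minutes (starting at minutes 5 and 0), so sliding windows multiply the work per element by roughly size divided by slide.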
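java
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

// Tumbling window: fixed size, no overlap
stream.keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");

// Sliding window: fixed size, overlapping (10-minute windows every 5 minutes)
stream.keyBy(e -> e.userId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .sum("amount");

// Session window: split by gaps in activity (a 30-minute gap closes the session)
stream.keyBy(e -> e.userId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .sum("amount");

// Global window: never fires on its own, so a custom trigger is required
stream.keyBy(e -> e.userId)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100)) // fires every 100 elements; CountTrigger does not purge, so the sum keeps accumulating
    .sum("amount");
Window functions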
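The window functions in this section differ mainly in how much data they hold until the window fires: incremental aggregation keeps only a small accumulator, while full-window processing buffers every element. The plain-Java sketch below contrasts the two for an average. `AverageAccumulator` is a hypothetical class whose methods only mirror the general shape of an aggregate function; returning 0.0 for an empty input is a choice made for this sketch.

```java
import java.util.List;

/** Plain-Java sketch of an incremental average; not tied to Flink's interfaces. */
final class AverageAccumulator {
    private double sum;
    private long count;

    /** Folds one value into the accumulator; only sum and count are retained. */
    AverageAccumulator add(double value) {
        sum += value;
        count++;
        return this;
    }

    /** Combines two partial accumulators, as is needed when windows are merged. */
    AverageAccumulator merge(AverageAccumulator other) {
        sum += other.sum;
        count += other.count;
        return this;
    }

    /** Average of everything added so far, or 0.0 if nothing was added. */
    double result() {
        return count == 0 ? 0.0 : sum / count;
    }

    /** Full-window alternative: requires every buffered element at firing time. */
    static double averageOfBuffered(List<Double> buffered) {
        double total = 0;
        for (double value : buffered) {
            total += value;
        }
        return buffered.isEmpty() ? 0.0 : total / buffered.size();
    }
}
```

Both paths give the same answer, but the accumulator uses constant memory per window, whereas the buffered list grows with the number of elements. Full-window processing is worth that cost mainly when the result needs window metadata or access to all elements at once.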
java
// ReduceFunction: incremental aggregation (memory-efficient; input and output share one type)
stream.keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce((a, b) -> new Event(a.userId, a.amount + b.amount));

// AggregateFunction: incremental aggregation (more flexible; separate input, accumulator, and result types)
stream.keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new AggregateFunction<Event, Accumulator, Result>() {
        @Override
        public Accumulator createAccumulator() { return new Accumulator(); }

        @Override
        public Accumulator add(Event value, Accumulator acc) {
            acc.sum += value.amount;
            acc.count++;
            return acc;
        }

        @Override
        public Result getResult(Accumulator acc) {
            // Average amount over the window
            return new Result(acc.sum / acc.count);
        }

        @Override
        public Accumulator merge(Accumulator a, Accumulator b) {
            a.sum += b.sum;
            a.count += b.count;
            return a;
        }
    });

// ProcessWindowFunction: receives all buffered elements and can read window metadata
stream.keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> events,
                            Collector<Result> out) {
            long windowStart = ctx.window().getStart();
            long windowEnd = ctx.window().getEnd();
            double sum = 0;
            for (Event e : events) sum += e.amount;
            out.collect(new Result(key, sum, windowStart, windowEnd));
        }
    });
State management
State is central to stream processing in Flink: it lets an operator retain data across events.
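As a rough mental model, keyed state behaves like a map from key to value that the framework manages and scopes to the key of the current record. The plain-Java sketch below makes that model concrete: a `HashMap` stands in for a per-key value state, and the operator emits and clears a key's count when it reaches a threshold. `KeyedCounter` and its `threshold` parameter are hypothetical; real Flink state is also checkpointed and restored, which this sketch does not attempt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java sketch of keyed state; the map stands in for a per-key value state. */
final class KeyedCounter {
    private final Map<String, Long> countByKey = new HashMap<>();
    private final long threshold;

    KeyedCounter(long threshold) {
        this.threshold = threshold;
    }

    /** Counts one event for the key; at the threshold, emits the count and clears that key's state. */
    Optional<Long> onEvent(String key) {
        long updated = countByKey.getOrDefault(key, 0L) + 1;
        if (updated >= threshold) {
            countByKey.remove(key); // equivalent to clearing the state for this key
            return Optional.of(updated);
        }
        countByKey.put(key, updated);
        return Optional.empty();
    }
}
```

Events for different keys never affect each other's counts, which is what allows keyed operators to be partitioned across parallel tasks.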
State types
java
public class StatefulFunction extends RichFlatMapFunction<Event, Result> {
    // ValueState: a single value per key
    private ValueState<Long> countState;
    // ListState: a list of elements per key
    private ListState<Event> eventListState;
    // MapState: a map per key
    private MapState<String, Long> mapState;
    // ReducingState: added values are folded into one aggregate
    private ReducingState<Long> reducingState;

    @Override
    public void open(Configuration config) {
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class)
        );
        eventListState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("events", Event.class)
        );
        mapState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("map", String.class, Long.class)
        );
        // Registered here so the field is never left null; "sum" is an illustrative name
        reducingState = getRuntimeContext().getReducingState(
            new ReducingStateDescriptor<Long>("sum", Long::sum, Long.class)
        );
    }

    @Override
    public void flatMap(Event event, Collector<Result> out) throws Exception {
        // Read state (null until the first update for this key)
        Long count = countState.value();
        long updated = (count == null ? 0L : count) + 1;
        if (updated >= 100) {
            // Emit on the 100th event for this key, then clear the state
            out.collect(new Result(event.userId, updated));
            countState.clear();
        } else {
            // Otherwise store the new count
            countState.update(updated);
        }
    }
}
State backends
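java
// Note: these three classes are the legacy backends, deprecated since Flink 1.13 in favor of
// HashMapStateBackend / EmbeddedRocksDBStateBackend plus a separately configured checkpoint storage.

// MemoryStateBackend: working state on the TaskManager heap, checkpoints in JobManager memory; for development and testing
env.setStateBackend(new MemoryStateBackend());

// FsStateBackend: working state on the TaskManager heap, checkpoints written to HDFS; for medium-sized state
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

// RocksDBStateBackend: working state in local RocksDB, checkpoints written to HDFS; for very large state
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints"));
Checkpointing and exactly-once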
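Exactly-once state consistency rests on barrier alignment, the mechanism outlined at the end of this section: an operator with several inputs snapshots its state only after the checkpoint barrier has arrived on every input, and holds back records that arrive behind a barrier in the meantime. The plain-Java sketch below models this for a single checkpoint. `BarrierAligner` is a hypothetical class, the "state" is just a count of processed records, and the sketch ignores asynchronous snapshots and unaligned checkpoints.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

/** Plain-Java sketch of aligned checkpoint barriers at a multi-input operator. */
final class BarrierAligner {
    private final int inputChannels;
    private final Set<Integer> channelsWithBarrier = new HashSet<>();
    private final List<String> heldBackRecords = new ArrayList<>();
    private long processedCount; // stands in for the operator's state

    BarrierAligner(int inputChannels) {
        this.inputChannels = inputChannels;
    }

    /** Processes a record, or holds it back if its channel already delivered the barrier. */
    void onRecord(int channel, String record) {
        if (channelsWithBarrier.contains(channel)) {
            heldBackRecords.add(record); // belongs to the period after the checkpoint
        } else {
            processedCount++;
        }
    }

    /** Returns the snapshot once the barrier has arrived on every channel, otherwise empty. */
    OptionalLong onBarrier(int channel) {
        channelsWithBarrier.add(channel);
        if (channelsWithBarrier.size() < inputChannels) {
            return OptionalLong.empty(); // still waiting for other inputs
        }
        long snapshot = processedCount; // reflects exactly the pre-barrier records
        channelsWithBarrier.clear();
        processedCount += heldBackRecords.size(); // now process what was held back
        heldBackRecords.clear();
        return OptionalLong.of(snapshot);
    }

    long state() {
        return processedCount;
    }
}
```

Because held-back records are excluded from the snapshot, restoring from it and replaying the inputs from the barrier positions counts each record once, which is the property exactly-once mode relies on.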
java
// Enable checkpointing (every 60 seconds)
env.enableCheckpointing(60000);

// Checkpoint settings
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // at least 30 s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120000);         // abort a checkpoint after 2 min
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Keep the checkpoint when the job is cancelled
// (newer releases deprecate this method in favor of setExternalizedCheckpointCleanup)
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
How checkpointing works (a variant of the Chandy-Lamport algorithm):
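Sources inject barriers into the stream
│
▼
Barriers travel downstream together with the records
│
▼
Each operator waits for the barrier from all upstream inputs (alignment), then snapshots its state
│
▼
Once every operator has completed its snapshot, the checkpoint succeeds
Two-stream joins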
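Of the two join styles in this section, the interval join is the less obvious one: for each left element it matches right elements with the same key whose timestamp lies within a relative range around the left timestamp, with both bounds inclusive by default. The plain-Java sketch below shows that matching rule for a single key. `IntervalJoinSketch` is a hypothetical helper that uses a nested loop over in-memory lists, whereas Flink buffers both sides in keyed state and uses watermarks to discard elements that can no longer match.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of interval-join matching for one key; names are hypothetical. */
final class IntervalJoinSketch {
    /**
     * Returns {leftTs, rightTs} pairs for which
     * leftTs + lowerBoundMs <= rightTs <= leftTs + upperBoundMs.
     */
    static List<long[]> join(List<Long> leftTimestamps, List<Long> rightTimestamps,
                             long lowerBoundMs, long upperBoundMs) {
        List<long[]> pairs = new ArrayList<>();
        for (long left : leftTimestamps) {
            for (long right : rightTimestamps) {
                boolean inRange = right >= left + lowerBoundMs && right <= left + upperBoundMs;
                if (inRange) {
                    pairs.add(new long[] {left, right});
                }
            }
        }
        return pairs;
    }
}
```

With bounds of minus 5 and plus 5 minutes, a left event at minute 10 matches right events at minutes 5 and 15 but not at minutes 4 or 16. Unlike the window join, the result does not depend on where fixed window boundaries happen to fall.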
java
// Window join: pairs elements with the same key that fall into the same window
stream1.join(stream2)
    .where(e1 -> e1.userId)
    .equalTo(e2 -> e2.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply((e1, e2) -> new JoinResult(e1, e2));

// Interval join: pairs each left element with right elements whose timestamp
// lies within [left - 5 min, left + 5 min]
stream1.keyBy(e -> e.userId)
    .intervalJoin(stream2.keyBy(e -> e.userId))
    .between(Time.minutes(-5), Time.minutes(5))
    .process(new ProcessJoinFunction<Event, Event, Result>() {
        @Override
        public void processElement(Event left, Event right,
                                   Context ctx, Collector<Result> out) {
            out.collect(new Result(left, right));
        }
    });
Submitting Flink jobs
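bash
# Submit to YARN (legacy per-job flags)
# -yjm / -ytm: JobManager / TaskManager memory in MB; -ys: slots per TaskManager
# -yn (fixed TaskManager count) is not used: newer Flink releases start
# TaskManagers on demand and no longer accept the flag
flink run \
  -m yarn-cluster \
  -yjm 2048 \
  -ytm 4096 \
  -ys 4 \
  -c com.example.MyJob \
  myapp.jar

# Restore from a checkpoint
flink run \
  -s hdfs://namenode:9000/flink/checkpoints/xxx/chk-100 \
  -c com.example.MyJob \
  myapp.jar

# List running jobs
flink list

# Cancel the job after writing a savepoint to the given directory
# (cancel -s is deprecated; newer releases use: flink stop --savepointPath <dir> <job-id>)
flink cancel -s hdfs://checkpoints/ <job-id>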