Flink SQL — 流式 SQL
什么是 Flink SQL
Flink SQL 是 Flink 提供的统一 SQL 接口,可以用标准 SQL 处理流数据和批数据。底层基于 Apache Calcite 解析和优化,支持 ANSI SQL 标准以及流处理扩展语法。
核心价值:
- 降低流处理门槛,SQL 工程师直接上手
- 流批统一:同一 SQL 既能处理流也能处理批
- 与 Hive 生态集成
Table API & SQL 环境
python
from pyflink.table import EnvironmentSettings, TableEnvironment
# Streaming mode: build the settings, then create the table environment from them
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(settings)
# Batch mode: same construction with in_batch_mode().
# Note: this rebinds `settings` and `t_env`; a real job uses one of the two modes.
settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = TableEnvironment.create(settings)
java
// Java: obtain the DataStream execution environment, then create the
// StreamTableEnvironment on top of it.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DDL:创建表
Kafka Source 表
sql
-- Source table reading JSON user events from the Kafka topic user_events.
CREATE TABLE kafka_source (
    user_id    STRING,
    event_type STRING,
    amount     DOUBLE,
    event_time TIMESTAMP(3),
    -- Processing-time attribute. The lookup (dimension-table) join further down
    -- references it as e.proc_time in FOR SYSTEM_TIME AS OF.
    proc_time AS PROCTIME(),
    -- Declare event_time as the event-time attribute; the watermark trails it by 5 seconds
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-sql-group',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
MySQL CDC Source(变更数据捕获)
sql
-- Change-data-capture source over the MySQL table shop.orders.
CREATE TABLE mysql_orders (
    order_id    BIGINT,
    user_id     STRING,
    amount      DECIMAL(10, 2),
    status      INT,
    create_time TIMESTAMP,
    -- Declared as a table-level constraint; Flink does not enforce it
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'shop',
    'table-name' = 'orders'
);
Hive 表
sql
-- Register a Hive catalog that reads its configuration from /etc/hive/conf
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/etc/hive/conf'
);

-- Make the catalog and the dw_db database the current ones
USE CATALOG hive_catalog;
USE dw_db;

-- Query the Hive table directly
SELECT *
FROM dwd_user_log
WHERE dt = '2024-01-01';
JDBC Sink 表
sql
-- JDBC sink writing per-event-type statistics to result_db.event_stats.
CREATE TABLE mysql_sink (
    event_type STRING,
    cnt        BIGINT,
    total      DOUBLE,
    PRIMARY KEY (event_type) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/result_db',
    'table-name' = 'event_stats',
    'username' = 'root',
    'password' = 'password',
    -- Buffered writes are flushed at 1000 rows or every second
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
);
Filesystem Sink(写入 HDFS/S3)
sql
-- Filesystem sink writing Parquet files to HDFS, partitioned by dt.
CREATE TABLE hdfs_sink (
    user_id    STRING,
    event_type STRING,
    amount     DOUBLE,
    dt         STRING
)
PARTITIONED BY (dt)
WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://namenode:9000/output/events/',
    'format' = 'parquet',
    -- Partition commit: success-file policy, with a 1 minute commit delay
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.partition-commit.delay' = '1 min'
);
流式查询
基本聚合
sql
-- Continuously compute the row count and total amount per event type
-- and write the result to mysql_sink.
INSERT INTO mysql_sink
SELECT
    ks.event_type,
    COUNT(*)       AS cnt,
    SUM(ks.amount) AS total
FROM kafka_source AS ks
GROUP BY ks.event_type;
时间窗口聚合
sql
-- Tumbling window: one result per event type for every 5-minute window.
-- Written with the TUMBLE windowing table-valued function (Flink 1.13+), which
-- supersedes the deprecated TUMBLE(...)/TUMBLE_START/TUMBLE_END group-window syntax.
SELECT
    window_start,
    window_end,
    event_type,
    COUNT(*)    AS cnt,
    SUM(amount) AS total
FROM TABLE(
    TUMBLE(TABLE kafka_source, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY
    window_start,
    window_end,
    event_type;
-- Hopping (sliding) window: 10-minute windows that advance every 5 minutes.
-- Written with the HOP windowing table-valued function (Flink 1.13+) instead of the
-- deprecated HOP(...)/HOP_START group-window syntax.
-- Argument order is (table, time attribute, slide, size).
SELECT
    window_start,
    event_type,
    COUNT(*) AS cnt
FROM TABLE(
    HOP(TABLE kafka_source, DESCRIPTOR(event_time), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
)
GROUP BY
    window_start,
    window_end,
    event_type;
-- Session window: a user's session closes after 30 minutes without events.
SELECT
    SESSION_START(ks.event_time, INTERVAL '30' MINUTE) AS session_start,
    ks.user_id,
    COUNT(*) AS event_cnt
FROM kafka_source AS ks
GROUP BY
    SESSION(ks.event_time, INTERVAL '30' MINUTE),
    ks.user_id;
累积窗口(Flink 1.13+)
sql
-- Cumulating window: accumulate from the start of each day and emit a result every hour.
-- CUMULATE exists only as a windowing table-valued function; there are no
-- CUMULATE_START / CUMULATE_END group-window functions.
-- Argument order is (table, time attribute, step, max window size).
SELECT
    window_start,
    window_end,
    event_type,
    COUNT(*) AS cnt
FROM TABLE(
    CUMULATE(TABLE kafka_source, DESCRIPTOR(event_time), INTERVAL '1' HOUR, INTERVAL '1' DAY)
)
GROUP BY
    window_start,
    window_end,
    event_type;
流流 Join
sql
-- Stream-stream interval join: match each order with payments whose
-- payment_time lies within one hour before or after the order_time (bounds inclusive).
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    p.payment_method
FROM orders AS o
INNER JOIN payments AS p
    ON o.order_id = p.order_id
    AND p.payment_time >= o.order_time - INTERVAL '1' HOUR
    AND p.payment_time <= o.order_time + INTERVAL '1' HOUR;
流表 Join(维表 Join)
sql
-- Dimension table backed by MySQL (dim_db.dim_user) through the JDBC connector.
CREATE TABLE dim_user (
    user_id   STRING,
    user_name STRING,
    city      STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/dim_db',
    'table-name' = 'dim_user',
    -- Lookup cache: at most 10000 rows, entries expire after 10 minutes
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '10 min'
);
-- Stream-to-dimension-table (lookup) join; FOR SYSTEM_TIME AS OF is the key syntax.
-- e.proc_time must be a processing-time attribute of kafka_source
-- (for example a computed column declared as PROCTIME()).
SELECT
    e.user_id,
    e.event_type,
    e.amount,
    d.user_name,
    d.city
FROM kafka_source AS e
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF e.proc_time AS d
    ON e.user_id = d.user_id;
去重与 Top-N
sql
-- Deduplication: keep only the most recent record (by event_time) for each user_id.
SELECT
    user_id,
    event_type,
    amount,
    event_time
FROM (
    SELECT
        user_id,
        event_type,
        amount,
        event_time,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) AS rn
    FROM kafka_source
) AS ranked
WHERE rn = 1;
-- Top-N: the 3 rows with the highest amount within each event type
-- (rows are ranked individually; amounts are not summed per user).
SELECT
    event_type,
    user_id,
    amount
FROM (
    SELECT
        event_type,
        user_id,
        amount,
        ROW_NUMBER() OVER (PARTITION BY event_type ORDER BY amount DESC) AS rn
    FROM kafka_source
) AS ranked
WHERE rn <= 3;
Pattern CEP(复杂事件处理)
sql
-- Detect 3 consecutive login failures by the same user within 10 minutes.
SELECT *
FROM kafka_source
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(A.event_time) AS first_fail_time,
        LAST(A.event_time)  AS last_fail_time,
        COUNT(A.event_time) AS fail_count
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    -- Exactly three A rows. Flink rejects a greedy quantifier such as A{3,}
    -- on the last variable of a pattern.
    PATTERN (A{3})
    WITHIN INTERVAL '10' MINUTE
    DEFINE
        A AS event_type = 'login_fail'
);
Flink SQL 客户端
bash
# 启动 SQL 客户端
./bin/sql-client.sh
# 执行 SQL 文件
./bin/sql-client.sh -f my_job.sql
# 设置配置
SET 'execution.checkpointing.interval' = '60s';
SET 'table.exec.sink.upsert-materialize' = 'none';
SET 'parallelism.default' = '4';
# 查看作业
SHOW JOBS;
# 停止作业
STOP JOB 'job-id' WITH SAVEPOINT;与 Hive 集成
sql
-- Read one partition (dt = '2024-01-01') of a Hive table via its fully qualified name
SELECT *
FROM hive_catalog.dw_db.dwd_user_log AS l
WHERE l.dt = '2024-01-01';
-- Streaming write into a Hive table: one row per user per daily tumbling window.
-- The dt value is the calendar date of the window start, which is the same date
-- as every event_time that falls inside that 1-day window.
-- NOTE(review): session_id is not a column of the kafka_source DDL shown in this
-- document; confirm that the real source table provides it.
INSERT INTO hive_catalog.dw_db.dws_user_daily
SELECT
    user_id,
    COUNT(*)                   AS pv,
    COUNT(DISTINCT session_id) AS sessions,
    SUM(amount)                AS total_amount,
    DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' DAY), 'yyyy-MM-dd') AS dt
FROM kafka_source
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '1' DAY);