Presto / Trino — 联邦查询引擎
什么是 Trino
Trino(原 PrestoSQL,由 Facebook 开源)是一个分布式 SQL 查询引擎,专为大规模数据的交互式分析设计。它最大的特点是联邦查询:用一条 SQL 同时查询 Hive、MySQL、Kafka、S3、Iceberg 等多种数据源。
核心优势:
- 联邦查询:跨数据源 JOIN
- 无需数据移动:直接查询原始数据
- 内存计算:全内存执行,速度快
- 标准 SQL:ANSI SQL 兼容
- 丰富连接器:50+ 数据源支持
适用场景:
- 跨数据源分析(Hive + MySQL + Kafka)
- 数据湖查询(S3 + Iceberg/Delta)
- 交互式 Ad-hoc 查询
- 不适合:超长时间运行的批处理(用 Spark)
架构设计
┌─────────────────────────────────────────────────────────────┐
│ Trino 集群 │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Coordinator │ │
│ │ - 接收 SQL 查询 │ │
│ │ - 解析、优化、生成执行计划 │ │
│ │ - 调度 Task 到 Worker │ │
│ │ - 汇总结果返回客户端 │ │
│ └──────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼───────────────────────────┐ │
│ │ Worker 集群 │ │
│ │ Worker1 Worker2 Worker3 Worker4 │ │
│ │ - 执行 Task(Pipeline 模型) │ │
│ │ - 通过 Connector 读取数据源 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ Connectors: │
│ Hive MySQL PostgreSQL Kafka Iceberg Delta S3 ... │
└─────────────────────────────────────────────────────────────┘配置连接器
Hive 连接器
properties
# catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
hive.allow-drop-table=true
hive.allow-rename-table=trueMySQL 连接器
properties
# catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://mysql:3306
connection-user=root
connection-password=passwordIceberg 连接器
properties
# catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083Kafka 连接器
properties
# catalog/kafka.properties
connector.name=kafka
kafka.nodes=kafka:9092
kafka.table-names=mydb.user_events
kafka.hide-internal-columns=false联邦查询示例
sql
-- 跨数据源 JOIN:Hive 事实表 + MySQL 维度表
SELECT
h.user_id,
m.user_name,
m.city,
COUNT(*) AS event_cnt,
SUM(h.amount) AS total_amount
FROM hive.dw_db.dwd_user_events h
JOIN mysql.dim_db.dim_user m ON h.user_id = m.user_id
WHERE h.dt = '2024-01-01'
GROUP BY h.user_id, m.user_name, m.city
ORDER BY total_amount DESC
LIMIT 100;
-- 查询 Iceberg 数据湖
SELECT *
FROM iceberg.db.users
WHERE created_at >= DATE '2024-01-01';
-- 跨 Catalog 查询
SELECT
a.user_id,
a.total_orders,
b.total_events
FROM (
SELECT user_id, COUNT(*) AS total_orders
FROM mysql.shop.orders
GROUP BY user_id
) a
JOIN (
SELECT user_id, COUNT(*) AS total_events
FROM hive.dw_db.dwd_user_events
WHERE dt >= '2024-01-01'
GROUP BY user_id
) b ON a.user_id = b.user_id;高级 SQL 特性
窗口函数
sql
SELECT
user_id,
event_time,
amount,
SUM(amount) OVER (
PARTITION BY user_id
ORDER BY event_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_amount,
RANK() OVER (PARTITION BY DATE(event_time) ORDER BY amount DESC) AS daily_rank,
LAG(amount, 1, 0) OVER (PARTITION BY user_id ORDER BY event_time) AS prev_amount
FROM hive.dw_db.dwd_user_events
WHERE dt = '2024-01-01';Lambda 函数
sql
-- 数组操作
SELECT
user_id,
FILTER(tags, t -> t LIKE '%vip%') AS vip_tags,
TRANSFORM(scores, s -> s * 1.1) AS boosted_scores,
REDUCE(scores, 0, (acc, s) -> acc + s, acc -> acc) AS total_score
FROM user_profiles;JSON 处理
sql
SELECT
user_id,
JSON_EXTRACT_SCALAR(properties, '$.city') AS city,
JSON_EXTRACT_SCALAR(properties, '$.device') AS device,
CAST(JSON_EXTRACT(properties, '$.tags') AS ARRAY(VARCHAR)) AS tags
FROM user_events
WHERE JSON_EXTRACT_SCALAR(properties, '$.city') = 'Beijing';近似函数
sql
SELECT
dt,
APPROX_DISTINCT(user_id) AS approx_uv, -- 近似去重(HyperLogLog)
APPROX_PERCENTILE(amount, 0.5) AS median_amount,
APPROX_PERCENTILE(amount, ARRAY[0.25, 0.5, 0.75, 0.95]) AS percentiles
FROM user_events
GROUP BY dt;性能调优
查询优化配置
sql
-- 会话级别配置
SET SESSION join_distribution_type = 'AUTOMATIC';
SET SESSION task_concurrency = 8;
SET SESSION query_max_memory = '10GB';
-- 强制广播 Join(小表)
SELECT /*+ BROADCAST(small_table) */
a.user_id, b.name
FROM large_table a
JOIN small_table b ON a.user_id = b.user_id;分区裁剪
sql
-- 确保 WHERE 条件能触发分区裁剪
EXPLAIN
SELECT count(*)
FROM hive.dw_db.dwd_user_events
WHERE dt = '2024-01-01'; -- 必须包含分区列
-- 查看执行计划
EXPLAIN ANALYZE
SELECT user_id, count(*)
FROM hive.dw_db.dwd_user_events
WHERE dt = '2024-01-01'
GROUP BY user_id;统计信息
sql
-- 收集表统计信息(提升查询优化质量)
ANALYZE hive.dw_db.dwd_user_events;
-- 查看统计信息
SHOW STATS FOR hive.dw_db.dwd_user_events;命令行使用
bash
# 连接 Trino
trino --server http://coordinator:8080 --catalog hive --schema dw_db
# 执行 SQL 文件
trino --server http://coordinator:8080 --file query.sql
# 查看 Catalog 列表
SHOW CATALOGS;
# 查看 Schema
SHOW SCHEMAS FROM hive;
# 查看表
SHOW TABLES FROM hive.dw_db;
# 查看表结构
DESCRIBE hive.dw_db.dwd_user_events;
# 查看正在运行的查询
SELECT * FROM system.runtime.queries WHERE state = 'RUNNING';
# 终止查询
CALL system.runtime.kill_query(query_id => 'xxx', message => 'cancelled');Trino vs Presto vs Spark SQL
| 维度 | Trino | Presto | Spark SQL |
|---|---|---|---|
| 起源 | PrestoSQL 分支 | Apache | |
| 延迟 | 秒级 | 秒级 | 秒~分钟 |
| 联邦查询 | ✅ 最强 | ✅ | 有限 |
| 内存模型 | 全内存 | 全内存 | 内存+磁盘 |
| 容错 | 弱(查询失败重试) | 弱 | 强(RDD 重算) |
| 批处理 | 不适合 | 不适合 | ✅ |
| 社区活跃 | 高 | 中 | 高 |
选型建议
- 需要跨数据源联邦查询 → Trino
- 需要长时间批处理 → Spark SQL
- 需要实时 OLAP + MySQL 协议 → Doris