Skip to content

Presto / Trino — 联邦查询引擎

什么是 Trino

Trino(原 PrestoSQL,由 Facebook 开源)是一个分布式 SQL 查询引擎,专为大规模数据的交互式分析设计。它最大的特点是联邦查询:用一条 SQL 同时查询 Hive、MySQL、Kafka、S3、Iceberg 等多种数据源。

核心优势

  • 联邦查询:跨数据源 JOIN
  • 无需数据移动:直接查询原始数据
  • 内存计算:全内存执行,速度快
  • 标准 SQL:ANSI SQL 兼容
  • 丰富连接器:50+ 数据源支持

适用场景

  • 跨数据源分析(Hive + MySQL + Kafka)
  • 数据湖查询(S3 + Iceberg/Delta)
  • 交互式 Ad-hoc 查询
  • 不适合:超长时间运行的批处理(用 Spark)

架构设计

┌─────────────────────────────────────────────────────────────┐
│                      Trino 集群                              │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                  Coordinator                          │  │
│  │  - 接收 SQL 查询                                     │  │
│  │  - 解析、优化、生成执行计划                           │  │
│  │  - 调度 Task 到 Worker                               │  │
│  │  - 汇总结果返回客户端                                 │  │
│  └──────────────────────────┬───────────────────────────┘  │
│                             │                               │
│  ┌──────────────────────────▼───────────────────────────┐  │
│  │                  Worker 集群                          │  │
│  │  Worker1  Worker2  Worker3  Worker4                  │  │
│  │  - 执行 Task(Pipeline 模型)                        │  │
│  │  - 通过 Connector 读取数据源                         │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
│  Connectors:                                               │
│  Hive  MySQL  PostgreSQL  Kafka  Iceberg  Delta  S3  ...   │
└─────────────────────────────────────────────────────────────┘

配置连接器

Hive 连接器

properties
# catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
hive.allow-drop-table=true
hive.allow-rename-table=true

MySQL 连接器

properties
# catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://mysql:3306
connection-user=root
connection-password=password

Iceberg 连接器

properties
# catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://hive-metastore:9083

Kafka 连接器

properties
# catalog/kafka.properties
connector.name=kafka
kafka.nodes=kafka:9092
kafka.table-names=mydb.user_events
kafka.hide-internal-columns=false

联邦查询示例

sql
-- 跨数据源 JOIN:Hive 事实表 + MySQL 维度表
SELECT
    h.user_id,
    m.user_name,
    m.city,
    COUNT(*) AS event_cnt,
    SUM(h.amount) AS total_amount
FROM hive.dw_db.dwd_user_events h
JOIN mysql.dim_db.dim_user m ON h.user_id = m.user_id
WHERE h.dt = '2024-01-01'
GROUP BY h.user_id, m.user_name, m.city
ORDER BY total_amount DESC
LIMIT 100;

-- 查询 Iceberg 数据湖
SELECT *
FROM iceberg.db.users
WHERE created_at >= DATE '2024-01-01';

-- 跨 Catalog 查询
SELECT
    a.user_id,
    a.total_orders,
    b.total_events
FROM (
    SELECT user_id, COUNT(*) AS total_orders
    FROM mysql.shop.orders
    GROUP BY user_id
) a
JOIN (
    SELECT user_id, COUNT(*) AS total_events
    FROM hive.dw_db.dwd_user_events
    WHERE dt >= '2024-01-01'
    GROUP BY user_id
) b ON a.user_id = b.user_id;

高级 SQL 特性

窗口函数

sql
SELECT
    user_id,
    event_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_amount,
    RANK() OVER (PARTITION BY DATE(event_time) ORDER BY amount DESC) AS daily_rank,
    LAG(amount, 1, 0) OVER (PARTITION BY user_id ORDER BY event_time) AS prev_amount
FROM hive.dw_db.dwd_user_events
WHERE dt = '2024-01-01';

Lambda 函数

sql
-- 数组操作
SELECT
    user_id,
    FILTER(tags, t -> t LIKE '%vip%') AS vip_tags,
    TRANSFORM(scores, s -> s * 1.1) AS boosted_scores,
    REDUCE(scores, 0, (acc, s) -> acc + s, acc -> acc) AS total_score
FROM user_profiles;

JSON 处理

sql
SELECT
    user_id,
    JSON_EXTRACT_SCALAR(properties, '$.city') AS city,
    JSON_EXTRACT_SCALAR(properties, '$.device') AS device,
    CAST(JSON_EXTRACT(properties, '$.tags') AS ARRAY(VARCHAR)) AS tags
FROM user_events
WHERE JSON_EXTRACT_SCALAR(properties, '$.city') = 'Beijing';

近似函数

sql
SELECT
    dt,
    APPROX_DISTINCT(user_id) AS approx_uv,  -- 近似去重(HyperLogLog)
    APPROX_PERCENTILE(amount, 0.5) AS median_amount,
    APPROX_PERCENTILE(amount, ARRAY[0.25, 0.5, 0.75, 0.95]) AS percentiles
FROM user_events
GROUP BY dt;

性能调优

查询优化配置

sql
-- 会话级别配置
SET SESSION join_distribution_type = 'AUTOMATIC';
SET SESSION task_concurrency = 8;
SET SESSION query_max_memory = '10GB';

-- 强制广播 Join(小表)
SELECT /*+ BROADCAST(small_table) */
    a.user_id, b.name
FROM large_table a
JOIN small_table b ON a.user_id = b.user_id;

分区裁剪

sql
-- 确保 WHERE 条件能触发分区裁剪
EXPLAIN
SELECT count(*)
FROM hive.dw_db.dwd_user_events
WHERE dt = '2024-01-01';  -- 必须包含分区列

-- 查看执行计划
EXPLAIN ANALYZE
SELECT user_id, count(*)
FROM hive.dw_db.dwd_user_events
WHERE dt = '2024-01-01'
GROUP BY user_id;

统计信息

sql
-- 收集表统计信息(提升查询优化质量)
ANALYZE hive.dw_db.dwd_user_events;

-- 查看统计信息
SHOW STATS FOR hive.dw_db.dwd_user_events;

命令行使用

bash
# 连接 Trino
trino --server http://coordinator:8080 --catalog hive --schema dw_db

# 执行 SQL 文件
trino --server http://coordinator:8080 --file query.sql

# 查看 Catalog 列表
SHOW CATALOGS;

# 查看 Schema
SHOW SCHEMAS FROM hive;

# 查看表
SHOW TABLES FROM hive.dw_db;

# 查看表结构
DESCRIBE hive.dw_db.dwd_user_events;

# 查看正在运行的查询
SELECT * FROM system.runtime.queries WHERE state = 'RUNNING';

# 终止查询
CALL system.runtime.kill_query(query_id => 'xxx', message => 'cancelled');

Trino vs Presto vs Spark SQL

维度TrinoPrestoSpark SQL
起源PrestoSQL 分支FacebookApache
延迟秒级秒级秒~分钟
联邦查询✅ 最强有限
内存模型全内存全内存内存+磁盘
容错弱(查询失败重试)强(RDD 重算)
批处理不适合不适合
社区活跃

选型建议

  • 需要跨数据源联邦查询 → Trino
  • 需要长时间批处理 → Spark SQL
  • 需要实时 OLAP + MySQL 协议 → Doris

本站内容由 褚成志 整理编写,仅供学习参考