Apache Doris — 实时 OLAP
什么是 Apache Doris
Apache Doris(原 Palo,百度开源)是一个现代化的 MPP 分析型数据库,支持实时数据导入和亚秒级查询。它高度兼容 MySQL 协议,使用标准 SQL,是国内最流行的实时 OLAP 引擎之一。
核心优势:
- 实时导入:秒级数据可见
- MySQL 协议:直接用 MySQL 客户端连接
- 高并发查询:支持数千 QPS
- 向量化执行引擎:极致查询性能
- 流批一体:同时支持实时和批量导入
架构设计
┌──────────────────────────────────────────────────────────┐
│ Doris 集群                                               │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │ Frontend(FE)                                     │  │
│  │   - SQL 解析和查询规划                             │  │
│  │   - 元数据管理(基于 BDB JE)                      │  │
│  │   - 查询协调                                       │  │
│  │   - Leader FE + Follower FE(高可用)            │  │
│  └──────────────────────────┬─────────────────────────┘  │
│                             │                            │
│  ┌──────────────────────────▼─────────────────────────┐  │
│  │ Backend(BE)                                      │  │
│  │   - 数据存储(列式,Segment 格式)              │  │
│  │   - 查询执行(向量化)                           │  │
│  │   - 数据导入                                       │  │
│  │   BE1  BE2  BE3  BE4  BE5                          │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘
数据模型
Doris 有三种数据模型:
Duplicate Key(明细模型)
sql
-- Duplicate Key model: every row is stored as-is, with no deduplication.
-- Suited to logs and event streams.
CREATE TABLE user_events (
    event_time DATETIME,
    user_id    BIGINT,
    event_type VARCHAR(50),
    page_url   VARCHAR(200),
    amount     DECIMAL(10, 2)
)
-- Sort key, NOT a uniqueness constraint; the key columns must be the leading columns of the table.
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES (
    -- 3 replicas: each tablet is placed on 3 different BEs.
    "replication_num" = "3"
    -- "storage_format" = "V2" is omitted: V2 is the default and the V1 format is deprecated.
);
Aggregate Key(聚合模型)
sql
-- Aggregate Key model: rows sharing the same key (dt, user_id) are merged automatically.
-- Each value column is combined with the aggregation type declared after it.
-- Suited to pre-aggregated reports.
CREATE TABLE daily_stats (
    dt              DATE,
    user_id         BIGINT,
    pv              BIGINT SUM,              -- summed across merged rows
    -- REPLACE keeps the value from the most recently loaded row.
    -- Order inside a single load batch is not guaranteed.
    last_visit_time DATETIME REPLACE,
    max_amount      DECIMAL(10, 2) MAX,      -- largest value seen
    min_amount      DECIMAL(10, 2) MIN       -- smallest value seen
)
-- Note: a UV (unique visitor) column cannot be maintained with REPLACE.
-- At this (dt, user_id) grain it would always be 1 anyway.
-- Daily UV belongs in a table keyed by dt alone, with a column like `uv BITMAP BITMAP_UNION`.
AGGREGATE KEY(dt, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;
Unique Key(主键模型)
sql
-- Unique Key model: only one row is kept per user_id, and UPDATE / DELETE are supported.
-- "Latest" means the most recently loaded row, not the row with the largest updated_at.
-- NOTE(review): ordering by updated_at would require a sequence column; confirm support for
-- the Doris version in use before relying on it.
CREATE TABLE user_profiles (
    user_id    BIGINT,
    name       VARCHAR(100),
    age        INT,
    city       VARCHAR(50),
    updated_at DATETIME
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES (
    -- Merge-on-write: duplicates are resolved at write time, so reads skip the merge step.
    "enable_unique_key_merge_on_write" = "true"
);
数据导入
Stream Load(HTTP 实时导入)
bash
# Import a local CSV file.
# 8030 is the FE HTTP port. The FE answers a Stream Load with a redirect to a BE, so
# --location-trusted is required for curl to follow it and resend the credentials.
# A label can only be used once per database; reusing it is rejected.
curl --location-trusted -u root:password \
    -H "Expect:100-continue" \
    -H "label:load_20240101" \
    -H "column_separator:," \
    -H "columns:user_id,name,age,city" \
    -T /data/users.csv \
    http://fe_host:8030/api/mydb/user_profiles/_stream_load
# Import a JSON file whose top level is an array (stripped by strip_outer_array).
curl --location-trusted -u root:password \
    -H "Expect:100-continue" \
    -H "label:load_json_20240101" \
    -H "format:json" \
    -H "strip_outer_array:true" \
    -T /data/users.json \
    http://fe_host:8030/api/mydb/user_profiles/_stream_load
Broker Load(HDFS/S3 批量导入)
sql
-- Broker Load: asynchronous bulk import from HDFS.
-- It goes through the broker registered as "hdfs_broker".
LOAD LABEL mydb.load_20240101 (
    DATA INFILE("hdfs://namenode:9000/data/users/*.csv")
    INTO TABLE user_profiles
    COLUMNS TERMINATED BY ","
    (user_id, name, age, city)
)
WITH BROKER "hdfs_broker"
(
    "username" = "hadoop",
    "password" = ""
)
PROPERTIES (
    "timeout" = "3600",          -- seconds
    "max_filter_ratio" = "0.1"   -- tolerate up to 10% of rows filtered out as bad data
);
-- Check the load status.
-- The label was created in mydb, so name that database explicitly
-- instead of relying on the session's current database.
SHOW LOAD FROM mydb WHERE LABEL = "load_20240101";
Routine Load(Kafka 实时消费)
sql
-- Routine Load: a long-running job that continuously consumes a Kafka topic.
-- Load properties (column separator, column mapping, ...) are separated by commas.
CREATE ROUTINE LOAD mydb.kafka_load ON user_events
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, event_type, amount, event_time)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",      -- seconds
    "max_error_number" = "1000"
)
FROM KAFKA (
    "kafka_broker_list" = "kafka:9092",
    "kafka_topic" = "user_events",
    "property.group.id" = "doris-routine-load",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"   -- start from the earliest offset
);
-- Check the Routine Load job status.
SHOW ROUTINE LOAD FOR mydb.kafka_load;
-- Pause / resume the job.
PAUSE ROUTINE LOAD FOR mydb.kafka_load;
RESUME ROUTINE LOAD FOR mydb.kafka_load;
Insert Into(SQL 导入)
sql
-- Load from another table.
-- Name the target columns so the mapping does not depend on column order.
INSERT INTO user_profiles (user_id, name, age, city, updated_at)
SELECT user_id, name, age, city, now()
FROM staging_table
WHERE age > 0;
-- Load from an external Hive table through the Hive catalog.
-- NOTE(review): assumes dim_user exposes these column names; adjust to the actual Hive schema.
INSERT INTO user_profiles (user_id, name, age, city, updated_at)
SELECT user_id, name, age, city, updated_at
FROM hive_catalog.dw_db.dim_user;
查询优化
分区与分桶
sql
-- Range partitioning: one partition per month on event_time.
CREATE TABLE user_events (
    event_time DATETIME,
    user_id    BIGINT,
    amount     DECIMAL(10, 2)
)
DUPLICATE KEY(event_time, user_id)
PARTITION BY RANGE(event_time) (
    PARTITION p202401 VALUES LESS THAN ("2024-02-01"),
    PARTITION p202402 VALUES LESS THAN ("2024-03-01"),
    PARTITION p202403 VALUES LESS THAN ("2024-04-01")
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;
-- Dynamic partitioning: partitions are created and dropped automatically.
-- The dynamic_partition.* properties must belong to a CREATE TABLE (or ALTER TABLE ... SET)
-- statement; a bare PROPERTIES (...) is not valid SQL.
-- The partition list is left empty "()" so the scheduler creates the daily partitions itself.
-- Daily partitions would overlap the monthly ones above.
CREATE TABLE user_events_daily (
    event_time DATETIME,
    user_id    BIGINT,
    amount     DECIMAL(10, 2)
)
DUPLICATE KEY(event_time, user_id)
PARTITION BY RANGE(event_time) ()
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-30",   -- keep the last 30 days; older partitions are dropped
    "dynamic_partition.end" = "3",       -- pre-create partitions 3 days ahead
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32"
);
物化视图
sql
-- Synchronous materialized view: maintained together with the base table on every load.
-- Doris DATE_TRUNC takes (datetime, unit).
CREATE MATERIALIZED VIEW hourly_stats AS
SELECT
    date_trunc(event_time, 'hour'),
    event_type,
    count(*),
    sum(amount)
FROM user_events
GROUP BY date_trunc(event_time, 'hour'), event_type;
-- A synchronous MV is not named in FROM. Query the base table instead;
-- the optimizer rewrites the query to read hourly_stats when the query matches it.
-- Use EXPLAIN to confirm the MV was chosen.
SELECT event_type, count(*) AS cnt
FROM user_events
WHERE date_trunc(event_time, 'hour') >= '2024-01-01'
GROUP BY event_type;
Colocate Join(本地 Join)
sql
-- Colocate Join: tables in the same Colocate Group are joined locally, without a shuffle.
-- Both tables must share the same bucketing (bucket column type, bucket count) and replica count.
CREATE TABLE orders (
    order_id BIGINT,
    user_id  BIGINT,
    amount   DECIMAL(10, 2)
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("colocate_with" = "user_group");   -- member of Colocate Group "user_group"
CREATE TABLE user_profiles (
    user_id BIGINT,
    name    VARCHAR(100)
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES ("colocate_with" = "user_group");   -- same Colocate Group as orders
-- Joining on the bucket column user_id lets each BE join its local buckets: no shuffle needed.
SELECT
    o.order_id,
    u.name,
    o.amount
FROM orders AS o
INNER JOIN user_profiles AS u
    ON o.user_id = u.user_id;
外部数据源
sql
-- Hive catalog: query Hive tables in place through the Hive Metastore.
CREATE CATALOG hive_catalog PROPERTIES (
    'type'                = 'hms',
    'hive.metastore.uris' = 'thrift://hive-metastore:9083'
);
-- Tables are addressed as catalog.database.table.
SELECT *
FROM hive_catalog.dw_db.dwd_user_log
WHERE dt = '2024-01-01';
-- Iceberg catalog whose metadata is kept in the same Hive Metastore.
CREATE CATALOG iceberg_catalog PROPERTIES (
    'type'                 = 'iceberg',
    'iceberg.catalog.type' = 'hms',
    'hive.metastore.uris'  = 'thrift://hive-metastore:9083'
);
-- JDBC catalog exposing the MySQL database mydb (a catalog, not a single external table).
-- NOTE(review): driver_url is a bare jar file name; presumably resolved against the
-- JDBC driver directory on FE/BE nodes — confirm the jar is deployed there.
CREATE CATALOG mysql_catalog PROPERTIES (
    'type'         = 'jdbc',
    'user'         = 'root',
    'password'     = 'password',
    'jdbc_url'     = 'jdbc:mysql://mysql:3306/mydb',
    'driver_url'   = 'mysql-connector-java-8.0.25.jar',
    'driver_class' = 'com.mysql.cj.jdbc.Driver'
);
常用运维命令
sql
-- Cluster status: FE and BE nodes.
SHOW FRONTENDS;
SHOW BACKENDS;
-- Tablets (and their replicas) of a table.
-- Doris has no `SHOW TABLET STATUS` statement.
SHOW TABLETS FROM user_events;
-- The 10 most recent load jobs.
SHOW LOAD ORDER BY CreateTime DESC LIMIT 10;
-- Query profile: enable profiling for this session, run the query, then list the profiles.
SET enable_profile = true;
SELECT count(*) FROM user_events;
SHOW QUERY PROFILE "/";
-- Rebalance data across the disks inside each BE (this is not a table/cluster-level rebalance).
ADMIN REBALANCE DISK;
-- Per-table data size in MB.
SELECT
    table_name,
    ROUND(SUM(data_length) / 1024 / 1024, 2) AS data_mb
FROM information_schema.tables
WHERE table_schema = 'mydb'
GROUP BY table_name
ORDER BY data_mb DESC;