ClickHouse — 列式 OLAP
什么是 ClickHouse
ClickHouse 是由 Yandex 开源的列式数据库管理系统,专为 OLAP(联机分析处理)场景设计。它以极致的查询性能著称,单表查询速度可达每秒数十亿行。
核心优势:
- 列式存储:只读取查询涉及的列,I/O 极少
- 向量化执行:SIMD 指令批量处理数据
- 数据压缩:列式存储压缩率高(通常 5-10x)
- 分布式查询:支持分片和副本
- 实时写入:支持高频批量写入
适用场景:日志分析、用户行为分析、监控指标、广告统计
核心架构
┌─────────────────────────────────────────────────────────────┐
│ ClickHouse 集群 │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 分布式表(Distributed) │ │
│ │ 查询路由到各分片,结果合并返回 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ Shard 1 Shard 2 │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Replica 1 │ │ Replica 1 │ │
│ │ MergeTree 表 │ │ MergeTree 表 │ │
│ └──────────────────┘ └──────────────────┘ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Replica 2 │ │ Replica 2 │ │
│ │ MergeTree 表 │ │ MergeTree 表 │ │
│ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘MergeTree 表引擎
MergeTree 是 ClickHouse 最重要的表引擎,是其他引擎的基础。
基本 MergeTree
sql
-- User event table, stored in monthly partitions.
CREATE TABLE user_events
(
    user_id    UInt64,
    event_type LowCardinality(String),  -- low-cardinality string optimization
    page_url   String,
    amount     Float64,
    event_time DateTime,
    dt         Date
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(dt)               -- one partition per month
ORDER BY (user_id, event_time)          -- sorting key; affects query performance
PRIMARY KEY (user_id)                   -- sparse index (may differ from ORDER BY)
SETTINGS index_granularity = 8192;      -- index granularity: one index mark per 8192 rows
ReplacingMergeTree(去重)
sql
-- Rows that share the same ORDER BY key are deduplicated asynchronously by
-- background merges, which keep only the latest version.
CREATE TABLE user_profiles
(
    user_id    UInt64,
    name       String,
    age        UInt8,
    updated_at DateTime
)
ENGINE = ReplacingMergeTree(updated_at)  -- updated_at decides which row is the latest
ORDER BY user_id;

-- Force deduplication at query time, because the merge may not have finished yet.
SELECT
    user_id,
    name,
    age,
    updated_at
FROM user_profiles FINAL
WHERE user_id = 1;
SummingMergeTree(预聚合)
sql
-- Rows that share the same ORDER BY key are summed automatically when
-- background merges run.
CREATE TABLE daily_stats
(
    dt      Date,
    user_id UInt64,
    pv      UInt64,
    uv      UInt64,
    revenue Float64
)
ENGINE = SummingMergeTree((pv, uv, revenue))  -- the columns to be summed
ORDER BY (dt, user_id);
AggregatingMergeTree(聚合状态)
sql
-- Stores intermediate aggregation states so that aggregates can be built
-- incrementally.
CREATE TABLE user_daily_agg
(
    dt         Date,
    user_id    UInt64,
    pv         AggregateFunction(count, UInt64),
    avg_amount AggregateFunction(avg, Float64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (dt, user_id);

-- Write: produce the states with the -State functions.
-- NOTE(review): countState() takes no argument while pv is declared as
-- AggregateFunction(count, UInt64) -- confirm the target server version
-- accepts this combination.
INSERT INTO user_daily_agg
SELECT
    dt,
    user_id,
    countState(),
    avgState(amount)
FROM user_events
GROUP BY
    dt,
    user_id;

-- Read: finalize the stored states with the -Merge functions.
SELECT
    dt,
    user_id,
    countMerge(pv),
    avgMerge(avg_amount)
FROM user_daily_agg
GROUP BY
    dt,
    user_id;
CollapsingMergeTree(折叠去重)
sql
-- The sign column marks a row as an insert (+1) or a delete (-1); the two
-- cancel each other out when parts are merged.
CREATE TABLE orders
(
    order_id UInt64,
    user_id  UInt64,
    amount   Float64,
    sign     Int8  -- +1 = insert, -1 = delete
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY order_id;

-- Insert.
INSERT INTO orders (order_id, user_id, amount, sign)
VALUES (1, 100, 99.9, 1);

-- Update: cancel the old value first, then insert the new one.
INSERT INTO orders (order_id, user_id, amount, sign)
VALUES (1, 100, 99.9, -1);   -- delete the old value
INSERT INTO orders (order_id, user_id, amount, sign)
VALUES (1, 100, 199.9, 1);   -- insert the new value
常用 SQL
sql
-- Query optimization: use partition pruning.
SELECT
    count(),
    sum(amount)
FROM user_events
WHERE user_id = 12345          -- primary-key filter
  AND dt >= '2024-01-01'       -- partition pruning (half-open range on dt)
  AND dt < '2024-02-01';
-- Approximate calculations (very fast).
SELECT
    uniq(user_id)         AS uv,         -- approximate distinct count
    uniqExact(user_id)    AS exact_uv,   -- exact distinct count (slow)
    quantile(0.5)(amount) AS median,     -- approximate median
    topK(10)(page_url)    AS top_pages   -- Top-K
FROM user_events
WHERE dt = today();
-- Time-series analysis: today's event count and summed amount per hour.
SELECT
    toStartOfHour(event_time) AS hour,
    count()                   AS cnt,
    sum(amount)               AS revenue
FROM user_events
WHERE dt = today()
GROUP BY hour
ORDER BY hour ASC;
-- Funnel analysis: today's view / click / purchase counts and the conversion
-- rate between consecutive steps. The rates reuse the aliases defined earlier
-- in the same SELECT list.
SELECT
    countIf(event_type = 'view')     AS view_cnt,
    countIf(event_type = 'click')    AS click_cnt,
    countIf(event_type = 'purchase') AS purchase_cnt,
    click_cnt / view_cnt             AS click_rate,
    purchase_cnt / click_cnt         AS purchase_rate
FROM user_events
WHERE dt = today();
-- Retention analysis with the retention() aggregate function.
-- retention(cond1, cond2, ...) is evaluated once per GROUP BY group and returns
-- an array of 0/1 flags: r[1] is 1 when cond1 matched any row of the group, and
-- r[2] is 1 only when both cond1 and cond2 matched. The conditions therefore
-- have to select different days and the grouping has to be per user; grouping
-- by dt with two identical conditions cannot measure retention.
-- This query answers: of the users who logged in yesterday, how many logged in
-- again today (next-day retention)?
SELECT
    sum(r[1])                  AS day0_users,          -- users who logged in yesterday
    sum(r[2])                  AS day1_retained,       -- of those, users who also logged in today
    day1_retained / day0_users AS day1_retention_rate
FROM
(
    SELECT
        user_id,
        retention(dt = today() - 1, dt = today()) AS r
    FROM user_events
    WHERE event_type = 'login'
      AND dt >= today() - 1      -- read only the two days involved
      AND dt <= today()
    GROUP BY user_id
);
-- Window functions: a per-user running total of amount ordered by event time,
-- and a per-user row number ordered by amount, largest first.
SELECT
    user_id,
    event_time,
    amount,
    sum(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cum_amount,
    row_number() OVER (
        PARTITION BY user_id
        ORDER BY amount DESC
    ) AS rn
FROM user_events;
数据写入
sql
-- Batch insert (recommended: at least 1000 rows per INSERT).
INSERT INTO user_events (user_id, event_type, page_url, amount, event_time, dt)
VALUES
    (1, 'click', '/home', 0, now(), today()),
    (2, 'purchase', '/product/123', 99.9, now(), today());
-- Insert from a query result; dt is computed from event_time.
INSERT INTO user_events (user_id, event_type, page_url, amount, event_time, dt)
SELECT
    user_id,
    event_type,
    page_url,
    amount,
    event_time,
    toDate(event_time)
FROM staging_table;
-- Import from a file (shell command, not SQL): data.csv is fed to the INSERT
-- through standard input.
clickhouse-client \
    --query="INSERT INTO user_events FORMAT CSV" \
    < data.csv
-- Read from Kafka through a Kafka-engine table.
-- page_url is declared here so that it can be forwarded to user_events, which
-- has that column.
CREATE TABLE kafka_source
(
    user_id    UInt64,
    event_type String,
    page_url   String,
    amount     Float64,
    event_time DateTime
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list  = 'user_events',
    kafka_group_name  = 'clickhouse-consumer',
    kafka_format      = 'JSONEachRow';

-- Materialized view: automatically writes the Kafka rows into the MergeTree
-- table user_events.
-- The columns are listed explicitly and dt is derived from event_time. A plain
-- SELECT * from kafka_source supplies no dt column, so user_events.dt (its
-- partition key) would never be populated from the stream.
CREATE MATERIALIZED VIEW kafka_mv TO user_events AS
SELECT
    user_id,
    event_type,
    page_url,
    amount,
    event_time,
    toDate(event_time) AS dt
FROM kafka_source;
物化视图
sql
-- Create a materialized view for real-time pre-aggregation: event count and
-- summed amount per hour and event type.
CREATE MATERIALIZED VIEW hourly_stats
ENGINE = SummingMergeTree()
ORDER BY (hour, event_type)
AS
SELECT
    toStartOfHour(event_time) AS hour,
    event_type,
    count()                   AS cnt,
    sum(amount)               AS total_amount
FROM user_events
GROUP BY
    hour,
    event_type;

-- Query the materialized view (very fast): the last 24 hours.
SELECT
    hour,
    event_type,
    sum(cnt),
    sum(total_amount)
FROM hourly_stats
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY
    hour,
    event_type
ORDER BY hour;
性能调优
sql
-- Show the query execution plan.
EXPLAIN
SELECT count()
FROM user_events
WHERE dt = today();

-- Inspect the query log: the 10 finished queries with the longest duration.
SELECT
    query,
    query_duration_ms,
    read_rows,
    read_bytes
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;

-- Show table sizes: readable size, row count and number of active parts for
-- each table of the default database, largest first.
SELECT
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows)                      AS rows,
    count()                        AS parts
FROM system.parts
WHERE database = 'default'
  AND active
GROUP BY table
ORDER BY sum(bytes) DESC;

-- Force a merge (reduces the number of parts).
OPTIMIZE TABLE user_events FINAL;
xml
<!-- users.xml performance settings.
     max_memory_usage, max_threads, max_block_size and max_insert_block_size are
     query-level settings, so they are placed in a settings profile (here the
     "default" profile) inside the root element of users.xml, not at the top
     level of config.xml. -->
<profiles>
    <default>
        <max_memory_usage>10000000000</max_memory_usage> <!-- 10 GB, in bytes -->
        <max_threads>16</max_threads>
        <max_block_size>65536</max_block_size>
        <max_insert_block_size>1048576</max_insert_block_size>
    </default>
</profiles>