监控与性能调优
监控体系架构
┌─────────────────────────────────────────────────────────────┐
│ 监控体系 │
│ │
│ 数据采集层 │
│ ├── Prometheus(指标采集) │
│ ├── Hadoop JMX Exporter │
│ ├── Kafka Exporter │
│ ├── Node Exporter(系统指标) │
│ └── 自定义 Exporter │
│ │
│ 存储层 │
│ └── Prometheus TSDB / VictoriaMetrics │
│ │
│ 可视化层 │
│ └── Grafana(Dashboard) │
│ │
│ 告警层 │
│ ├── Alertmanager │
│ └── 钉钉 / 企业微信 / 邮件 │
└─────────────────────────────────────────────────────────────┘

Prometheus + Grafana 部署
yaml
# docker-compose-monitoring.yml
version: '3.8'
services:
prometheus:
image: prom/prometheus:v2.48.0
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
ports:
- "9090:9090"
grafana:
image: grafana/grafana:10.2.0
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123
volumes:
- grafana_data:/var/lib/grafana
ports:
- "3000:3000"
alertmanager:
image: prom/alertmanager:v0.26.0
volumes:
- ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
ports:
- "9093:9093"
node-exporter:
image: prom/node-exporter:v1.7.0
network_mode: host
pid: host
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
volumes:
prometheus_data:
grafana_data:

yaml
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
rule_files:
- "alert_rules.yml"
scrape_configs:
# 系统指标
- job_name: 'node'
static_configs:
- targets: ['dn1:9100', 'dn2:9100', 'dn3:9100']
# Hadoop NameNode(注意:NameNode 自带的 /jmx 接口返回的是 JSON 而非 Prometheus 文本格式,实际部署时应抓取 JMX Exporter 暴露的端口和路径)
- job_name: 'hadoop-namenode'
static_configs:
- targets: ['nn1:9870']
metrics_path: '/jmx'
params:
qry: ['Hadoop:service=NameNode,name=FSNamesystemState']
# Kafka
- job_name: 'kafka'
static_configs:
- targets: ['kafka1:9308', 'kafka2:9308', 'kafka3:9308']
# Spark(通过 PrometheusServlet Sink;其路径通常配置为 /metrics/prometheus,需相应设置 metrics_path)
- job_name: 'spark'
static_configs:
- targets: ['spark-master:4040']

关键监控指标
HDFS 监控
promql
# HDFS 容量使用率
hadoop_namenode_capacity_used / hadoop_namenode_capacity_total * 100
# 副本不足的 Block 数量(应为 0)
hadoop_namenode_under_replicated_blocks
# 损坏的 Block 数量(应为 0)
hadoop_namenode_corrupt_blocks
# DataNode 存活数量
hadoop_namenode_num_live_data_nodes
# NameNode RPC 延迟
hadoop_namenode_rpc_processing_time_avg_time

Kafka 监控
promql
# 消费者 Lag(消费延迟,应接近 0)
kafka_consumer_group_lag
# 消息生产速率
rate(kafka_server_brokertopicmetrics_messagesin_total[5m])
# 离线分区数量(没有可用 Leader 的分区,应为 0)
kafka_controller_kafkacontroller_offlinepartitionscount
# 副本不足(ISR 不完整)的分区数量(应为 0)
kafka_server_replicamanager_underreplicatedpartitions

Spark 监控
promql
# 活跃 Executor 数量
spark_executor_count
# GC 时间占比(应 < 10%)
rate(spark_executor_jvmGCTime_total[5m]) / rate(spark_executor_cpuTime_total[5m])
# Shuffle 读写量
rate(spark_executor_shuffleReadBytes_total[5m])
rate(spark_executor_shuffleWriteBytes_total[5m])
# Task 失败速率(每秒失败的 Task 数)
rate(spark_executor_failedTasks_total[5m])

告警规则
yaml
# alert_rules.yml
groups:
- name: bigdata_alerts
rules:
# HDFS 容量告警
- alert: HDFSCapacityHigh
expr: hadoop_namenode_capacity_used / hadoop_namenode_capacity_total > 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "HDFS 容量使用率超过 85%"
description: "当前使用率: {{ $value | humanizePercentage }}"
# HDFS 有损坏 Block
- alert: HDFSCorruptBlocks
expr: hadoop_namenode_corrupt_blocks > 0
for: 1m
labels:
severity: critical
annotations:
summary: "HDFS 存在损坏 Block"
# Kafka 消费延迟过高
- alert: KafkaConsumerLagHigh
expr: kafka_consumer_group_lag > 100000
for: 10m
labels:
severity: warning
annotations:
summary: "Kafka 消费延迟过高: {{ $labels.group }}"
# 节点内存使用率过高
- alert: NodeMemoryHigh
expr: (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) > 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "节点 {{ $labels.instance }} 内存使用率超过 90%"
# 磁盘使用率过高
- alert: DiskUsageHigh
expr: (1 - node_filesystem_avail_bytes / node_filesystem_size_bytes) > 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "节点 {{ $labels.instance }} 磁盘 {{ $labels.mountpoint }} 使用率超过 85%"

Spark 性能调优
内存调优
bash
spark-submit \
--driver-memory 4g \
--driver-java-options "-XX:+UseG1GC -XX:MaxGCPauseMillis=200" \
--executor-memory 8g \
--executor-cores 4 \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.3 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:MaxGCPauseMillis=200" \
myapp.jar

Shuffle 调优
python
# 增大 Shuffle 分区数(避免单分区数据过大)
spark.conf.set("spark.sql.shuffle.partitions", "400")
# 开启 AQE 自动调整
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
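# 增大 Shuffle 缓冲区(这两项是 Spark Core 配置,须在应用启动前通过 spark-submit --conf 或 SparkSession.builder.config 设置,运行时 spark.conf.set 无法生效)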
spark.conf.set("spark.shuffle.file.buffer", "1m")
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
# 使用 Kryo 序列化(最多可比 Java 序列化快约 10 倍)
# 注意:spark.serializer 须在应用启动前设置(spark-submit --conf 或 SparkSession.builder.config),运行时修改无效
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

数据倾斜调优
python
# 方法 1:AQE 自动处理倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
# 方法 2:加盐打散
from pyspark.sql.functions import expr, rand
# 大表加随机前缀
large_df = large_df.withColumn(
"salted_key",
expr("concat(cast(floor(rand() * 10) as string), '_', key)")
)
# 小表扩展 10 倍
small_df_expanded = small_df.withColumn(
"salt",
expr("explode(sequence(0, 9))")
).withColumn(
"salted_key",
expr("concat(cast(salt as string), '_', key)")
)
# Join 后去掉盐值
result = large_df.join(small_df_expanded, "salted_key") \
.drop("salted_key", "salt")

Flink 性能调优
Checkpoint 调优
java
// 增大 Checkpoint 间隔(减少 Checkpoint 开销)
env.enableCheckpointing(300000); // 5分钟
// 使用增量 Checkpoint(RocksDB)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = 增量
// 配置 Checkpoint 存储
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://namenode:9000/flink/checkpoints"
);
// 限制同一时间只运行 1 个 Checkpoint,并保证两次 Checkpoint 之间至少间隔 60 秒(避免 Checkpoint 堆积挤占正常处理)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);

反压处理
反压(Backpressure):下游处理速度跟不上上游,导致上游积压
排查步骤:
1. Flink Web UI → 查看 Task 的 backpressure 指标
2. 找到最慢的算子(backpressure 最高的)
3. 分析原因:
- CPU 密集:增加并行度
- I/O 密集:异步 I/O
- 状态访问慢:调整 RocksDB 配置
- GC 频繁:增大内存,调整 GC 参数

java
// 异步 I/O(避免同步等待外部系统)
DataStream<Result> result = AsyncDataStream.unorderedWait(
stream,
new AsyncDatabaseRequest(),
1000, // 超时时间(毫秒)
TimeUnit.MILLISECONDS,
100 // 最大并发请求数
);

数据质量监控
python
# 使用 Great Expectations 做数据质量检查
import great_expectations as ge
# 创建数据集(注意:toPandas() 会把全部数据收集到 Driver,大表请先采样或聚合后再检查)
df_ge = ge.from_pandas(df.toPandas())
# 定义期望
df_ge.expect_column_values_to_not_be_null("user_id")
df_ge.expect_column_values_to_be_between("age", 0, 150)
df_ge.expect_column_values_to_be_in_set("status", [0, 1, 2])
df_ge.expect_table_row_count_to_be_between(min_value=1000, max_value=10000000)
# 验证
result = df_ge.validate()
if not result["success"]:
raise ValueError(f"Data quality check failed: {result}")

sql
-- Hive 数据质量检查 SQL
-- 检查空值率
SELECT
COUNT(*) AS total,
SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) AS null_user_id,
SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) / COUNT(*) AS null_rate
FROM dwd_user_events
WHERE dt = '2024-01-01';
-- 检查数据量波动(与昨天对比)
SELECT
a.dt,
a.cnt AS today_cnt,
b.cnt AS yesterday_cnt,
(a.cnt - b.cnt) / b.cnt AS change_rate
FROM (SELECT dt, COUNT(*) AS cnt FROM dwd_user_events WHERE dt = '2024-01-01' GROUP BY dt) a
JOIN (SELECT dt, COUNT(*) AS cnt FROM dwd_user_events WHERE dt = '2023-12-31' GROUP BY dt) b
ON 1=1;