
Monitoring and Performance Tuning

Monitoring Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Monitoring stack                         │
│                                                             │
│  Data collection layer                                      │
│  ├── Prometheus (metrics scraping)                          │
│  ├── Hadoop JMX Exporter                                    │
│  ├── Kafka Exporter                                         │
│  ├── Node Exporter (system metrics)                         │
│  └── Custom exporters                                       │
│                                                             │
│  Storage layer                                              │
│  └── Prometheus TSDB / VictoriaMetrics                      │
│                                                             │
│  Visualization layer                                        │
│  └── Grafana (dashboards)                                   │
│                                                             │
│  Alerting layer                                             │
│  ├── Alertmanager                                           │
│  └── DingTalk / WeCom / email                               │
└─────────────────────────────────────────────────────────────┘

Deploying Prometheus + Grafana

yaml
# docker-compose-monitoring.yml
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:v2.48.0
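    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      # rule_files in prometheus.yml is resolved relative to the config file
      - ./alert_rules.yml:/etc/prometheus/alert_rules.yml
      - prometheus_data:/prometheus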
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=30d'
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:10.2.0
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    volumes:
      - grafana_data:/var/lib/grafana
    ports:
      - "3000:3000"

  alertmanager:
    image: prom/alertmanager:v0.26.0
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
    ports:
      - "9093:9093"

  node-exporter:
    image: prom/node-exporter:v1.7.0
    network_mode: host
    pid: host
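    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    # Point node_exporter at the mounted host paths; without these flags the mounts are unused
    command:
      - '--path.procfs=/host/proc'
      - '--path.sysfs=/host/sys'
      - '--path.rootfs=/rootfs'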

volumes:
  prometheus_data:
  grafana_data:

yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

rule_files:
  - "alert_rules.yml"

scrape_configs:
  # System metrics (node_exporter)
  - job_name: 'node'
    static_configs:
      - targets: ['dn1:9100', 'dn2:9100', 'dn3:9100']

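  # Hadoop NameNode, scraped through the JMX exporter.
  # The NameNode's own /jmx endpoint (port 9870) returns JSON, which Prometheus cannot parse.
  # Port 9404 is an assumed example; use the port your JMX exporter agent listens on.
  - job_name: 'hadoop-namenode'
    static_configs:
      - targets: ['nn1:9404']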

  # Kafka
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka1:9308', 'kafka2:9308', 'kafka3:9308']

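  # Spark (via the PrometheusServlet sink, which must be enabled in Spark's metrics configuration)
  - job_name: 'spark'
    metrics_path: '/metrics/prometheus'
    static_configs:
      - targets: ['spark-master:4040']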

Key Monitoring Metrics

HDFS Monitoring

promql
# HDFS capacity usage (%)
hadoop_namenode_capacity_used / hadoop_namenode_capacity_total * 100

# Number of under-replicated blocks (should be 0)
hadoop_namenode_under_replicated_blocks

# Number of corrupt blocks (should be 0)
hadoop_namenode_corrupt_blocks

# Number of live DataNodes
hadoop_namenode_num_live_data_nodes

# Average NameNode RPC processing time
hadoop_namenode_rpc_processing_time_avg_time

Kafka Monitoring

promql
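# Consumer group lag (should stay close to 0); metric name as exposed by kafka_exporter on port 9308
kafka_consumergroup_lag

# Message production rate (messages per second)
rate(kafka_server_brokertopicmetrics_messagesin_total[5m])

# Number of offline partitions, i.e. partitions without an active leader (should be 0)
kafka_controller_kafkacontroller_offlinepartitionscount

# Number of under-replicated partitions (should be 0)
kafka_server_replicamanager_underreplicatedpartitions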

Spark Monitoring

promql
# Number of active executors
spark_executor_count

# Share of time spent in GC (should be < 10%)
rate(spark_executor_jvmGCTime_total[5m]) / rate(spark_executor_cpuTime_total[5m])

# Shuffle read and write throughput (bytes per second)
rate(spark_executor_shuffleReadBytes_total[5m])
rate(spark_executor_shuffleWriteBytes_total[5m])

# Task failures per second
rate(spark_executor_failedTasks_total[5m])
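
The expressions above can also be evaluated outside Grafana through Prometheus' HTTP API (`/api/v1/query`). The following is a minimal sketch using only the Python standard library. The server address `http://localhost:9090` is an assumption based on the port published in the compose file, and the helper names are illustrative, not part of any library.

```python
import json
import urllib.parse
import urllib.request

# Assumption: Prometheus is reachable here (port 9090 as published in the compose file).
PROMETHEUS_URL = "http://localhost:9090"


def build_query_url(base_url: str, promql: str) -> str:
    """Build the instant-query URL for Prometheus' HTTP API (/api/v1/query)."""
    query_string = urllib.parse.urlencode({"query": promql})
    return f"{base_url.rstrip('/')}/api/v1/query?{query_string}"


def parse_vector(payload: dict) -> list:
    """Turn an instant-vector response into a list of (labels, value) pairs."""
    if payload.get("status") != "success":
        raise RuntimeError(f"query failed: {payload.get('error')}")
    # Each sample looks like {"metric": {...labels...}, "value": [timestamp, "value-as-string"]}
    return [(s["metric"], float(s["value"][1])) for s in payload["data"]["result"]]


def query(promql: str, base_url: str = PROMETHEUS_URL) -> list:
    """Run an instant query against a live Prometheus server."""
    with urllib.request.urlopen(build_query_url(base_url, promql), timeout=10) as resp:
        return parse_vector(json.load(resp))
```

With a running server, `query("hadoop_namenode_under_replicated_blocks")` would return one `(labels, value)` pair per matching series, which is convenient for ad hoc checks or scripted reports.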

Alerting Rules

yaml
# alert_rules.yml
groups:
  - name: bigdata_alerts
    rules:
      # HDFS capacity alert
      - alert: HDFSCapacityHigh
        expr: hadoop_namenode_capacity_used / hadoop_namenode_capacity_total > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "HDFS capacity usage above 85%"
          description: "Current usage: {{ $value | humanizePercentage }}"

      # HDFS has corrupt blocks
      - alert: HDFSCorruptBlocks
        expr: hadoop_namenode_corrupt_blocks > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "HDFS has corrupt blocks"

      # Kafka consumer lag too high (metric and label names as exposed by kafka_exporter)
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumergroup_lag > 100000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag too high: {{ $labels.consumergroup }}"

      # Node memory usage too high
      - alert: NodeMemoryHigh
        expr: (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Memory usage on node {{ $labels.instance }} above 90%"

      # Disk usage too high
      - alert: DiskUsageHigh
        expr: (1 - node_filesystem_avail_bytes / node_filesystem_size_bytes) > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Disk usage on node {{ $labels.instance }}, mount {{ $labels.mountpoint }}, above 85%"
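
Every rule above carries a `for:` clause: the alert stays pending until the expression has been true on each evaluation for that long, and a single false evaluation resets the timer. The sketch below simulates that behaviour in plain Python to show when a rule would start firing. The function name and the sample data are illustrative assumptions, not Prometheus code.

```python
def first_firing_time(samples, threshold, for_seconds):
    """Return the timestamp at which an alert with a `for:` clause would fire, or None.

    samples: list of (timestamp_seconds, value), one per rule evaluation, in order.
    """
    active_since = None
    for ts, value in samples:
        if value > threshold:
            if active_since is None:
                active_since = ts      # condition just became true: alert is pending
            if ts - active_since >= for_seconds:
                return ts              # condition held long enough: alert fires
        else:
            active_since = None        # any false evaluation resets the timer
    return None


# Evaluations every 15 s (evaluation_interval), memory usage steadily at 95%
steady = [(t, 0.95) for t in range(0, 601, 15)]
# Same series, but usage dips below the threshold once at t = 150 s
with_dip = [(t, 0.80 if t == 150 else 0.95) for t in range(0, 601, 15)]
```

Here `first_firing_time(steady, 0.9, 300)` fires at 300 s, while the single dip in `with_dip` pushes firing back to 465 s, which is why a noisy metric near its threshold can take much longer than `for:` to alert.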

Spark Performance Tuning

Memory Tuning

bash
spark-submit \
  --driver-memory 4g \
  --driver-java-options "-XX:+UseG1GC -XX:MaxGCPauseMillis=200" \
  --executor-memory 8g \
  --executor-cores 4 \
  --conf spark.memory.fraction=0.8 \
  --conf spark.memory.storageFraction=0.3 \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:MaxGCPauseMillis=200" \
  myapp.jar
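
To see what these settings mean in practice, the following sketch works through the arithmetic of Spark's unified memory model for the executor above: a fixed 300 MiB of heap is reserved, `spark.memory.fraction` of the remainder is shared by execution and storage, and `spark.memory.storageFraction` of that shared region is storage that execution cannot evict. The function name is illustrative, and the numbers are approximate because the heap the JVM actually reports is somewhat smaller than `--executor-memory`.

```python
RESERVED_MB = 300  # fixed amount of heap that Spark sets aside for itself


def unified_memory_mb(heap_mb, memory_fraction, storage_fraction):
    """Approximate sizes (MiB) of Spark's unified memory regions for one executor."""
    unified = (heap_mb - RESERVED_MB) * memory_fraction  # shared by execution and storage
    storage = unified * storage_fraction                 # storage protected from eviction
    execution = unified - storage                        # initial execution share
    return {"unified": unified, "storage": storage, "execution": execution}


# --executor-memory 8g, spark.memory.fraction=0.8, spark.memory.storageFraction=0.3
regions = unified_memory_mb(8 * 1024, 0.8, 0.3)
for name, size in regions.items():
    print(f"{name}: {size:.0f} MiB")
```

For the 8g executor this gives roughly 6.3 GiB of unified memory, of which about 1.9 GiB is protected storage. The split is only a starting point, since execution can borrow unused storage memory at runtime.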

Shuffle Tuning

python
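from pyspark.sql import SparkSession

# Core (non-SQL) settings are read when the application starts, so set them on the
# builder (or with spark-submit --conf) rather than with spark.conf.set() at runtime.
spark = (
    SparkSession.builder
    # Larger shuffle buffers (defaults: 32k and 48m)
    .config("spark.shuffle.file.buffer", "1m")
    .config("spark.reducer.maxSizeInFlight", "96m")
    # Kryo serialization (often much faster and more compact than Java serialization)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Increase the number of shuffle partitions (keeps individual partitions from growing too large)
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Enable AQE so that partition counts are adjusted automatically
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")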

Data Skew Tuning

python
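# Method 1: let AQE split skewed join partitions automatically (requires spark.sql.adaptive.enabled=true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

# Method 2: salt the join key to spread hot keys across partitions
from pyspark.sql.functions import expr

# Large table: prefix each key with a random salt in [0, 9]
large_df = large_df.withColumn(
    "salted_key",
    expr("concat(cast(floor(rand() * 10) as string), '_', key)")
)

# Small table: replicate every row 10 times, once per salt value
small_df_expanded = small_df.withColumn(
    "salt",
    expr("explode(sequence(0, 9))")
).withColumn(
    "salted_key",
    expr("concat(cast(salt as string), '_', key)")
).drop("key", "salt")  # drop "key" here so the join result has a single "key" column

# Join on the salted key, then drop it
result = large_df.join(small_df_expanded, "salted_key") \
    .drop("salted_key")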
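
Why salting works can be checked without a cluster. The sketch below is a pure-Python simulation, not Spark code: it salts the large side, replicates the small side once per salt, and joins on the salted key. The helper name `salted_join` and the toy data are assumptions made for illustration.

```python
import random
from collections import Counter

NUM_SALTS = 10  # same fan-out as the example above


def salted_join(large_rows, small_rows, num_salts=NUM_SALTS, seed=0):
    """Join (key, value) rows via salted keys; return joined rows and rows per salted key."""
    rng = random.Random(seed)
    # Large side: one random salt per row
    salted_large = [(f"{rng.randrange(num_salts)}_{k}", k, v) for k, v in large_rows]
    # Small side: replicate each row once per salt so every salted key finds its match
    small_index = {}
    for k, v in small_rows:
        for salt in range(num_salts):
            small_index.setdefault(f"{salt}_{k}", []).append(v)
    joined = [(k, lv, sv) for sk, k, lv in salted_large for sv in small_index.get(sk, [])]
    bucket_sizes = Counter(sk for sk, _, _ in salted_large)
    return joined, bucket_sizes


# One hot key with 1000 rows and one cold key with a single row
large = [("hot", i) for i in range(1000)] + [("cold", 0)]
small = [("hot", "H"), ("cold", "C")]
joined, bucket_sizes = salted_join(large, small)
print(len(joined), max(bucket_sizes.values()))
```

The joined rows are the same as for a plain join on `key`, but the 1000 hot rows are now spread over up to ten salted keys instead of landing in one partition. The cost is that the small side is ten times larger, so the salt count is a trade-off.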

Flink Checkpoint Tuning

java
// Lengthen the checkpoint interval to reduce checkpointing overhead
env.enableCheckpointing(300000);  // 5 minutes

// Use incremental checkpoints (RocksDB state backend)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));  // true = incremental

// Configure checkpoint storage
env.getCheckpointConfig().setCheckpointStorage(
    "hdfs://namenode:9000/flink/checkpoints"
);

// Allow only one checkpoint in flight and leave at least 60 s between checkpoints,
// so that checkpointing does not crowd out regular processing
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);

Handling Backpressure

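Backpressure: a downstream operator processes records more slowly than the upstream produces them, so data piles up upstream.

Troubleshooting steps:
1. In the Flink Web UI, check the backpressure indicator of each task.
2. Find the bottleneck: the first operator downstream of the backpressured tasks, which is busy but not itself backpressured.
3. Analyze the cause:
   - CPU-bound: increase parallelism
   - I/O-bound: use asynchronous I/O
   - Slow state access: tune the RocksDB configuration
   - Frequent GC: increase memory and adjust GC parameters

java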
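// Asynchronous I/O (avoids blocking while waiting on an external system)
DataStream<Result> result = AsyncDataStream.unorderedWait(
    stream,
    new AsyncDatabaseRequest(),
    1000,  // timeout (milliseconds)
    TimeUnit.MILLISECONDS,
    100    // capacity: maximum number of in-flight requests
);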
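
The idea behind `unorderedWait` (many requests in flight at once, a cap on concurrency, a per-request timeout, results emitted as they complete) can be illustrated outside Flink. The following is a minimal `asyncio` sketch of that pattern, not Flink's implementation. The function name is an assumption, and unlike Flink's default behaviour, a timed-out request here yields `None` instead of failing the job.

```python
import asyncio


async def unordered_wait(items, async_fn, timeout_s, capacity):
    """Apply async_fn to every item with at most `capacity` requests in flight.

    Results come back in completion order; a request that exceeds timeout_s
    yields (item, None) instead of failing the whole batch.
    """
    semaphore = asyncio.Semaphore(capacity)

    async def guarded(item):
        async with semaphore:  # wait for a free slot before issuing the request
            try:
                return item, await asyncio.wait_for(async_fn(item), timeout_s)
            except asyncio.TimeoutError:
                return item, None

    tasks = [asyncio.ensure_future(guarded(item)) for item in items]
    return [await done for done in asyncio.as_completed(tasks)]
```

Because slow requests no longer block fast ones, throughput is bounded by `capacity` and the external system's latency rather than by one blocking call per record, which is the same reason asynchronous I/O relieves an I/O-bound operator.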

Data Quality Monitoring

python
# Data quality checks with Great Expectations
# (legacy Dataset API: ge.from_pandas is available in pre-1.0 releases)
import great_expectations as ge

# Build the dataset; toPandas() collects all rows to the driver, so use a sample for large tables
df_ge = ge.from_pandas(df.toPandas())

# Define expectations
df_ge.expect_column_values_to_not_be_null("user_id")
df_ge.expect_column_values_to_be_between("age", 0, 150)
df_ge.expect_column_values_to_be_in_set("status", [0, 1, 2])
df_ge.expect_table_row_count_to_be_between(min_value=1000, max_value=10000000)

# Validate
result = df_ge.validate()
if not result["success"]:
    raise ValueError(f"Data quality check failed: {result}")

sql
-- Hive data quality checks
-- Null rate
SELECT
    COUNT(*) AS total,
    SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) AS null_user_id,
    SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) / COUNT(*) AS null_rate
FROM dwd_user_events
WHERE dt = '2024-01-01';

-- Row count change compared with the previous day
-- (each subquery returns at most one row, so the cross join pairs today with yesterday;
--  no row is returned if either partition is empty)
SELECT
    a.dt,
    a.cnt AS today_cnt,
    b.cnt AS yesterday_cnt,
    (a.cnt - b.cnt) / b.cnt AS change_rate
FROM (SELECT dt, COUNT(*) AS cnt FROM dwd_user_events WHERE dt = '2024-01-01' GROUP BY dt) a
CROSS JOIN (SELECT dt, COUNT(*) AS cnt FROM dwd_user_events WHERE dt = '2023-12-31' GROUP BY dt) b;
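
The `change_rate` computed by the query above still has to be turned into a pass or fail decision. A minimal sketch of that check follows. The 30% tolerance and the function names are assumptions for illustration, and a missing or empty previous day is treated as a failure rather than silently skipped.

```python
def change_rate(today_cnt, yesterday_cnt):
    """Relative day-over-day change; None when there is no previous-day data to compare with."""
    if yesterday_cnt == 0:
        return None
    return (today_cnt - yesterday_cnt) / yesterday_cnt


def volume_ok(today_cnt, yesterday_cnt, max_abs_change=0.3):
    """True if the row count moved by at most max_abs_change (assumed tolerance: 30%)."""
    rate = change_rate(today_cnt, yesterday_cnt)
    return rate is not None and abs(rate) <= max_abs_change
```

For example, `volume_ok(120, 100)` passes with a 20% increase, while `volume_ok(50, 100)` fails because the row count halved. A suitable tolerance depends on how much the table normally fluctuates.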

Content on this site was compiled and written by 褚成志 and is provided for learning and reference only.