
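MapReduce Batch Processing Principles

What Is MapReduce

MapReduce is a distributed programming model proposed by Google. It abstracts complex parallel computation into two simple operations: Map and Reduce. Hadoop implements this model, letting ordinary developers process petabyte-scale data on clusters of inexpensive commodity machines.

Core idea: move the computation to the data, not the data to the computation.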


Execution Flow

Input data (HDFS)
        │
        ▼
  InputFormat (input splits)
        │
        ▼
  ┌─────────────────────────────────────────┐
  │              Map phase                  │
  │  Map Task 1  Map Task 2  Map Task 3     │
  │  (k1,v1)→    (k1,v1)→    (k1,v1)→       │
  │  [(k2,v2)]   [(k2,v2)]   [(k2,v2)]      │
  └──────────────────┬──────────────────────┘
                     │
              Shuffle
              ├── Partition (Partitioner)
              ├── Sort
              ├── Combine (Combiner, optional)
              └── Transfer to Reduce
                     │
  ┌──────────────────▼──────────────────────┐
  │              Reduce phase               │
  │  Reduce Task 1    Reduce Task 2         │
  │  (k2,[v2,v2...])→ (k2,[v2,v2...])→      │
  │  (k3,v3)          (k3,v3)               │
  └──────────────────┬──────────────────────┘
                     │
                     ▼
              Output (HDFS)
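
The flow above can be mimicked in a single process to see how the three stages fit together. The following is a minimal sketch in plain Java, not Hadoop code: the class `MiniMapReduce` and its method names are hypothetical, and a `TreeMap` stands in for the distributed shuffle (grouping by key and keeping keys sorted).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process model of map -> shuffle -> reduce; not Hadoop code.
final class MiniMapReduce {

    // Map: one input line (v1) -> list of (word, 1) pairs, i.e. [(k2, v2)].
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                out.add(Map.entry(w, 1));
            }
        }
        return out;
    }

    // Shuffle: group values by key; the TreeMap keeps keys sorted like the sort step.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Reduce: (k2, [v2, ...]) -> v3, here the sum of the values.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs the three stages in order over all input lines.
    static TreeMap<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            mapped.addAll(map(line));
        }
        TreeMap<String, Integer> result = new TreeMap<>();
        shuffle(mapped).forEach((k, vs) -> result.put(k, reduce(k, vs)));
        return result;
    }

    public static void main(String[] args) {
        // prints {hadoop=1, hello=2, world=1}
        System.out.println(run(List.of("hello world", "hello hadoop")));
    }
}
```

In a real job each stage runs on many machines and the shuffle moves data over the network; the sketch only shows the data shapes at each step.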

Core Components in Detail

Map Phase

java
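import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused across calls to avoid allocating new objects per record
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key: byte offset of the line in the file; value: one line of text
        String line = value.toString();
        String[] words = line.split("\\s+");
        for (String w : words) {
            if (w.isEmpty()) {
                continue;  // split() yields "" for blank lines or leading whitespace
            }
            word.set(w);
            context.write(word, one);  // emit (word, 1)
        }
    }
}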

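Shuffle Phase (the Most Critical)

Shuffle is at the heart of MapReduce performance. It has a map side and a reduce side:

Map-Side Shuffle

Map output
  │
  ▼
Circular in-memory buffer (100 MB by default)
  │ spills to disk when 80% full (Spill)
  ▼
Spill files on disk (sorted by partition, then by key)
  │
  ▼
Merge all spill files (Merge)
  │ optional: Combiner performs local aggregation
  ▼
Final map output file (partitioned and sorted)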
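
The buffer-and-spill behaviour above can be approximated with a small in-memory model. This is a rough sketch under stated assumptions, not Hadoop's implementation: `SpillBufferSketch` is a hypothetical class, sizes are counted in records rather than bytes, spilling is synchronous (real Hadoop spills in a background thread while the map keeps writing), and the final merge is replaced by a plain sort.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the map-side buffer; record counts stand in for bytes.
final class SpillBufferSketch {

    // One buffered map output record: target partition plus key.
    static final class Rec {
        final int partition;
        final String key;

        Rec(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }
    }

    // Order used inside each spill: by partition first, then by key.
    static final Comparator<Rec> ORDER = (a, b) -> {
        int c = Integer.compare(a.partition, b.partition);
        return c != 0 ? c : a.key.compareTo(b.key);
    };

    private final int threshold;
    private final int numPartitions;
    private final List<Rec> buffer = new ArrayList<>();
    final List<List<Rec>> spills = new ArrayList<>();

    SpillBufferSketch(int capacity, double spillPercent, int numPartitions) {
        this.threshold = (int) Math.round(capacity * spillPercent);
        this.numPartitions = numPartitions;
    }

    // Called for every map output key; spills once the buffer reaches the threshold.
    void collect(String key) {
        int partition = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        buffer.add(new Rec(partition, key));
        if (buffer.size() >= threshold) {
            spill();
        }
    }

    // Sorts the buffered records and stores them as one spill "file".
    void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Rec> run = new ArrayList<>(buffer);
        run.sort(ORDER);
        spills.add(run);
        buffer.clear();
    }

    // Flushes the tail and combines all spills into the final map output.
    List<Rec> finish() {
        spill();
        List<Rec> merged = new ArrayList<>();
        for (List<Rec> run : spills) {
            merged.addAll(run);
        }
        merged.sort(ORDER);  // stand-in for the on-disk merge of sorted runs
        return merged;
    }

    public static void main(String[] args) {
        SpillBufferSketch buf = new SpillBufferSketch(10, 0.8, 2);
        for (int i = 0; i < 20; i++) {
            buf.collect("key" + (i % 7));
        }
        List<Rec> out = buf.finish();
        // prints "3 spills, 20 records": spills after 8 and 16 records, plus the final flush of 4
        System.out.println(buf.spills.size() + " spills, " + out.size() + " records");
    }
}
```

With a capacity of 10 and a spill fraction of 0.8, every 8 records produce one sorted run, which mirrors how a larger buffer leads to fewer spill files and less merge work.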

Reduce-Side Shuffle

Fetch this reducer's partition from every Map Task (over HTTP)
  │
  ▼
In-memory buffer (merge)
  │
  ▼
Merge sort (Merge Sort)
  │
  ▼
Input to the reduce function
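
Because every fetched segment is already sorted, the reduce side only needs to merge sorted runs rather than sort from scratch. Below is a minimal sketch of a k-way merge using a priority queue; `KWayMergeSketch` is a hypothetical class, each inner list stands for the segment fetched from one Map Task, and everything stays in memory, whereas Hadoop also merges to and from disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the reduce-side merge of already sorted segments.
final class KWayMergeSketch {

    static List<String> merge(List<List<String>> sortedRuns) {
        // Heap entries are {runIndex, position}, ordered by the key they point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> sortedRuns.get(a[0]).get(a[1])
                        .compareTo(sortedRuns.get(b[0]).get(b[1])));
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (!sortedRuns.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            List<String> run = sortedRuns.get(head[0]);
            merged.add(run.get(head[1]));
            // Advance within the same run if it has more keys.
            if (head[1] + 1 < run.size()) {
                heap.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> runs = List.of(
                List.of("apple", "hello", "world"),
                List.of("hadoop", "hello"),
                List.of("hello", "spark"));
        // prints [apple, hadoop, hello, hello, hello, spark, world]
        System.out.println(merge(runs));
    }
}
```

After the merge, equal keys sit next to each other, which is what lets the framework hand the reduce function one key together with all of its values.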

Reduce Phase

java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // key: a word; values: [1, 1, 1, ...]
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);  // emit (word, count)
    }
}

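Combiner (Local Aggregation)

A Combiner is a reducer that runs locally on the map side and reduces the amount of data sent through the shuffle:

java
// The Combiner usually shares its logic with the Reducer
job.setCombinerClass(WordCountReducer.class);

// Effect:
// Without a Combiner: map output (hello,1)(hello,1)(hello,1) → 3 records transferred
// With a Combiner:    map output (hello,3)                  → 1 record transferred

Note

A Combiner is only safe for operations that are commutative and associative (such as sum or max). It cannot be applied directly to operations such as averaging, because an average of partial averages is generally not the overall average.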
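
To make the restriction concrete, here is a small sketch in plain Java with made-up numbers. The class `CombinerAverageSketch` and its methods are hypothetical; each inner list represents the values seen by one Map Task. It contrasts pre-averaging per task with the usual workaround of carrying a (sum, count) pair, which does combine associatively.

```java
import java.util.List;

// Hypothetical illustration with plain numbers; not Hadoop code.
final class CombinerAverageSketch {

    static double mean(List<Double> xs) {
        double sum = 0;
        for (double x : xs) {
            sum += x;
        }
        return sum / xs.size();
    }

    // Wrong: each map task pre-averages, then the reducer averages the averages.
    static double averageOfAverages(List<List<Double>> perMapTask) {
        double total = 0;
        for (List<Double> part : perMapTask) {
            total += mean(part);
        }
        return total / perMapTask.size();
    }

    // Right: each map task emits (sum, count); the pairs can be added in any grouping.
    static double averageViaSumCount(List<List<Double>> perMapTask) {
        double sum = 0;
        long count = 0;
        for (List<Double> part : perMapTask) {
            double partSum = 0;
            for (double x : part) {
                partSum += x;
            }
            sum += partSum;        // combine the partial sums
            count += part.size();  // combine the partial counts
        }
        return sum / count;
    }

    public static void main(String[] args) {
        List<List<Double>> perMapTask = List.of(List.of(1.0, 2.0, 3.0), List.of(10.0));
        System.out.println(averageOfAverages(perMapTask));   // prints 6.0
        System.out.println(averageViaSumCount(perMapTask));  // prints 4.0, the true mean of 1, 2, 3, 10
    }
}
```

The two results differ because the second task holds only one value yet gets the same weight as the first when averages are averaged.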


Complete WordCount Example

java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);  // local aggregation on the map side
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the number of Reduce Tasks
        job.setNumReduceTasks(3);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Custom Partitioning

java
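import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition by first letter (A-M → partition 0, N-Z → partition 1)
public class AlphaPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String s = key.toString();
        if (s.isEmpty()) {
            return 0;  // avoid charAt(0) on an empty key
        }
        char firstChar = Character.toLowerCase(s.charAt(0));
        if (firstChar <= 'm') {
            return 0;
        } else {
            return 1 % numPartitions;  // falls back to 0 when there is a single reducer
        }
    }
}

// Set on the Job
job.setPartitionerClass(AlphaPartitioner.class);
job.setNumReduceTasks(2);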
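
To check which Reduce Task a key would reach under the first-letter rule, the same logic can be exercised outside Hadoop. The sketch below is a hypothetical stand-alone version: `AlphaPartitionSketch`, `partitionFor`, and `route` are illustrative names, and `String` replaces `Text` so the code runs without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone version of the first-letter rule, using String instead of Text.
final class AlphaPartitionSketch {

    // Same rule as above: first letter a-m -> partition 0, otherwise partition 1.
    static int partitionFor(String key, int numPartitions) {
        if (key.isEmpty()) {
            return 0;
        }
        char first = Character.toLowerCase(key.charAt(0));
        return first <= 'm' ? 0 : 1 % numPartitions;
    }

    // Groups keys by the partition, i.e. the Reduce Task, that would receive them.
    static Map<Integer, List<String>> route(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String k : keys) {
            byPartition
                    .computeIfAbsent(partitionFor(k, numPartitions), p -> new ArrayList<>())
                    .add(k);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("Apple", "zebra", "hadoop", "spark", "Nginx");
        // prints {0=[Apple, hadoop], 1=[zebra, spark, Nginx]}
        System.out.println(route(keys, 2));
        // prints {0=[Apple, zebra, hadoop, spark, Nginx]}
        System.out.println(route(keys, 1));
    }
}
```

A rule like this can skew the load if most keys start with letters in one half of the alphabet, so it is worth checking the distribution on sample data before relying on it.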

Performance Tuning

Key Settings

xml
<!-- mapred-site.xml -->

<!-- Map task memory (MB) -->
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>2048</value>
</property>

<!-- Reduce task memory (MB) -->
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>4096</value>
</property>

<!-- Map-side sort buffer size in MB (default 100) -->
<property>
  <name>mapreduce.task.io.sort.mb</name>
  <value>256</value>
</property>

<!-- Spill threshold as a fraction of the buffer (default 0.8) -->
<property>
  <name>mapreduce.map.sort.spill.percent</name>
  <value>0.8</value>
</property>

<!-- Number of parallel fetch threads on the reduce side (default 5) -->
<property>
  <name>mapreduce.reduce.shuffle.parallelcopies</name>
  <value>10</value>
</property>

Compression Settings

xml
<!-- Enable map output compression (reduces shuffle data volume) -->
<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
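
To get a feel for why compressing map output helps, the sketch below compresses some fake WordCount map output and compares sizes. It is only an illustration under stated assumptions: `MapOutputCompressionSketch` is a hypothetical class, the data is synthetic, and GZIP from the JDK stands in for Snappy, which is not part of the JDK and trades a lower compression ratio for higher speed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo: GZIP from the JDK stands in for Snappy.
final class MapOutputCompressionSketch {

    // Compresses a byte array in memory and returns the compressed bytes.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Builds synthetic map output: many repeated "word<TAB>1" lines.
    static byte[] fakeMapOutput(int records) {
        String[] words = {"hello", "world", "hadoop"};
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < records; i++) {
            sb.append(words[i % words.length]).append("\t1\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = fakeMapOutput(10_000);
        byte[] packed = gzip(raw);
        System.out.println(raw.length + " bytes raw, " + packed.length + " bytes compressed");
    }
}
```

Real map output is less repetitive than this synthetic data, so actual savings vary with the workload and the codec.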

MapReduce vs Spark

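| Dimension | MapReduce | Spark |
| --- | --- | --- |
| Computation model | Map + Reduce | DAG (directed acyclic graph) |
| Intermediate results | Written to disk | Kept in memory (optionally on disk) |
| Speed | Slow (disk I/O) | Faster (commonly cited as 10-100x, depending on workload) |
| Iterative computation | Reads and writes disk on every iteration | Iterates in memory |
| Programming model | Complex (only Map/Reduce) | Rich (100+ operators) |
| Fault tolerance | Re-executes the Task | Recomputes RDD partitions |
| Use cases | Very large, simple batch jobs | Batch processing, stream processing, ML |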

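Current Status

In new projects MapReduce has largely been replaced by Spark, but understanding how MapReduce works is very helpful for understanding Spark's shuffle mechanism.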

Content on this site was compiled and written by 褚成志 and is provided for study and reference only.