MapReduce 批处理原理
什么是 MapReduce
MapReduce 是 Google 提出的分布式计算编程模型,将复杂的并行计算抽象为两个简单操作:Map(映射)和 Reduce(归约)。Hadoop 实现了这一模型,使普通开发者能够在廉价集群上处理 PB 级数据。
核心思想:移动计算,而不是移动数据。
执行流程
输入数据(HDFS)
│
▼
InputFormat(切片)
│
▼
┌─────────────────────────────────────────┐
│ Map 阶段 │
│ Map Task 1 Map Task 2 Map Task 3 │
│ (k1,v1)→ (k1,v1)→ (k1,v1)→ │
│ [(k2,v2)] [(k2,v2)] [(k2,v2)] │
└──────────────────┬──────────────────────┘
│
Shuffle(洗牌)
├── 分区(Partitioner)
├── 排序(Sort)
├── 合并(Combiner,可选)
└── 传输到 Reduce
│
┌──────────────────▼──────────────────────┐
│ Reduce 阶段 │
│ Reduce Task 1 Reduce Task 2 │
│ (k2,[v2,v2...])→ (k2,[v2,v2...])→ │
│ (k3,v3) (k3,v3) │
└──────────────────┬──────────────────────┘
│
▼
输出(HDFS)核心组件详解
Map 阶段
java
public class WordCountMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text word = new Text();
private IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// key: 文件偏移量,value: 一行文本
String line = value.toString();
String[] words = line.split("\\s+");
for (String w : words) {
word.set(w);
context.write(word, one); // 输出 (word, 1)
}
}
}Shuffle 阶段(最关键)
Shuffle 是 MapReduce 性能的核心,分为 Map 端和 Reduce 端:
Map 端 Shuffle:
Map 输出
│
▼
环形内存缓冲区(默认 100MB)
│ 达到 80% 时溢写(Spill)
▼
溢写到磁盘(按分区排序)
│
▼
合并所有溢写文件(Merge)
│ 可选:Combiner 本地聚合
▼
最终 Map 输出文件(分区有序)Reduce 端 Shuffle:
从各 Map Task 拉取对应分区数据(HTTP)
│
▼
内存缓冲区(合并)
│
▼
归并排序(Merge Sort)
│
▼
输入 Reduce 函数Reduce 阶段
java
public class WordCountReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// key: 单词,values: [1, 1, 1, ...]
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result); // 输出 (word, count)
}
}Combiner(本地聚合)
Combiner 是 Map 端的本地 Reducer,减少 Shuffle 数据量:
java
// Combiner 通常与 Reducer 逻辑相同
job.setCombinerClass(WordCountReducer.class);
// 效果:
// 没有 Combiner:Map 输出 (hello,1)(hello,1)(hello,1) → 传输 3 条
// 有 Combiner: Map 输出 (hello,3) → 传输 1 条注意
Combiner 只能用于满足交换律和结合律的操作(如求和、求最大值),不能用于求平均值等操作。
完整 WordCount 示例
java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // 本地聚合
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置 Reduce Task 数量
job.setNumReduceTasks(3);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}自定义分区
java
// 按首字母分区(A-M → 分区0,N-Z → 分区1)
public class AlphaPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
char firstChar = key.toString().toLowerCase().charAt(0);
if (firstChar <= 'm') {
return 0;
} else {
return 1 % numPartitions;
}
}
}
// 在 Job 中设置
job.setPartitionerClass(AlphaPartitioner.class);
job.setNumReduceTasks(2);性能调优
关键配置
xml
<!-- mapred-site.xml -->
<!-- Map 内存 -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<!-- Reduce 内存 -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
<!-- Shuffle 缓冲区大小(默认 100MB) -->
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>256</value>
</property>
<!-- 溢写触发比例(默认 0.8) -->
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.8</value>
</property>
<!-- Reduce 端并行拉取线程数 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>10</value>
</property>压缩配置
xml
<!-- 开启 Map 输出压缩(减少 Shuffle 数据量) -->
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>MapReduce vs Spark
| 维度 | MapReduce | Spark |
|---|---|---|
| 计算模型 | Map + Reduce | DAG(有向无环图) |
| 中间结果 | 写磁盘 | 内存(可选磁盘) |
| 速度 | 慢(磁盘 I/O) | 快 10-100x |
| 迭代计算 | 每次迭代读写磁盘 | 内存中迭代 |
| 编程模型 | 复杂(只有 Map/Reduce) | 丰富(100+ 算子) |
| 容错 | 重新执行 Task | 重新计算 RDD 分区 |
| 适用场景 | 超大规模简单批处理 | 批处理、流处理、ML |
现状
MapReduce 在新项目中已基本被 Spark 取代,但理解 MapReduce 原理对理解 Spark Shuffle 机制非常有帮助。