当前位置: 首页 > ds >正文

Hadoop MapReduce过程

MapReduce 执行流程深度解析

第一部分:客户端与集群的分工 (getSplits vs. createRecordReader)

客户端(Client)
  1. job.submit() / job.waitForCompletion(true) 触发分片计算:客户端首先实例化作业指定的 InputFormat(如 TextInputFormat),然后调用其 getSplits() 方法。
  2. 提交资源:计算出的分片元数据(InputSplit 列表)、作业配置和 JAR 包等资源被提交给 YARN 的 ResourceManager
  • 源码佐证:
    • FileInputFormat.java 中核心的 getSplits(JobContext job) 方法,通过遍历文件、计算 splitSize 来生成 FileSplit 对象列表,这正是分片逻辑的实现。
      // ... existing code ...public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);// ... 循环遍历文件并创建分片 ...return splits;}
      // ... existing code ...
      
    • 在大量的测试用例中(如 TestFixedLengthInputFormat.javaTestCombineTextInputFormat.java),都是在测试代码中直接调用 format.getSplits(job, ...),这模拟了客户端的行为。
集群(YARN)
  1. ResourceManager 接收到作业后,启动 ApplicationMaster (AM)
  2. ApplicationMaster 根据 InputSplit 的数量向 ResourceManager 申请相应数量的容器(Container)来执行 Map 任务。
  3. NodeManager 在容器中启动 MapTask 进程(YarnChild)。
  4. MapTask 进程内部:
    • 反序列化获取分配给自己的那个 InputSplit
    • 加载 InputFormat 类,并调用 createRecordReader(split, context) 方法创建 RecordReader
    • 使用 RecordReader 从分片中读取 <key, value> 对,并传递给用户实现的 map() 方法。
  • 源码佐证:
    • InputSampler.java 是一个在客户端对输入进行采样的工具,它完美地模拟了 Map 任务的行为:它先获取分片,然后为每个分片创建 RecordReader 并读取数据。
      // ... existing code ...RecordReader<K,V> reader = inf.createRecordReader(splits.get(i), samplingContext);reader.initialize(splits.get(i), samplingContext);while (reader.nextKeyValue()) {samples.add(ReflectionUtils.copy(job.getConfiguration(),reader.getCurrentKey(), null));
      // ... existing code ...
      
    • MapReduceTutorial.md 也明确指出:RecordReader 的职责是从 InputSplit 提供的面向字节的视图中,转换为 Mapper 需要的面向记录的视图。

第二部分:MapReduce 完整流程

1. 输入(Input)和分片(Split)

  • 描述: 由 InputFormat 的 getSplits() 方法在客户端执行。它将输入数据源逻辑切分为多个 InputSplit,每个 InputSplit 对应一个 Map 任务。分片是逻辑概念,包含位置和长度,而非物理切割。
  • 源码佐证InputFormat.java 接口的 Javadoc 清晰地描述了 getSplits 的作用是“Logically split the set of input files for the job”。

2. 映射(Map)

  • 描述: 每个 Map 任务在一个 InputSplit 上执行。它使用 RecordReader 读取数据,并执行用户定义的 map() 方法,产出中间 <key, value> 对。

3. Shuffle(洗牌)

这是 MapReduce 的“心脏”。

  • Map 端 Shuffle:

    • 分区(Partition)Partitioner 决定中间键值对被发送到哪个 Reducer。
    • 排序和溢写(Sort & Spill): 数据首先写入环形内存缓冲区,在缓冲区内排序。当缓冲区满(由 mapreduce.map.sort.spill.percent 控制)时,数据被溢写(Spill)到磁盘上的临时文件。
    • 合并(Merge): 如果产生了多个溢写文件,Map 任务结束前,会将这些文件合并排序成一个单一的、已分区且内部有序的输出文件。这是为 Reduce 端的拉取做准备。
    • Combiner (可选): 在排序之后,合并之前对数据进行本地聚合,减少 I/O。
  • 源码佐证 (Map端Merge)MapTask.java 中有非常明确的合并逻辑。它会收集所有溢写文件(Spill)对应的 Segment,然后调用 Merger.merge 方法。

    // ... existing code ...//create the segments to be mergedList<Segment<K,V>> segmentList =new ArrayList<Segment<K, V>>(numSpills);for(int i = 0; i < numSpills; i++) {IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
    // ... existing code ...//merge@SuppressWarnings("unchecked")RawKeyValueIterator kvIter = Merger.merge(job, rfs,keyClass, valClass, codec,segmentList, mergeFactor,new Path(mapId.toString()),job.getOutputKeyComparator(), reporter, sortSegments,null, spilledRecordsCounter, sortPhase.phase(),TaskType.MAP);
    // ... existing code ...
    
  • Reduce 端 Shuffle:

    • 复制(Copy): Reduce 任务启动后,主动从各个 Map 任务的输出位置拉取(Copy)属于自己分区的数据。
    • 合并(Merge/Sort): 在拉取数据的同时,在内存中进行合并和排序。如果数据量过大,也会溢写到磁盘。最终,所有相关的 Map 输出被合并成一个统一的、有序的数据集。

4. 规约(Reduce)

  • 描述: 框架将 Shuffle 阶段排序好的数据,以 <key, (list of values)> 的形式喂给 reduce() 方法。对于每一个唯一的 key,reduce() 方法被调用一次。

5. 输出(Output)

  • 描述Reducer 的输出通过 OutputFormat(及其包含的 RecordWriter)写入最终目的地,如 HDFS。

Mapper 和 Reducer 的核心联系:中间键值对

Mapper 和 Reducer 的核心联系是中间键值对 (intermediate key/value pairs)

  1. Mapper 的产出: Mapper 的主要职责是处理输入的记录,并生成一系列中间的 <key, value> 对。这些是 Shuffle 过程的原材料。
  2. Reducer 的输入: Reducer 的输入是经过 Shuffle 和 Sort 之后,将来自所有 Mapper 的、具有相同 key 的中间值聚合在一起形成的 <key, (list of values)>

正如 Mapper.java 的注释所说:

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a {@link Reducer} to determine the final output.

// ... existing code ...* <p>All intermediate values associated with a given output key are * subsequently grouped by the framework, and passed to a {@link Reducer} to  * determine the final output. Users can control the sorting and grouping by * specifying two key {@link RawComparator} classes.</p>
// ... existing code ...

Mapper 把数据发往哪里?—— Partitioner 的决策

Mapper 并不直接“知道”要把数据发给哪个具体的 Reducer。这个决策是由 Partitioner 做出的。

  1. 分区(Partitioning): 对于 Mapper 输出的每一个 <key, value> 对,框架都会调用 Partitioner 的 getPartition 方法。
  2. 决定 Reducer 索引getPartition 方法会根据 key(或者 key 的一部分)计算出一个整数,这个整数就是目标 Reducer 的索引(从 0 到 numReduceTasks - 1)。
  3. 写入本地分区文件: Mapper 会将这个键值对写入自己本地磁盘上对应分区的一个缓冲区,最终合并成一个大的、已分区的输出文件。

所以,Mapper 的输出首先是写到自己所在的 Worker Node 的本地磁盘上,而不是直接通过网络发送给 Reducer。它只是根据 Partitioner 的逻辑,将数据整理好,等待 Reducer 来拉取。

MapReduceTutorial.md 对此有清晰的描述:

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.

MapReduceTutorial.md

[Partitioner](../../api/org/apache/hadoop/mapreduce/Partitioner.html) partitions the key space.Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a *hash function*. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the `m` reduce tasks the intermediate key (and hence the record) is sent to for reduction.

网络在何时以及如何表示?—— Reduce 端的拉取 (Copy/Fetch)

网络传输在 MapReduce 中主要用于 Reduce 端的 Shuffle 阶段

当一个 Reduce 任务启动后,它知道自己需要处理哪个分区的数据(例如,分区 i)。于是,它会向 ApplicationMaster 查询所有已完成的 Map 任务的地址,然后通过 HTTP 协议,从每一个 Map 任务所在的 NodeManager 上拉取(fetch/copy)属于分区 i 的那部分数据。

这个过程可以总结为:

  1. Reduce 任务启动: Reducer 知道自己的分区号。
  2. 查询 Map 输出位置: Reducer 向 ApplicationMaster 询问所有 Map 任务的输出位置。
  3. HTTP 拉取: Reducer 启动多个“复制线程”(copier threads),并行地通过 HTTP GET 请求从各个 Map 任务所在的节点上下载属于自己分区的数据块。
  4. 内存/磁盘合并: 拉取来的数据块首先在 Reducer 的内存中进行合并和排序,如果数据量太大,也会溢写到 Reducer 所在节点的本地磁盘,最终合并成一个有序的数据集,供 reduce() 方法使用。

MapReduceTutorial.md 在描述 Reduce 端的 Shuffle 阶段时提到了这一点:

Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

##### ShuffleInput to the `Reducer` is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

总结

  • 联系: Mapper 和 Reducer 通过中间键值对联系起来。
  • Mapper 发送目标: Mapper 的输出目标由 Partitioner 决定,数据被写入本地磁盘的不同分区中。
  • 网络表示: 网络主要用于 Reduce 任务通过 HTTP 协议主动拉取各个 Mapper 节点上属于自己分区的数据。这是一个“拉”(Pull)模型,而不是“推”(Push)模型。

这种设计将计算(Map)和网络传输(Reduce Shuffle)解耦,并通过本地磁盘作为缓冲,提高了整个系统的鲁棒性和效率。

Mapper

Mapper 是 MapReduce 编程模型中至关重要的一环,它负责数据处理的第一个阶段,即“Map(映射)”阶段。它的核心职责是将输入的数据集(通常是原始数据)转换成一系列中间的键值对(Key-Value pairs),为后续的“Reduce(规约)”阶段做准备。

下面我们从用法、核心方法、执行流程以及与框架的交互等几个方面来深入分析。

Mapper 的基本用法

正如其 Javadoc 注释和类定义所示,开发者通常不直接实例化 Mapper 类,而是通过继承它来创建自己的 Mapper 实现。

// ... existing code ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
// ... existing code ...

Mapper 是一个泛型类,它有四个泛型参数,分别定义了输入和输出的键值对类型:

  • KEYIN: 输入键(Key)的数据类型。例如,LongWritable,通常表示文件中一行的偏移量。
  • VALUEIN: 输入值(Value)的数据类型。例如,Text,通常表示文件中的一行文本。
  • KEYOUT: 中间输出键的数据类型。由你的业务逻辑决定,例如在单词计数的例子中是 Text(单词)。
  • VALUEOUT: 中间输出值的数据类型。同样由业务逻辑决定,例如在单词计数的例子中是 IntWritable(计数值 1)。

开发者最主要的工作就是重写 Mapper 中的一个或多个核心方法来实现具体的业务逻辑:

  • setup(): 可选。在任务开始前执行一次,用于初始化。
  • map()必须。 对输入的每个键值对执行一次,是 Mapper 的核心逻辑所在。
  • cleanup(): 可选。在任务结束后执行一次,用于清理资源。

代码注释中提供了一个经典的“单词计数”(Word Count)示例,非常清晰地展示了如何使用 Mapper

public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one); // 输出 <单词, 1>}}
}

在这个例子中:

  1. TokenCounterMapper 继承了 Mapper<Object, Text, Text, IntWritable>
  2. 输入是 <Object, Text>,即行的偏移量和行的内容。
  3. 输出是 <Text, IntWritable>,即单词和数字 1。
  4. map 方法将一行文本 (value) 切分成多个单词,然后对每个单词,通过 context.write() 方法输出一个 <单词, 1> 的键值对。

核心方法与执行流程剖析

Mapper 的执行由其生命周期方法 run() 控制。这个方法定义了一个标准的执行流程模板。

// ... existing code .../*** Called once at the beginning of the task.*/protected void setup(Context context) throws IOException, InterruptedException {// NOTHING}/*** Called once for each key/value pair in the input split. Most applications* should override this, but the default is the identity function.*/@SuppressWarnings("unchecked")protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {context.write((KEYOUT) key, (VALUEOUT) value);}/*** Called once at the end of the task.*/protected void cleanup(Context context) throws IOException, InterruptedException {// NOTHING}/*** Expert users can override this method for more complete control over the* execution of the Mapper.* @param context* @throws IOException*/public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {cleanup(context);}}
}
  • setup(Context context):

    • 调用时机: 在 run 方法中,map 循环开始之前被调用一次。
    • 作用: 执行初始化工作。例如,在 TokenCounterMapper 示例中,可以在 setup 方法里初始化 one 和 word 对象,避免在 map 方法中重复创建,提高性能。
  • map(KEYIN key, VALUEIN value, Context context):

    • 调用时机: 在 run 方法的 while 循环中,每当 context.nextKeyValue() 返回 true 时被调用。框架会为输入分片(InputSplit)中的每一条记录调用一次此方法。
    • 作用: 这是 Mapper 的心脏,用于实现核心的转换逻辑。
    • 默认实现Mapper 提供了一个默认的 map 实现:context.write((KEYOUT) key, (VALUEOUT) value);。这是一个“恒等映射”,即将输入原封不动地作为输出。这在某些场景下很有用,比如当 MapReduce 作业只用于排序或数据格式转换时。
  • cleanup(Context context):

    • 调用时机: 在 run 方法的 finally 块中,确保在任务(无论是正常完成还是异常退出)的最后阶段被调用一次。
    • 作用: 执行清理工作,如关闭在 setup 中打开的文件句柄或网络连接,确保资源被正确释放。
  • run(Context context):

    • 作用: 这是驱动整个 Mapper 任务执行的模板方法。它定义了 Mapper 的标准生命周期:setup -> map (循环) -> cleanup
    • 执行流程:
      1. 调用 setup()
      2. 进入 try 块,循环调用 context.nextKeyValue() 来读取新的键值对。
      3. 如果读取成功,则调用 map() 方法处理该键值对。
      4. 循环结束后(或发生异常时),finally 块保证 cleanup() 方法一定会被执行。
    • 高级定制: 如注释所说,高级用户可以重写 run 方法来实现更复杂的逻辑,例如启动多个线程来并行处理 map 任务,以充分利用多核 CPU 资源。

Mapper.Context 的作用

Context 对象是 Mapper 与 Hadoop 框架之间沟通的桥梁。它是一个在 Mapper 类内部定义的抽象类,框架会为其提供一个具体的实现。Context 为 Mapper 提供了执行所需的所有上下文信息和功能。

// ... existing code ...
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {/*** The <code>Context</code> passed on to the {@link Mapper} implementations.*/public abstract class Contextimplements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
// ... existing code ...

它的主要功能包括:

  • 获取输入context.getCurrentKey() 和 context.getCurrentValue()
  • 迭代控制context.nextKeyValue(),用于在 run 方法中驱动循环。
  • 输出结果context.write(KEYOUT key, VALUEOUT value),将中间键值对写入缓冲区。
  • 获取配置context.getConfiguration(),可以访问作业的 Configuration 对象,读取配置信息。
  • 报告进度和状态context.setStatus() 和 context.progress()
  • 使用计数器context.getCounter(),用于统计自定义的指标,方便调试和监控。

Mapper 与框架的交互

一个 Mapper 实例并非独立运行,而是作为整个 MapReduce 作业流的一部分,与框架的其他组件紧密协作:

  1. 输入: 框架通过 InputFormat 读取数据源,将其切分为多个 InputSplit。每个 InputSplit 会被分配给一个 Map Task 来处理。
  2. 分区 (Partitioning)Mapper 输出的中间键值对会经过 PartitionerPartitioner 根据 KEYOUT 决定这条记录应该发送给哪个 Reducer。默认的分区器是 HashPartitioner,它通过计算 key 的哈希值来分配。
  3. 排序 (Sorting): 在写入磁盘和传给 Reducer 之前,框架会对 Mapper 的输出在缓冲区内按 KEYOUT 进行排序。
  4. 合并 (Combining): 用户可以指定一个 Combiner。它在 Mapper 端对排序后的输出进行一次本地的“预聚合”(mini-reduce)。这可以极大地减少从 Mapper 传输到 Reducer 的数据量,是 MapReduce 中一个非常重要的性能优化点。
  5. 零 Reducer: 如果作业设置的 Reducer 数量为 0,那么 Mapper 的输出将不会经过分区、排序和聚合,而是直接由 OutputFormat 写入到最终的输出文件中。这种作业被称为 "Map-Only Job"。

Reducer

与 Mapper 类似,开发者通过继承 Reducer 抽象类并重写其方法来定义自己的业务逻辑。

// ... existing code ...
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
// ... existing code ...

Reducer 类同样有四个泛型参数,定义了其输入和输出的键值类型:

  • KEYIN: 输入键的类型。这个类型必须与 Mapper 输出的键类型 KEYOUT 一致。
  • VALUEIN: 输入值的类型。这个类型必须与 Mapper 输出的值类型 VALUEOUT 一致。
  • KEYOUT: 最终输出结果的键类型。
  • VALUEOUT: 最终输出结果的值类型。

Javadoc 中提供了一个非常经典的 IntSumReducer 示例,用于对 Mapper 产生的数字进行求和。这在单词计数的场景中非常常见。

// ... existing code ...* <p>Example:</p>* <p><blockquote><pre>* public class IntSumReducer<Key> extends Reducer<Key,IntWritable,*                                                 Key,IntWritable>; {*   private IntWritable result = new IntWritable();* *   public void reduce(Key key, Iterable<IntWritable> values,*                      Context context) throws IOException, InterruptedException {*     int sum = 0;*     for (IntWritable val : values) {*       sum += val.get();*     }*     result.set(sum);*     context.write(key, result);*   }* }* </pre></blockquote>
// ... existing code ...

在这个例子中:

  1. IntSumReducer 继承了 Reducer
  2. 输入是 <Key, IntWritable>,例如 <"hello", 1>
  3. 输出是 <Key, IntWritable>,例如 <"hello", 100>
  4. reduce 方法接收一个键(如 "hello")和与该键关联的所有值的集合(一个包含很多 1 的 Iterable)。它遍历这个集合,将所有值累加起来,最后通过 context.write() 输出最终的键和总和。

Reducer 的核心阶段

Reducer 的工作过程比 Mapper 要复杂一些,其 Javadoc 中明确地将其划分为三个主要阶段:Shuffle、Sort 和 Reduce。

a. Shuffle (混洗)
  • 作用: 这是 Reduce 任务的第一个阶段。在此阶段,Reducer 任务通过网络(HTTP)从所有已完成的 Mapper 任务中拉取(copy)属于自己的那部分中间输出数据。
  • 分区 (Partitioning)Mapper 的输出在发送前会经过 PartitionerPartitioner 会根据 key 计算出一个分区号,确保所有相同的 key 会被发送到同一个 Reducer 任务。这就是为什么 Reducer 能收到一个 key 对应的所有 values。
b. Sort (排序)
  • 作用: 当 Reducer 任务拉取数据时,框架会在后台对这些数据进行归并排序(merge sort)。这个排序是根据键 (KEYIN) 来进行的。
  • 分组 (Grouping): 排序的最终目的是为了分组。排序完成后,所有具有相同 key 的 value 自然地聚集在一起,形成一个 <key, (list of values)> 的结构,这正是 reduce 方法的输入形式。
  • Shuffle 和 Sort 的并发: Javadoc 中提到,这两个阶段是同时进行的。也就是说,框架一边从 Mapper 拉取数据,一边就在内存和磁盘上进行归并排序,以提高效率。
c. Reduce (规约)
  • 作用: 这是 Reducer 的核心阶段,也是用户定义业务逻辑的地方。
  • 调用: 当 Shuffle 和 Sort 阶段完成后,框架会开始遍历排好序的键。对于每一个唯一的键及其关联的值列表,框架会调用一次用户实现的 reduce 方法。
  • 输出: 在 reduce 方法中,用户通过 Context.write(Object, Object) 将最终结果写入到输出文件系统(如 HDFS)。注意:Reducer 的输出是不会被再次排序的。

核心方法与执行流程

Reducer 的生命周期由其 run 方法控制,这个方法定义了一个标准的执行模板。

// ... existing code .../*** This method is called once for each key. Most applications will define* their reduce class by overriding this method. The default implementation* is an identity function.*/@SuppressWarnings("unchecked")protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {for(VALUEIN value: values) {context.write((KEYOUT) key, (VALUEOUT) value);}}
// ... existing code .../*** Advanced application writers can use the * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to* control how the reduce task works.*/public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKey()) {reduce(context.getCurrentKey(), context.getValues(), context);// If a back up store is used, reset itIterator<VALUEIN> iter = context.getValues().iterator();if(iter instanceof ReduceContext.ValueIterator) {((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        }}} finally {cleanup(context);}}
}
  • setup(Context context): 在任务开始时调用一次,用于初始化操作。
  • reduce(KEYIN key, Iterable<VALUEIN> values, Context context):
    • 调用时机: 在 run 方法的 while 循环中,每当 context.nextKey() 成功移动到下一个唯一的 key 时,该方法被调用一次。
    • 参数key 是当前处理的键,values 是一个迭代器,包含了所有与该 key 关联的值。
    • 默认实现: 默认的 reduce 方法是一个恒等函数,它会遍历 values 迭代器,并将每个 <key, value> 对原样输出。
  • cleanup(Context context): 在任务结束时调用一次,用于资源清理。
  • run(Context context):
    • 执行流程:
      1. 调用 setup()
      2. 进入 try 块,循环调用 context.nextKey() 来遍历所有唯一的、排好序的键。
      3. 如果 nextKey() 返回 true,则调用 reduce() 方法,并传入当前的键 context.getCurrentKey() 和对应的值迭代器 context.getValues()
      4. 循环结束后(或发生异常),finally 块保证 cleanup() 方法一定会被执行。

Reducer.Context 的作用

Context 对象是 Reducer 与 Hadoop 框架沟通的桥梁,它提供了 Reducer 运行所需的所有上下文信息和功能。

  • 迭代控制context.nextKey() 用于驱动 run 方法的循环,移动到下一个键。
  • 获取输入context.getCurrentKey() 获取当前键,context.getValues() 获取与当前键关联的值的 Iterable
  • 输出结果context.write(KEYOUT key, VALUEOUT value) 将最终结果写入输出。
  • 其他功能: 与 Mapper.Context 类似,它也提供了获取配置 (getConfiguration)、获取计数器 (getCounter)、报告状态 (setStatus) 等功能。

二次排序 (Secondary Sort)

Javadoc 中还提到了一个高级用法:二次排序。 当我们需要对同一个 key 对应的 values 列表也进行排序时,就需要用到二次排序。 实现方式是:

  1. 自定义组合键: 创建一个包含主键和次键的自定义 WritableComparable 类。
  2. 自定义排序比较器Job.setSortComparatorClass()。让框架在排序阶段按照整个组合键(主键+次键)进行排序。
  3. 自定义分组比较器Job.setGroupingComparatorClass()。这个比较器告诉框架,哪些键应该被分到同一个 reduce 调用中。它只比较组合键中的主键部分。

这样,框架在排序时会考虑次键,但在调用 reduce 方法时,会将主键相同的所有记录(尽管次键不同)都分到同一次 reduce 调用中,而此时 values 迭代器中的值就是按照次键排好序的了。

Partitioner

Partitioner(分区器)是 MapReduce 流程中一个非常关键的组件。它的核心职责是在 Map 阶段结束、Reduce 阶段开始之前,决定 Mapper 输出的每一个键值对(key-value pair)应该被发送到哪一个 Reducer 任务去处理

Partitioner 是一个抽象类,我们通常必须通过继承它来创建自定义的分区逻辑。

我们来看一下它的定义:

// ... existing code ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {/** * Get the partition number for a given key (hence record) given the total * number of partitions i.e. number of reduce-tasks for the job.*   * <p>Typically a hash function on a all or a subset of the key.</p>** @param key the key to be partioned.* @param value the entry value.* @param numPartitions the total number of partitions.* @return the partition number for the <code>key</code>.*/public abstract int getPartition(KEY key, VALUE value, int numPartitions);}

从代码中可以看出:

  1. 泛型: 它有两个泛型参数 <KEY, VALUE>,这对应了 Mapper 输出的键值类型。
  2. 核心抽象方法: 它只定义了一个核心的抽象方法 getPartition。任何子类都必须实现这个方法。

getPartition 方法详解

public abstract int getPartition(KEY key, VALUE value, int numPartitions);

  • keyMapper 输出的键。
  • valueMapper 输出的值。
  • numPartitions: 分区的总数,这个值等于你为这个 Job 设置的 Reducer 任务的数量 (job.setNumReduceTasks(int))。
  • 返回值: 一个整数,范围必须是 0 到 numPartitions - 1。这个返回值就是分区号,它直接决定了这条记录会被发送到哪个 Reducer(例如,返回 0 就发送给第一个 Reducer,返回 1 就发送给第二个,以此类推)。

注意: 正如 Javadoc 中提到的,只有当你设置的 Reducer 数量大于 1 时,Partitioner 才会被创建和使用。如果只有一个 Reducer 或者没有 Reducer,分区是没有意义的。

默认的 Partitioner: HashPartitioner

在你的工程中,如果你不通过 job.setPartitionerClass(...) 来指定一个自定义的 Partitioner,Hadoop 会使用默认的 HashPartitioner。从MapReduceTutorial.md 中也可以看到这一点。

HashPartitioner 的逻辑非常简单:

  1. 获取 key 的 hashCode()
  2. 用一个很大的正数(Integer.MAX_VALUE)进行按位与操作,确保结果为正。
  3. 用 numPartitions (Reducer 的数量) 取模。
// HashPartitioner 的核心逻辑伪代码
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;

这种默认方式在大多数情况下能很好地工作,它可以相对均匀地将键分散到不同的 Reducer 中,实现负载均衡。

为什么要自定义 Partitioner?

既然有默认的实现,为什么我们还需要自定义呢?主要有以下几个原因:

  1. 数据倾斜 (Data Skew): 默认的 hashCode 方法可能无法均匀地分布你的特定数据集。例如,某些 key 的哈希值可能恰好都聚集在少数几个结果上,导致少数 Reducer 任务过重,而其他 Reducer 很空闲,拖慢整个作业的执行效率。通过自定义分区逻辑,可以根据数据特点进行更均匀的分配。

  2. 业务逻辑要求: 有些业务场景要求具有相同特征的 key 被发送到同一个 Reducer。例如,假设你正在处理订单数据,你可能希望所有来自同一个省份的订单都由同一个 Reducer 处理。这时,你可以自定义一个 Partitioner,它不根据整个 key(可能是订单 ID)来分区,而是根据 key 对象中的“省份”字段来分区。

如何实现自定义 Partitioner

TeraSort.java 提供了一个很好的例子:

// ... existing code .../*** A total order partitioner that assigns keys based on their first * PREFIX_LENGTH bytes, assuming a flat distribution.*/public static class SimplePartitioner extends Partitioner<Text, Text>implements Configurable {int prefixesPerReduce;private static final int PREFIX_LENGTH = 3;private Configuration conf = null;public void setConf(Configuration conf) {this.conf = conf;prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) / (float) conf.getInt(MRJobConfig.NUM_REDUCES, 1));}// ... existing code ...

这个 SimplePartitioner 继承了 Partitioner,并且还实现了 Configurable 接口。实现 Configurable 接口是为了让 Partitioner 能够获取到 Job 的 Configuration 对象,从而读取一些配置信息(比如这里的 NUM_REDUCES)。

高级 Partitioner: TotalOrderPartitioner

还存在一个更高级的实现:TotalOrderPartitioner。它用于实现全局排序。普通的 MapReduce 作业只能保证 Reducer 的输出在各自的文件内部是有序的,但 Reducer 0 的输出和 Reducer 1 的输出之间没有顺序关系。

TotalOrderPartitioner 通过读取一个预先生成的分区文件(包含了 key 的分割点),来保证所有被发送到 Reducer i 的 key 都小于被发送到 Reducer i+1 的 key。这样,当所有 Reducer 完成后,将它们的输出文件按顺序拼接起来,就得到了一个全局有序的大文件。

总结

  • Partitioner 是连接 Map 和 Reduce 阶段的桥梁,负责数据分发。
  • 它是一个抽象类,你必须继承它并实现 getPartition 方法来创建自定义分区器。
  • Hadoop 默认使用 HashPartitioner,它基于 key 的哈希值进行分区,适用于大多数场景。
  • 当需要解决数据倾斜或根据业务逻辑对数据进行分组时,就需要自定义 Partitioner
  • 更高级的 TotalOrderPartitioner 可以用来实现输出结果的全局排序。

http://www.xdnf.cn/news/17608.html

相关文章:

  • LeetCode - 搜索插入位置 / 排序链表
  • (LeetCode 面试经典 150 题) 104. 二叉树的最大深度 (深度优先搜索dfs)
  • 【Docker实战入门】从核心概念到镜像构建
  • JavaScript的 fetch() 方法 笔记250810
  • CSS优先级、HTTP响应状态码
  • Android的事件分发流程、Kotlin协程、4大组件、Handler机制、架构设计、性能优化、内存泄漏
  • 第4章 程序段的反复执行2while语句P128练习题(题及答案)
  • 智慧农业-无人机视角庄稼倒伏农作物倒伏检测数据集VOC+YOLO格式541张1类别
  • VSCode添加Python、Java注释技巧、模板
  • 疏老师-python训练营-Day40训练和测试的规范写法
  • NumPy性能飞跃秘籍:向量化计算如何提升400倍运算效率?
  • istio笔记03--快速上手多集群mesh
  • 【C语言】深入探索预处理
  • Matlab 基于BP神经网络结合Bagging(BP-Bagging)集成算法的单变量时序预测 (单输入单输出)
  • 带冷端补偿的热电偶采集方案MAX31855
  • Dell PowerEdge: Servers by generation (按代系划分的服务器)
  • 【渲染流水线】[几何阶段]-[图元装配]以UnityURP为例
  • C++2024 年一级
  • Cursor设置
  • 【机器学习深度学习】模型选型:如何根据现有设备选择合适的训练模型
  • 【面试场景题】微博热点新闻系统设计方案
  • 一个“加锁无效“的诡异现象
  • #C语言——刷题攻略:牛客编程入门训练(七):分支控制(一)-- 涉及 %c前加空格:忽略起首的空白字符
  • Spring Boot Starter 自动化配置原理深度剖析
  • 把大模型“关进冰箱”——基于知识蒸馏 + 动态量化的小型化实战笔记
  • 推客系统开发全攻略:从架构设计到高并发实战
  • 【Python 高频 API 速学 ②】
  • 让大模型 “睡觉”:把版本迭代当作人类睡眠来设计(附可直接改造的训练作息表与代码)
  • 【Task2】【Datawhale AI夏令营】多模态RAG
  • Python基础教程(四)字符串和编码:深度探索Python字符串与编码的终极指南