当前位置: 首页 > news >正文

Hadoop MapReduce Task 设计源码分析

Task

Task 是 Hadoop MapReduce 框架中一个非常核心的抽象基类,它代表了将被执行的工作单元。无论是 Map 任务还是 Reduce 任务,它们都有共同的属性和行为,这些都被抽象到了 Task 类中。首先,我们来看一下 Task 类的声明:

// ... existing code ...
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
abstract public class Task implements Writable, Configurable {
// ... existing code ...
  • implements WritableWritable 是 Hadoop 的序列化接口。实现这个接口意味着 Task 对象可以被序列化成字节流,在网络中进行传输(例如,从 ApplicationMaster 发送给将要执行该任务的 NodeManager),或者持久化到磁盘。这对于分布式计算至关重要。Task 类中的 write 和 readFields 方法就是对这个接口的实现。
    // ... existing code ...public void write(DataOutput out) throws IOException {Text.writeString(out, jobFile);taskId.write(out);out.writeInt(partition);// ... more fields}public void readFields(DataInput in) throws IOException {jobFile = StringInterner.weakIntern(Text.readString(in));taskId = TaskAttemptID.read(in);partition = in.readInt();// ... more fields}
    // ... existing code ...
    
  • implements Configurable: 这个接口表明 Task 对象可以接收一个配置对象(Configuration),从而获取作业的所有配置信息。

角色Task 类在 MapReduce 框架中扮演着单个任务执行实例的描述者和执行者的角色。它封装了任务运行所需的所有信息,包括任务ID、配置、状态、输入输出等,并定义了任务执行的生命周期和与框架其他部分的交互方式。


核心属性(Fields)分析

Task 类包含了大量字段,用于维护任务的完整上下文。以下是一些最重要的属性:

  • 任务标识:

    • jobFile: 作业的配置文件路径。
    • taskIdTaskAttemptID 类型,这是任务某次尝试的唯一标识符,例如 attempt_1400206191003_0001_m_000000_0。它包含了 JobID、TaskID 和一个尝试编号。
    • partition: 任务在其所属的作业中的分区号。对于 Map 任务,这个值通常是输入分片(InputSplit)的索引;对于 Reduce 任务,它是 Reduce 分区的索引(0 到 numReduceTasks-1)。
  • 状态与阶段:

    • taskStatusTaskStatus 类型,一个非常重要的对象,用于封装任务的当前状态,包括:
      • runState: 运行状态,如 UNASSIGNEDRUNNINGSUCCEEDEDFAILED
      • phase: 任务所处的阶段。Map 任务有 MAP 阶段,Reduce 任务有 SHUFFLESORTREDUCE 阶段。
      • progress: 任务的完成进度(0.0f 到 1.0f)。
      • stateString: 对当前状态的文本描述。
    • jobSetupjobCleanuptaskCleanup: 这些布尔标志位用于区分任务的类型。一个 MapReduce 作业不仅包含 Map 和 Reduce 任务,还包括:
      • Job Setup Task: 在所有 Map/Reduce 任务开始前运行,用于作业级别的初始化。
      • Job Cleanup Task: 在所有任务成功完成后运行,用于作业级别的清理和收尾工作。
      • Task Cleanup Task: 在每次任务尝试(无论成功或失败)后运行,用于清理该次尝试产生的临时数据。
  • 通信与交互:

    • umbilicalTaskUmbilicalProtocol 接口。这是任务(子进程)与它的父进程(TaskTracker 或 YARN 中的 YarnChild)进行通信的“脐带”。任务通过它来汇报心跳、更新状态、报告进度和发送致命错误。
    • tokenSecretshuffleSecret: 用于安全认证的密钥,确保数据传输(如 Shuffle 过程)的安全性。
  • 数据处理:

    • committerorg.apache.hadoop.mapreduce.OutputCommitter 类型。它负责管理任务的输出。这是一个关键组件,控制着任务输出的生命周期,包括:
      1. setupTask: 任务开始前的初始化(如创建临时输出目录)。
      2. commitTask: 任务成功后,将临时输出提交到最终位置。
      3. abortTask: 任务失败后,清理临时输出。
    • skipRangesSortedRanges 类型。用于处理“坏记录”。如果一个任务因为某些输入记录而失败,框架可以配置在下次尝试时跳过这些坏记录的范围。

核心方法分析

Task 类定义了任务执行的骨架,其核心方法控制着任务的生命周期。

  • run(JobConf job, TaskUmbilicalProtocol umbilical): 这是 Task 类最核心的抽象方法。任务的主要逻辑在这里被触发。当一个任务被分配到某个节点上并在一个新的 JVM 中启动后,框架就会调用这个 run 方法。MapTask 和 ReduceTask 会分别提供这个方法的具体实现,来执行它们各自的 Map 或 Reduce 逻辑。

    // ... existing code ...public class StubTask extends Task {@Overridepublic void run(JobConf job, TaskUmbilicalProtocol umbilical)throws IOException, ClassNotFoundException, InterruptedException {// nop}
    // ... existing code ...
    
  • initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi): 在 run 方法内部,initialize 会被首先调用,用于准备任务的运行环境。主要工作包括:

    1. 创建 JobContext 和 TaskAttemptContext,为任务提供配置和运行时信息的访问接口。
    2. 根据配置实例化 OutputFormat 和 OutputCommitter
    3. 设置任务的临时工作输出路径。
    4. 初始化资源监控工具 ResourceCalculatorProcessTree,用于监控任务的 CPU 和内存使用情况。
  • localizeConfiguration(JobConf conf): 这个方法负责将作业的全局配置(JobConf)“本地化”,为当前任务实例注入特定的上下文信息。

    // ... existing code ...public void localizeConfiguration(JobConf conf) throws IOException {conf.set(JobContext.TASK_ID, taskId.getTaskID().toString()); conf.set(JobContext.TASK_ATTEMPT_ID, taskId.toString());conf.setBoolean(JobContext.TASK_ISMAP, isMapTask());conf.setInt(JobContext.TASK_PARTITION, partition);conf.set(JobContext.ID, taskId.getJobID().toString());}
    // ... existing code ...
    

    这样,用户代码就可以在 map 或 reduce 方法中通过 Context 对象获取到当前任务的 ID、分区号等信息。

  • reportFatalError(...): 当任务遇到不可恢复的严重错误时(例如 OOM),它会调用此方法。该方法通过 umbilical 协议将错误信息发送给父进程,父进程随后会标记此次任务尝试为 FAILED,并可能决定是否要重新调度一次新的尝试。

  • 状态管理方法 (setPhasesetState): 这些方法用于更新 taskStatus。它们被声明为 synchronized,因为任务的主执行线程和与父进程通信的心跳线程(TaskReporter)可能会并发地访问和修改任务状态。同步确保了状态的一致性。


总结

Task 类是 Hadoop MapReduce 任务执行模型的基础。它通过抽象和封装,完美地定义了一个分布式计算任务单元所需的通用能力:

  1. 身份与配置:每个任务都有唯一的标识和独立的配置上下文。
  2. 生命周期管理:定义了从初始化、执行到提交/中止的完整流程。
  3. 状态与进度跟踪:通过 TaskStatus 和 Counters 精确地度量任务的执行情况。
  4. 通信机制:通过 TaskUmbilicalProtocol 与集群管理框架(YARN ApplicationMaster)保持联系。
  5. 可扩展性:作为一个抽象类,它将通用的任务管理逻辑与具体的业务逻辑(Map 或 Reduce)解耦,使得框架易于维护和扩展。

MapTask

MapTask 继承自 Task,它的核心职责可以概括为以下几步:

  1. 读取输入:从 InputFormat 获取一个输入分片(InputSplit),并使用 RecordReader 逐条读取 <key, value> 记录。
  2. 执行 Map:对读取的每一条记录,调用用户自定义的 Mapper 的 map 方法,产生零个或多个中间 <key, value> 对。
  3. 收集与处理输出:这是 MapTask 最复杂、最核心的部分。它并不会直接将 Mapper 的输出写入磁盘,而是进行一系列高度优化的处理,包括 分区(Partitioning)排序(Sorting)溢写(Spilling) 和 合并(Merging)
  4. 提供输出:最终生成一个有序的、按 Reduce 任务分区的输出文件,存放在本地磁盘上,等待 Reduce 任务来拉取(Shuffle)。

MapTask 的整个执行流程由其 run 方法驱动:

// ... existing code ...@Overridepublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, ClassNotFoundException, InterruptedException {this.umbilical = umbilical;if (isMapTask()) {// 如果有 Reducer,Map 阶段占 66.7% 进度,Sort 阶段占 33.3%if (conf.getNumReduceTasks() == 0) {mapPhase = getProgress().addPhase("map", 1.0f);} else {mapPhase = getProgress().addPhase("map", 0.667f);sortPhase  = getProgress().addPhase("sort", 0.333f);}}TaskReporter reporter = startReporter(umbilical);boolean useNewApi = job.getUseNewMapper();initialize(job, getJobID(), reporter, useNewApi);// ... 处理 job-setup, job-cleanup, task-cleanup 等特殊任务 ...if (useNewApi) {runNewMapper(job, splitMetaInfo, umbilical, reporter);} else {runOldMapper(job, splitMetaInfo, umbilical, reporter);}done(umbilical, reporter);}
// ... existing code ...

从 run 方法可以看出,它首先设置了任务的进度阶段(mapPhase 和 sortPhase),然后根据用户使用的是新版 API 还是旧版 API,分别调用 runNewMapper 或 runOldMapper 来执行核心逻辑。


输入处理:TrackedRecordReader 和 SkippingRecordReader

MapTask 内部定义了两个重要的 RecordReader 包装类,用于在读取数据的同时进行监控和容错。

  • TrackedRecordReader: 这是一个装饰器(Decorator),它包装了用户提供的原始 RecordReader。它的主要作用是在每次调用 next() 方法读取记录时,自动更新相关的计数器(如 MAP_INPUT_RECORDS 和 FileInputFormatCounter.BYTES_READ)并向 ApplicationMaster 报告进度。这使得框架能够实时监控 Map 任务的输入进度。

    // ... existing code ...
    class TrackedRecordReader<K, V> implements RecordReader<K, V> {private RecordReader<K, V> rawIn;private Counters.Counter fileInputByteCounter;private Counters.Counter inputRecordCounter;// ...public synchronized boolean next(K key, V value)throws IOException {boolean ret = moveToNext(key, value);if (ret) {incrCounters(); // 增加记录计数}return ret;}protected synchronized boolean moveToNext(K key, V value)throws IOException {// ... 记录读取前后的文件系统字节数,并更新计数器 ...fileInputByteCounter.increment(bytesInCurr - bytesInPrev);reporter.setProgress(getProgress()); // 报告进度return ret;}
    // ... existing code ...
    
  • SkippingRecordReader: 这个类继承自 TrackedRecordReader,增加了跳过“坏记录”的功能。如果一个 MapTask 在之前的尝试中因为处理某条记录而出错,框架可以配置在重试时跳过这些记录。SkippingRecordReader 通过 skipIt 迭代器来确定哪些记录索引需要被跳过,并可以将这些被跳过的记录写入一个专门的文件,供后续分析。这是一种重要的容错机制。

    // ... existing code ...
    class SkippingRecordReader<K, V> extends TrackedRecordReader<K, V> {private SkipRangeIterator skipIt;// ...public synchronized boolean next(K key, V value)throws IOException {if(!skipIt.hasNext()) {LOG.warn("Further records got skipped.");return false;}boolean ret = moveToNext(key, value);long nextRecIndex = skipIt.next();long skip = 0;while(recIndex<nextRecIndex && ret) { // 如果当前记录索引在需要跳过的范围内if(toWriteSkipRecs) {writeSkippedRec(key, value); // 写入跳过的记录}ret = moveToNext(key, value); // 继续读取下一条,实现跳过skip++;}// ...skipRecCounter.increment(skip); // 更新跳过记录的计数器// ...return ret;}
    // ... existing code ...
    

环形缓冲区与溢写(Spill)机制

这是 MapTask 最精华的部分。Mapper 的输出数据不会被立即写入最终文件,而是先被收集到一个内存中的环形缓冲区(Circular Buffer)。当缓冲区的数据达到一定阈值时,会启动一个后台线程将数据进行 排序(Sort) 并 溢写(Spill) 到本地磁盘上的一个临时文件中。这个过程可能发生多次。

这个复杂的过程主要由 MapTask 的一个内部类 MapOutputBuffer(它是 MapOutputCollector 的默认实现)来管理。

工作流程如下:

  1. 收集(Collect)Mapper 每输出一个 <key, value> 对,就会调用 collector.collect(key, value)MapOutputBuffer 会将这个键值对序列化,并存入环形字节数组 kvbuffer 中。同时,它会在一个元数据数组 kvmeta 中记录下这条数据的信息,包括它所属的分区(Partition)、key 的起始位置和 value 的起始位置。

  2. 触发溢写(Spill Trigger): 当 kvbuffer 中已使用的空间超过一个阈值(由 mapreduce.map.sort.spill.percent 配置,默认为 80%)时,溢写过程就会被触发。

  3. 排序(Sort): 在溢写到磁盘之前,MapOutputBuffer 会对缓冲区中的所有数据进行排序。这是一个内存中的排序。排序是基于元数据 kvmeta 进行的,避免了移动庞大的序列化数据。排序规则是:首先按分区号排序,然后分区内部按 key 排序。

  4. 合并(Combine, 可选): 如果用户配置了 Combiner,那么在排序之后、写入磁盘之前,会对每个分区内的数据执行 Combine 操作。Combiner 本质上是一个小型的 Reducer,它能有效减少写入磁盘和后续网络传输的数据量。

  5. 溢写(Spill): 经过排序和(可选的)合并后,数据被顺序写入一个本地磁盘上的溢写文件(spill file)。同时,会生成一个对应的索引文件,记录每个分区数据在这个溢写文件中的偏移量、原始长度和压缩后长度。

Paimon 内存写入后刷写SST到磁盘类似这个设计,详细分析见:

Paimon MemStore写入本地磁盘前的Sort:内存快排

外存归并见:

Paimon MemStore写入本地磁盘前的Sort:外存归并

这个过程的代码逻辑非常复杂,分布在 MapOutputBuffer 的 collectsortAndSpill 等方法中。

// ... existing code ...private void sortAndSpill() throws IOException, ClassNotFoundException,InterruptedException {// ...try {// ...final SpillRecord spillRec = new SpillRecord(partitions);final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);out = rfs.create(filename);// ...// 调用 sorter 对 kvmeta 中的元数据进行排序sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);// ...for (int i = 0; i < partitions; ++i) {IFile.Writer<K, V> writer = null;try {// ...if (combinerRunner == null) {// 如果没有 Combiner,直接将排序后的数据写入溢写文件// ...} else {// 如果有 Combiner,先执行 combine 操作,再写入combineCollector.setWriter(writer);RawKeyValueIterator kvIter =new MRResultIterator(spstart, spindex);combinerRunner.combine(kvIter, combineCollector);}// ...// 记录每个分区的索引信息(偏移量、长度等)到 spillRecrec.startOffset = segmentStart;rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);spillRec.putIndex(rec, i);} finally {if (writer != null) writer.close();}}// ...// 将索引记录 spillRec 写入索引文件或缓存} // ...}
// ... existing code ...

最终合并(Merge)

当 Mapper 处理完所有输入记录,并且所有内存数据都已溢写到磁盘后,MapTask 的工作进入最后阶段:合并(Merge)

如果 MapTask 只产生了一个溢写文件,那么这个文件本身就成为最终的输出文件。但如果产生了多个溢写文件,就需要将它们合并成一个。

  • mergeParts(): 这个方法负责合并所有的溢写文件。
  • K-Way Merge: 它使用 Merger.merge() 方法对所有溢写文件执行一次多路归并排序。因为每个溢写文件内部已经按分区和 key 排好序,所以合并过程非常高效。
  • 最终输出: 合并的结果是一个最终的输出文件(如 file.out)和一个对应的总索引文件(如 file.out.index)。这个输出文件包含了所有分区的、全局有序的数据。总索引文件则精确地指明了每个 Reduce 任务应该从 file.out 的哪个位置开始拉取、拉取多长的数据。
 
// ... existing code ...private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {// ...if (numSpills == 1) { // 如果只有一个溢写文件,直接重命名// ...return;}// 获取所有溢写文件for(int i = 0; i < numSpills; i++) {filename[i] = mapOutputFile.getSpillFile(i);// ...}// ...// 对所有分区进行迭代for (int parts = 0; parts < partitions; parts++) {// ...// 从所有溢写文件中收集当前分区的段(Segment)List<Segment<K, V>> segmentList =new ArrayList<Segment<K, V>>(numSpills);// ...// 调用 Merger.merge 执行多路归并@SuppressWarnings("unchecked")RawKeyValueIterator kvIter = Merger.merge(job, rfs,keyClass, valClass, codec,segmentList, mergeFactor, /* ... */);// 将合并后的结果写入最终输出文件Merger.writeFile(kvIter, writer, reporter, job);// ...}// ...}
// ... existing code ...

总结

MapTask 是一个设计精巧、高度优化的数据处理引擎。它不仅仅是简单地执行用户的 map 函数,其真正的核心在于后台的排序-溢写-合并机制。通过环形内存缓冲区、多路归并排序等技术,它实现了一种高效的外部排序(External Sort),能够在有限的内存下处理海量的中间数据,并为后续的 Shuffle 和 Reduce 阶段准备好分区有序的数据。

理解 MapTask 的工作流程,特别是 MapOutputBuffer 的实现,是掌握 MapReduce 性能调优和框架原理的关键。

Merger 

Map 端: 当 MapTask 的环形缓冲区 MapOutputBuffer 多次溢写(Spill)到磁盘,产生了多个临时的溢写文件后,MapTask 在最后阶段会调用 Merger.mergeParts()。这个方法会使用 Merger 将所有属于同一个分区的溢写文件片段合并成一个最终的、分区内有序的输出文件。

Reduce 端: 在 Shuffle 阶段,Reduce 任务会从多个 Map 任务拉取属于自己的数据片段。这些片段本身是分区内有序的。Reduce 任务会使用 Merger 将这些来自不同 Map 主机的片段进行归并排序,形成一个单一的、全局有序的输入流,然后逐条送给用户的 reduce 函数处理。

Merger 的核心逻辑:多轮归并(Multi-pass Merge)

如果一次性把所有文件都合并成一个,会不会导致这个最终文件过大?或者说,合并过程本身会不会消耗太多资源(比如文件句柄)?

Hadoop 的设计者充分考虑了这一点。Merger 并非总是“一步到位”地将所有输入文件合并成一个。它采用了一种更健壮、更可扩展的策略——多轮归并(Multi-pass Merge),也叫多阶段合并。

这个逻辑主要在 Merger 的内部类 MergeQueue 的 merge 方法中实现。核心参数

  • mapreduce.task.io.sort.factor (在代码中通常是 mergeFactor 或 factor): 这个参数是控制合并行为的关键。它指定了每一轮合并最多可以同时打开多少个文件(或数据流)进行归并。默认值是 10。

工作流程

假设我们有 100 个溢写文件(segments),并且 mergeFactor 是 10。

  1. 第一轮合并Merger 不会一次打开 100 个文件。它会取前 10 个文件,将它们合并成一个新的、更大的中间文件(比如 intermediate.1)。然后,再取接下来的 10 个文件,合并成 intermediate.2,以此类推。当第一轮结束时,原来的 100 个小文件就变成了 10 个中等大小的文件。

  2. 第二轮合并:现在,Merger 会将这 10 个中等文件作为输入,再次进行合并。由于 10 不超过 mergeFactor (10),这一轮就可以将它们全部合并成一个最终的输出文件。

  3. 最终结果:经过两轮合并,最初的 100 个文件最终被合并成了一个单一的、全局有序的文件。

这个过程就像一个锦标赛,每次从一组选手中选出优胜者,然后优胜者之间再进行比赛,直到决出总冠军。让我们看一下 MergeQueue.merge 方法中的关键代码片段:

// ... existing code ...public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,int factor, int inMem,Path tmpDir,Counters.Counter readsCounter,Counters.Counter writesCounter,Progress mergePhase)throws IOException {// ...// 循环进行多轮合并do {// 获取本轮合并的 factorfactor = getPassFactor(factor, passNo, numSegments - inMem);// ...// 从所有 segments 中,取出 factor 个来进行合并List<Segment<K, V>> segmentsToMerge =new ArrayList<Segment<K, V>>();// ...// 如果合并后的 segment 数量仍然大于 factor,说明还需要下一轮合并if (numSegments <= factor) {// 这是最后一轮合并,直接返回一个迭代器,让调用者(如 Reduce aTask)可以逐条读取最终结果// ...LOG.info("Down to the last merge-pass, with " + numSegments + " segments left...");return this;} else {// 不是最后一轮,需要将合并结果写入一个新的临时文件LOG.info("Merging " + segmentsToMerge.size() + " intermediate segments...");// ... 创建临时输出文件 ...Path outputFile =  lDirAlloc.getLocalPathForWrite(...);Writer<K, V> writer = new Writer<K, V>(...);// 将本轮合并的结果写入临时文件writeFile(this, writer, reporter, conf);writer.close();// 将新生成的临时文件(作为一个新的 Segment)添加回待合并列表,// 以便参与下一轮的合并Segment<K, V> tempSegment = new Segment<K, V>(conf, fs, outputFile, codec, false);segments.add(tempSegment);// ...passNo++;}} while(true);}
// ... existing code ...

对最终文件大小的处理

最终合并成的那个文件会不会太大了?

答案是:会,而且这是设计如此。

  • 在 Map 端MapTask 的最终输出就是一个文件(file.out)和一个索引文件(file.out.index)。这个 file.out 包含了这个 Map 任务产生的所有分区的数据。如果这个 Map 任务处理的数据量很大,这个文件也可能会很大。但这是本地磁盘上的文件,主要消耗的是磁盘空间,而不是内存。它的存在是为了让 Reduce 任务能通过一次网络连接就拉取到所有需要的数据,而不是对多个小文件进行多次网络请求。

  • 在 Reduce 端:Reduce 端的合并最终并不一定会产生一个巨大的中间文件。Merger 的 merge 方法在最后一轮合并时,并不会把结果写入新文件,而是返回一个 RawKeyValueIterator。Reduce 任务会直接从这个迭代器中逐条读取 <key, List<value>>,然后送入用户的 reduce 函数。reduce 函数处理完的结果,会通过 RecordWriter 直接写入 HDFS 上的最终输出路径。

总结一下

  1. Merger 不是一次性合并所有文件,而是通过 mapreduce.task.io.sort.factor 参数控制,进行多轮归并。这有效地控制了单次合并操作所需要打开的文件句柄数和内存消耗。

  2. Map 端的最终输出就是一个合并后的大文件,这是为了优化后续 Shuffle 阶段的网络传输效率。这个文件存储在本地磁盘,其大小受限于本地磁盘空间。

  3. Reduce 端的合并过程是流式的。最后一轮合并的结果是一个迭代器,数据流式地传递给 reduce 函数,并直接写入 HDFS,通常不会在本地磁盘生成一个包含所有数据的最终合并文件。这避免了在 Reduce 节点上产生双倍的磁盘空间开销。

因此,Hadoop 的 Merger 机制在处理大文件和大量文件时,通过多轮归并和流式处理,实现了资源消耗和执行效率之间的平衡。

ReduceTask

ReduceTask 的核心职责是执行 MapReduce 作业中的 Reduce 阶段。具体来说,它包含三个主要步骤:

  1. Shuffle (Copy): 从所有已完成的 MapTask 中拉取(fetch)属于自己这个分区(partition)的输出数据。
  2. Sort (Merge): 将从不同 Map 任务拉取来的数据片段进行归并排序。这些数据片段本身已经是(在分区内)有序的,所以这里是一个多路归并操作,最终形成一个全局有序的输入流。
  3. Reduce: 逐个处理排序后的 <key, list-of-values> 对,执行用户定义的 reduce 函数,并将最终结果写入输出文件系统(如 HDFS)。

开发者通常不会直接创建或调用 ReduceTask。它的整个生命周期由 Hadoop 框架(具体来说是 YARN 的 NodeManager 和 MRAppMaster)管理。

  1. 作业提交: 用户通过 Job.setNumReduceTasks(int) 设置 Reduce 任务的数量。如果数量大于0,MapReduce 作业就会包含 Reduce 阶段。
  2. 任务调度MRAppMaster 会根据作业的配置,为每个分区(从0到 numReduceTasks - 1)启动一个 ReduceTask
  3. 任务执行NodeManager 在某个工作节点上启动 ReduceTask 的 JVM 进程。ReduceTask 启动后,其 run 方法会被调用,从而开始它的 Shuffle -> Sort -> Reduce 的生命周期。

在 ReduceTask.java 的 run 方法中,我们可以清晰地看到这个流程的编排:

// ... existing code ...@Override@SuppressWarnings("unchecked")public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {// ... 初始化 ...if (isMapOrReduce()) {copyPhase = getProgress().addPhase("copy");sortPhase  = getProgress().addPhase("sort");reducePhase = getProgress().addPhase("reduce");}// ...// 1. Shuffle & Sort 阶段// 使用插件化的 ShuffleConsumerPlugin (默认为 Shuffle.class) 来执行Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);// ...shuffleConsumerPlugin.init(shuffleContext);// 调用 run() 方法,此方法会阻塞直到所有数据都已拉取并合并排序完成rIter = shuffleConsumerPlugin.run();// ...sortPhase.complete();                         // sort 阶段完成setPhase(TaskStatus.Phase.REDUCE);            // 更新任务状态为 REDUCE// ...// 2. Reduce 阶段// 根据用户配置,调用新版或旧版的 Reducer APIif (useNewApi) {runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}shuffleConsumerPlugin.close();done(umbilical, reporter);}
// ... existing code ...

Shuffle & Sort (Copy & Merge)

这是 Reduce 任务最复杂也最关键的部分。从 run 方法中可以看到,这个过程被抽象成了 ShuffleConsumerPlugin 接口,默认实现是 Shuffle 类。这个阶段的目标是从各个 Map 任务获取数据,并在 Reduce 端将它们合并成一个有序的输入流。

Shuffle (Copy) 阶段:

  • ReduceTask 会启动多个 "fetcher" 线程。
  • 它通过心跳从 MRAppMaster 获取已完成的 Map 任务列表及其位置(主机名)。
  • Fetcher 线程并行地向这些 Map 任务所在的 NodeManager 发起 HTTP 请求,拉取属于自己分区的数据(即 Map 端生成的 file.out 文件中的一个片段)。
  • 拉取来的数据可以直接写入内存(如果足够小),或者当内存使用达到一定阈值时,溢写(spill)到本地磁盘上,形成一个溢写文件。
  • 这个过程中的内存管理和 MapTask 的 MapOutputBuffer 类似,但目的不同:MapTask 是为了排序和分区,而 Reduce 端是为了缓冲网络数据流。

Sort (Merge) 阶段:

  • 当数据拉取到一定程度(或者全部拉取完毕),ReduceTask 会启动合并过程。
  • 它使用我们之前讨论过的 Merger 工具类。
  • 内存到磁盘的合并:如果 Shuffle 过程中产生了多个磁盘溢写文件,Merger 会在后台将它们进行多路归并,减少文件数量。
  • 最终合并:当所有 Map 输出都已拉取并处理完毕后,Merger 会执行最后一轮合并。这一轮会将所有磁盘上的文件片段和内存中剩余的数据片段一起进行归并。
  • 关键点:如前所述,这最后一轮合并不会将结果完整地写入一个新文件。相反,shuffleConsumerPlugin.run() 方法返回一个 RawKeyValueIterator。这个迭代器代表了最终合并排序后的数据流。Reduce 任务可以从这个迭代器中一条一条地读取数据,实现了流式处理,极大地节省了磁盘空间。

Reduce 阶段

当 shuffleConsumerPlugin.run() 返回后,ReduceTask 就拥有了一个全局有序的 RawKeyValueIterator。接下来就是执行用户逻辑的 Reduce 阶段。

  1. 分组 (Grouping): 框架并不会一次性把某个 key 的所有 value 都读入内存。它使用一个 ValuesIterator 来包装 RawKeyValueIteratorValuesIterator 的工作方式是:

    • 调用 nextKey(),迭代器前进到下一个唯一的 key。
    • 然后,你可以反复调用 next() 来遍历这个 key 对应的所有 value。
    • 当 hasNext() 返回 false 时,表示这个 key 的所有 value 都已遍历完。
    • 这个设计使得即使一个 key 对应海量的 value,也不会导致内存溢出。
  2. 调用用户代码runOldReducer (或 runNewReducer) 方法会循环地从 RawKeyValueIterator 中读取数据,并组织成 <key, Iterable<value>> 的形式,然后调用用户实现的 Reducer 的 reduce 方法。

    // ... runOldReducer 方法内部 ...// 创建一个 ValuesIterator 来实现分组ReduceValuesIterator<INKEY,INVALUE> values = new ReduceValuesIterator<INKEY,INVALUE>(rIter, job.getOutputValueGroupingComparator(), keyClass, valueClass, job, reporter);// 主循环while (values.more()) {// reduceInputKeyCounter 记录处理了多少个 key (group)reduceInputKeyCounter.increment(1);// 调用用户的 reduce 方法,传入当前的 key 和一个可以遍历所有 value 的迭代器reducer.reduce(values.getKey(), values, finalOut, reporter);// 移动到下一个 keyvalues.nextKey();}
    // ...
    

    在 ReduceValuesIterator 内部,每次调用 next() 都会使 reduceInputValueCounter 加一,用于统计总共处理了多少条 value 记录。

  3. 输出: 在用户的 reduce 方法中,通过调用 OutputCollector.collect(key, value) (旧版 API) 或 Context.write(key, value) (新版 API) 来输出结果。这些调用最终会通过 RecordWriter 将数据写入到由 FileOutputFormat 指定的 HDFS 目标路径中。

总结

ReduceTask 是一个精心设计的、高度优化的数据处理引擎,它将复杂的分布式数据汇聚、排序和处理流程对用户透明化。其核心设计思想可以概括为:

  • 插件化:通过 ShuffleConsumerPlugin 将 Shuffle 和 Sort 过程抽象出来,使其可替换、可定制。
  • 并发拉取:通过多线程并行地从 Map 任务拉取数据,最大化网络带宽利用率。
  • 多轮归并:借鉴外部排序的思想,使用 Merger 对磁盘和内存中的数据片段进行高效的多路归并,能够处理远超内存大小的数据。
  • 流式处理:在最终的 Reduce 阶段,通过 RawKeyValueIterator 和 ValuesIterator 实现数据的流式读取和分组,避免将所有数据一次性加载到内存,具有很好的可伸缩性。
  • 清晰的阶段划分:将整个任务划分为 Copy -> Sort -> Reduce 三个清晰的阶段,便于监控、报告进度和定位问题。

理解 ReduceTask 的工作流程,特别是 Shuffle 和 Sort 阶段的实现细节,对于理解 MapReduce 的性能瓶颈以及如何进行作业调优至关重要。

ReduceTask 中的run方法详解

run 方法的逻辑非常清晰,严格按照 Reduce 任务的生命周期来执行。我们可以将其划分为以下几个主要阶段:

  1. 初始化和预备工作:设置任务状态、初始化各种计数器、启动与 MRAppMaster 通信的 TaskReporter 线程。
  2. 特殊任务处理:检查并执行作业/任务的 Setup 或 Cleanup 任务,如果匹配则执行并提前返回。
  3. Shuffle & Sort 阶段:这是最核心和复杂的部分。通过插件化的 ShuffleConsumerPlugin 来拉取(Copy)并合并(Merge/Sort)所有相关的 Map 输出。
  4. Reduce 阶段:调用用户编写的 Reducer 代码,处理排序好的数据。
  5. 收尾工作:关闭资源,并向 MRAppMaster 报告任务完成。

下面我们结合代码,逐一解析这些阶段。

// ... existing code ...@Override@SuppressWarnings("unchecked")public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {
// ... existing code ...

1. 初始化和预备工作

方法开始时,会进行一系列的初始化。

// ... existing code ...public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());// 1. 初始化进度条的各个阶段if (isMapOrReduce()) {copyPhase = getProgress().addPhase("copy");sortPhase  = getProgress().addPhase("sort");reducePhase = getProgress().addPhase("reduce");}// 2. 启动与 ApplicationMaster 通信的 Reporter 线程TaskReporter reporter = startReporter(umbilical);boolean useNewApi = job.getUseNewReducer();initialize(job, getJobID(), reporter, useNewApi);
// ... existing code ...
  • getProgress().addPhase(...): 为任务进度条设置三个主要阶段:"copy", "sort", "reduce"。这使得用户可以通过 UI 实时看到任务进行到哪一步了。
  • startReporter(umbilical): 启动一个后台线程 (TaskReporter)。这个线程负责定期通过 umbilical 协议(RPC)向 MRAppMaster 发送心跳,报告任务的当前状态、进度和计数器信息。这对于任务的存活监控和进度跟踪至关重要。
  • initialize(...): 做一些通用的初始化,比如获取 OutputCommitter 等。

2. 特殊任务处理 (Setup/Cleanup)

一个 MapReduce 作业除了常规的 Map/Reduce 任务外,还可能包含作业级别和任务级别的设置(Setup)与清理(Cleanup)任务。run 方法会检查当前任务是否是这些特殊任务之一。

// ... existing code ...// 检查是否是作业清理任务if (jobCleanup) {runJobCleanupTask(umbilical, reporter);return; // 执行完直接返回}// 检查是否是作业设置任务if (jobSetup) {runJobSetupTask(umbilical, reporter);return; // 执行完直接返回}// 检查是否是任务清理任务if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return; // 执行完直接返回}
// ... existing code ...

这些检查确保了如果当前 ReduceTask 的角色是执行 JobSetup 或 TaskCleanup 等,它会执行相应的逻辑然后直接退出,不会进入后续的 Shuffle 和 Reduce 流程。

3. Shuffle & Sort 阶段

这是 run 方法的核心,也是 ReduceTask 最复杂的部分。

// ... existing code ...// 初始化编解码器codec = initCodec();RawKeyValueIterator rIter = null;ShuffleConsumerPlugin shuffleConsumerPlugin = null;// ... 初始化 Combiner 相关 ...// 1. 通过反射加载并实例化 Shuffle 插件 (默认为 o.a.h.mapreduce.task.reduce.Shuffle)Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);// 2. 创建 Shuffle 插件的上下文对象,传入所有需要的依赖ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(/* ... a lot of parameters ... */);// 3. 初始化 Shuffle 插件shuffleConsumerPlugin.init(shuffleContext);// 4. 执行 Shuffle 插件的 run 方法,这是阻塞的,直到数据拉取和合并排序完成rIter = shuffleConsumerPlugin.run();// 释放内存中的数据结构mapOutputFilesOnDisk.clear();// 5. 更新阶段状态sortPhase.complete();                         // sort 阶段完成setPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical);
// ... existing code ...
  • 插件化设计: Hadoop 将 Shuffle 和 Sort 的逻辑抽象为 ShuffleConsumerPlugin 接口。这提供了很好的灵活性,允许用户或发行版提供自定义的实现。默认的实现是 org.apache.hadoop.mapreduce.task.reduce.Shuffle
  • Context 对象: 框架创建了一个 Context 对象,将所有 Shuffle 过程需要的配置、依赖(如 JobConfFileSystemTaskUmbilicalProtocol 等)都封装起来,传递给插件。这是一种依赖注入的体现,降低了耦合。
  • shuffleConsumerPlugin.run(): 这是整个 Shuffle & Sort 过程的触发点。调用此方法后,插件内部会启动 Fetcher 线程去拉取数据,并在数据到达后启动 Merger 线程进行归并排序。这个方法会一直阻塞,直到所有属于该 Reduce 分区的数据都被成功拉取,并且在本地完成了最终的归并排序。
  • 返回值 RawKeyValueIteratorrun 方法的返回值是一个 RawKeyValueIterator。这是一个非常关键的设计。它不是一个包含所有数据的集合,而是一个迭代器。这意味着最终的合并排序结果是以的形式提供的,Reduce 阶段可以一条一条地从这个迭代器中读取数据,而不需要将所有数据一次性加载到内存中。
  • 状态更新: 当 shuffleConsumerPlugin.run() 返回后,说明 Copy 和 Sort 阶段都已经完成。因此代码会立刻将 sortPhase 标记为完成,并将任务的整体阶段设置为 REDUCE

4. Reduce 阶段

当数据准备就绪后,就进入了执行用户代码的 Reduce 阶段。

// ... existing code ...Class keyClass = job.getMapOutputKeyClass();Class valueClass = job.getMapOutputValueClass();RawComparator comparator = job.getOutputValueGroupingComparator();// 根据用户配置,选择调用新版 API (mapreduce) 还是旧版 API (mapred)if (useNewApi) {runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}
// ... existing code ...

这里框架会判断用户使用的是新的 org.apache.hadoop.mapreduce API 还是旧的 org.apache.hadoop.mapred API,然后调用对应的包装方法 (runNewReducer 或 runOldReducer)。这两个方法内部逻辑类似,都是:

  1. 通过反射创建用户定义的 Reducer 实例。
  2. 创建一个 RecordWriter 用于写入最终输出。
  3. 创建一个 ValuesIterator 来包装 RawKeyValueIterator,实现按 Key 对 Value 进行分组。
  4. 在一个 while 循环中,迭代每一个唯一的 Key,并调用用户 reducer.reduce(key, values, ...) 方法。

5. 收尾工作

当 runOld/NewReducer 方法执行完毕后,run 方法进行最后的清理。

// ... existing code ...shuffleConsumerPlugin.close();done(umbilical, reporter);}
// ... existing code ...
  • shuffleConsumerPlugin.close(): 关闭 Shuffle 插件,释放其占用的资源(如线程池、文件句柄等)。
  • done(umbilical, reporter): 这是任务完成的最后一步。它会停止 TaskReporter 线程,并通过 umbilical 协议向 MRAppMaster 发送最终状态(COMMIT_PENDING),然后等待 MRAppMaster 的指令来提交(commit)或中止(abort)任务的输出。

总结

ReduceTask.run 方法是一个高度结构化、逻辑严谨的过程控制器。它完美地体现了框架与用户代码的分离:框架负责所有繁重、通用的工作(如通信、数据移动、排序、资源管理),而将特定于业务的逻辑(reduce 函数)留给用户实现。通过插件化、流式处理和清晰的阶段划分,它以一种健壮且高效的方式完成了 Reduce 端的全部工作。

Shuffle 

Shuffle 类是 ShuffleConsumerPlugin 接口的默认实现。正如其名,它是一个插件。在 ReduceTask 的 run 方法中,框架通过配置 mapreduce.job.shuffle.consumer.plugin.class 来加载并实例化这个插件。

Shuffle 的核心职责是管理 ReduceTask 的 Copy 和 Sort 两个阶段。它负责:

  1. 知道去哪里拿数据:通过与 MRAppMaster 通信,获取已完成的 Map 任务列表及其输出位置。
  2. 把数据拿回来:启动网络线程(Fetcher)从各个 Map 任务所在的节点上拉取属于当前 Reduce 任务的数据。
  3. 管理拿回来的数据:将拉取到的数据高效地存入内存,并在内存不足时溢写到磁盘。
  4. 合并与排序:在数据拉取的同时或之后,将内存和磁盘上的多个数据片段进行多路归并排序。
  5. 提供最终有序数据流:当所有数据都处理完毕后,向 ReduceTask 提供一个单一的、全局有序的 RawKeyValueIterator,供其进行 Reduce 操作。

开发者通常不直接与 Shuffle 类交互。它的生命周期完全由 ReduceTask 管理:

  1. ReduceTask 创建 Shuffle 实例。
  2. ReduceTask 调用 shuffle.init(context),传入所有必要的依赖和配置。
  3. ReduceTask 调用 shuffle.run(),这个调用会阻塞,直到 Shuffle 完成了所有数据的拉取和合并排序。
  4. shuffle.run() 返回一个 RawKeyValueIteratorReduceTask 用它来进行后续的 Reduce 操作。
  5. ReduceTask 调用 shuffle.close() 释放资源。

Shuffle 的核心组件

Shuffle 类本身是一个协调者(Orchestrator),它将复杂的工作委托给内部的几个关键组件来完成。在 init 方法中,我们可以看到这些组件的创建:

// ... existing code ...@Overridepublic void init(ShuffleConsumerPlugin.Context context) {// ... 初始化成员变量 ...// 1. 创建调度器 (Scheduler)scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,this, copyPhase, context.getShuffledMapsCounter(),context.getReduceShuffleBytes(), context.getFailedShuffleCounter());// 2. 创建合并管理器 (Merger)merger = createMergeManager(context);}protected MergeManager<K, V> createMergeManager(ShuffleConsumerPlugin.Context context) {return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),// ... 传入各种依赖 ...);}
// ... existing code ...
  • ShuffleSchedulerImpl (调度器):

    • 职责:决定何时何地拉取哪个 Map 输出。
    • 它维护了一个待拉取 Map 输出的列表(pendingHosts)和已拉取但失败的列表,并进行重试管理。
    • 它为 Fetcher 线程提供要拉取的任务(MapHost 对象)。
    • 它跟踪整个 Shuffle 过程的进度,判断是否所有需要的 Map 输出都已成功拉取。
  • MergeManagerImpl (合并管理器):

    • 职责:管理所有拉取下来的数据,并负责将它们合并排序。
    • 它内部维护了内存缓冲区(inMemoryMapOutputs)和磁盘文件列表(onDiskMapOutputs)。
    • 当 Fetcher 线程拉取数据后,会调用 merger.reserve(...) 来获取一块内存或磁盘空间来存放数据。
    • 它会根据内存使用率(mapreduce.reduce.shuffle.merge.percent)和文件数量(mapreduce.task.io.sort.factor)来决定何时触发中间合并(in-memory merge 或 on-disk merge)。
    • 在 Shuffle 的最后阶段,它负责执行最终合并(final merge),将所有内存和磁盘上的片段合并成一个最终的有序流。
  • EventFetcher (事件获取器):

    • 职责:一个独立的线程,定期通过 RPC (umbilical.getMapCompletionEvents(...)) 从 MRAppMaster 获取 Map 任务完成事件(TaskCompletionEvent)。
    • 获取到事件后,它将事件交给 scheduler 来解析和调度。
  • Fetcher (数据拉取器):

    • 职责:真正执行数据拉取工作的线程。默认会启动多个(由 mapreduce.shuffle.parallel.copies 控制,默认是5)。
    • 它不断地向 scheduler 请求任务(scheduler.getHost())。
    • 拿到任务后,它会构造一个 HTTP URL,向目标 Map 任务所在的 NodeManager 上的 ShuffleHandler 服务发起请求。
    • 获取到数据流后,它将数据交给 merger 处理。

run() 方法:Shuffle 的执行流程

run() 方法是 Shuffle 的主逻辑,它编排了上述所有组件的启动、协作和关闭。

// ... existing code ...@Overridepublic RawKeyValueIterator run() throws IOException, InterruptedException {// ... 计算每次RPC获取的事件数量 ...// 1. 启动 EventFetcher 线程,开始从 AppMaster 获取 Map 完成事件final EventFetcher<K, V> eventFetcher =new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,maxEventsToFetch);eventFetcher.start();// 2. 启动 Fetcher 线程,开始拉取数据boolean isLocal = localMapFiles != null; // 判断是否是本地运行模式final int numFetchers = isLocal ? 1 :jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];if (isLocal) {// ... 启动 LocalFetcher ...} else {for (int i=0; i < numFetchers; ++i) {fetchers[i] = new Fetcher<K, V>(/* ... */);fetchers[i].start();}}// 3. 主线程等待,直到 Shuffle 完成while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {reporter.progress(); // 定期报告进度synchronized (this) { // 检查是否有子线程抛出异常if (throwable != null) {throw new ShuffleError("error in shuffle in " + throwingThreadName,throwable);}}}// 4. Shuffle 完成,开始关闭和清理// 停止 EventFetchereventFetcher.shutDown();// 停止所有 Fetcherfor (Fetcher<K, V> fetcher : fetchers) {fetcher.shutDown();}// 关闭调度器scheduler.close();// 更新任务状态为 SORTcopyPhase.complete();taskStatus.setPhase(TaskStatus.Phase.SORT);reduceTask.statusUpdate(umbilical);// 5. 执行最终合并,并获取迭代器RawKeyValueIterator kvIter = null;try {kvIter = merger.close();} catch (Throwable e) {throw new ShuffleError("Error while doing final merge ", e);}// ... 最终异常检查 ...// 6. 返回最终的有序数据流迭代器return kvIter;}
// ... existing code ...

流程分解

  1. 启动run 方法首先启动 EventFetcher 和多个 Fetcher 线程。这些线程在后台并行工作。
  2. 等待:主线程进入一个 while 循环,调用 scheduler.waitUntilDone()。这个方法会阻塞,直到 scheduler 判断所有需要的 Map 输出都已成功获取。循环中会定期报告进度,并检查是否有子线程通过 reportException 方法报告了异常。
  3. 关闭:一旦 waitUntilDone() 返回 true,说明 Copy 阶段已完成。主线程会依次关闭 EventFetcherFetcher 和 Scheduler
  4. 最终合并:主线程调用 merger.close()。这个方法会触发 MergeManager 执行最后一轮合并,将所有内存和磁盘上的数据片段归并成一个单一的、有序的数据流。
  5. 返回迭代器merger.close() 返回一个 RawKeyValueIteratorrun 方法将其作为最终结果返回给 ReduceTask

异常处理

Shuffle 涉及多个后台线程,其异常处理机制值得注意。

  • Shuffle 类实现了 ExceptionReporter 接口。
  • 任何后台线程(如 FetcherMerger 的合并线程)如果遇到严重错误,都会调用 shuffle.reportException(t)
  • reportException 方法是一个 synchronized 方法,它会记录第一个发生的异常,并 notifyAll() 正在等待 scheduler 对象的线程(主要是主线程)。
  • 主线程在 while 循环中会检查 throwable 变量,一旦发现非空,就会立即抛出 ShuffleError,从而使整个任务失败。

这种设计确保了任何一个后台组件的失败都能被主流程及时捕获并导致任务快速失败,避免资源浪费。

总结

Shuffle 类是 MapReduce 框架中一个设计精良、高度并发的组件。它通过将职责分解到 SchedulerMergerEventFetcher 和 Fetcher 等多个协作组件中,清晰地实现了复杂的 Shuffle 逻辑。其核心思想可以概括为:

  • 生产者-消费者模型EventFetcher 是 Map 完成事件的生产者,Scheduler 是消费者;Scheduler 是待拉取任务的生产者,Fetcher 是消费者;Fetcher 是数据的生产者,Merger 是消费者。
  • 并发与并行:通过多个 Fetcher 线程并行拉取数据,最大化网络吞吐量。同时,数据的拉取(Copy)和合并(Sort)过程也可以在一定程度上并行。
  • 流式处理与外部排序:通过内存/磁盘的溢写和多路归并,能够处理远超内存容量的数据。最终返回一个迭代器而非一个巨大的集合,实现了高效的流式处理。
  • 健壮的异常处理:集中的异常报告机制确保了分布式并发环境下的稳定性和快速失败能力。

ShuffleSchedulerImpl 

ShuffleSchedulerImpl 是 ShuffleScheduler 接口的默认实现。它并不直接被用户或 ReduceTask 调用,而是作为 Shuffle 类的内部核心组件被创建和使用。

ShuffleSchedulerImpl 的核心职责是调度 Fetcher 线程去拉取 Map 输出。具体来说,它负责:

  1. 接收任务:从 EventFetcher 线程接收 TaskCompletionEvent(Map 任务完成事件),并解析出 Map 输出的位置信息。
  2. 维护状态:维护所有 Map 输出的状态,包括待拉取(pending)、已完成(finished)、失败(failed)、过时(obsolete)等。
  3. 提供任务:向 Fetcher 线程提供下一个最适合拉取的 MapHost(目标主机和 Map 任务列表)。
  4. 处理反馈:接收 Fetcher 线程的执行结果,无论是成功(copySucceeded)还是失败(copyFailed)。
  5. 失败与惩罚机制:当拉取失败时,记录失败次数,并对频繁失败的主机(Host)进行“惩罚”(penalize),即在一段时间内不再从该主机拉取数据,避免在故障节点上浪费过多尝试。
  6. 健康监控:持续监控 Shuffle 过程的健康状况(checkReducerHealth),如果失败率过高或长时间没有进展,会主动让整个 Reduce 任务失败,防止任务僵死。
  7. 进度判断:判断整个 Copy 阶段是否完成(waitUntilDone)。

它的生命周期完全由 Shuffle 类控制:

  1. Shuffle 在其 init 方法中创建 ShuffleSchedulerImpl 实例。
  2. EventFetcher 线程获取到 Map 完成事件后,调用 scheduler.resolve(event)
  3. Fetcher 线程通过调用 scheduler.getHost() 来获取下一个要拉取的任务。
  4. Fetcher 线程完成拉取后,调用 scheduler.copySucceeded(...) 或 scheduler.copyFailed(...) 来报告结果。
  5. Shuffle 的主线程通过循环调用 scheduler.waitUntilDone(...) 来等待 Copy 阶段完成。
  6. 最后,Shuffle 调用 scheduler.close() 来停止其内部线程(如 Referee)。

核心数据结构与状态管理

ShuffleSchedulerImpl 内部通过一系列精心设计的数据结构来管理复杂的状态。

// ... existing code ...
public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
// ...private final boolean[] finishedMaps; // 标记每个 Map 是否已成功拉取private final int totalMaps; // 总 Map 数量private int remainingMaps; // 剩余待拉取的 Map 数量// 核心数据结构:维护主机和 Map 输出的关系private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();private Set<MapHost> pendingHosts = new HashSet<MapHost>(); // 待拉取的主机集合private Set<TaskAttemptID> obsoleteMaps = new HashSet<TaskAttemptID>(); // 已过时的 Map 尝试private final Random random = new Random();// 惩罚队列,用于实现失败主机的延迟重试private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();private final Referee referee = new Referee(); // 一个用于处理惩罚队列的线程// 记录失败次数private final Map<TaskAttemptID,IntWritable> failureCounts =new HashMap<TaskAttemptID,IntWritable>();private final Map<String,IntWritable> hostFailures =new HashMap<String,IntWritable>();
// ...
}
  • finishedMaps: 一个布尔数组,大小为总 Map 数。finishedMaps[i] = true 表示第 i 个 Map 的输出已经被成功拉取。这是判断整个 Shuffle 是否完成的最终依据。
  • mapLocations: 一个 Map,键是主机名(hostname:port),值是 MapHost 对象。MapHost 对象内部维护了一个该主机上所有待拉取的 Map 任务列表。
  • pendingHosts: 一个 Set,包含了所有当前有待拉取 Map 输出的主机。Fetcher 从这个集合中选择主机。
  • penalties: 一个 DelayQueue,这是实现惩罚机制的关键。当一个主机拉取失败后,会被包装成一个 Penalty 对象放入此队列,并设置一个延迟时间。只有当延迟时间过后,这个主机才能被重新考虑用于拉取。
  • referee: 一个内部线程,它的唯一工作就是不断地从 penalties 队列中取出到期的 Penalty 对象,并将对应的主机重新放回 pendingHosts 集合,使其可以被再次调度。

核心实现:调度与失败处理

接收和解析任务 (resolve)

当 EventFetcher 从 MRAppMaster 获取到 TaskCompletionEvent 时,会调用 resolve 方法。

// ... existing code ...@Overridepublic void resolve(TaskCompletionEvent event) {switch (event.getTaskStatus()) {case SUCCEEDED:// 如果 Map 成功,解析出 URL,并添加到待处理列表URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());addKnownMapOutput(u.getHost() + ":" + u.getPort(),u.toString(),event.getTaskAttemptId());// ...break;case FAILED:case KILLED:case OBSOLETE:// 如果 Map 失败、被杀或已过时,将其标记为 obsoleteobsoleteMapOutput(event.getTaskAttemptId());// ...break;case TIPFAILED:// 如果整个 Map 任务(TIP)都失败了,标记该 Map ID 为完成,不再需要拉取tipFailed(event.getTaskAttemptId().getTaskID());// ...break;}}
// ... existing code ...

这个方法根据事件类型对 Map 输出进行分类处理,成功的放入待拉取池,失败或过时的则加入 obsoleteMaps 集合,避免 Fetcher 去拉取无效的数据。

提供任务给 Fetcher (getHost)

Fetcher 线程通过 getHost 方法来获取下一个任务。

// ... existing code ...@Overridepublic synchronized MapHost getHost() throws InterruptedException {while (pendingHosts.isEmpty()) {// 如果没有待处理的主机,则等待wait();}// 从待处理主机中随机选择一个MapHost host = null;Iterator<MapHost> iter = pendingHosts.iterator();int numToPick = random.nextInt(pendingHosts.size());for (int i=0; i <= numToPick; ++i) {host = iter.next();}// 从 pendingHosts 中移除,表示该主机正在被一个 Fetcher 处理pendingHosts.remove(host);// 设置主机的状态为 PENDINGhost.setState(State.PENDING);return host;}
// ... existing code ...

这里的实现很简单:在 pendingHosts 不为空时,随机选择一个 MapHost 返回。随机选择有助于将拉取负载均匀地分布到不同的 Map 节点上。

处理拉取成功 (copySucceeded)

当 Fetcher 成功拉取并提交数据后,调用此方法。

// ... existing code ...public synchronized void copySucceeded(TaskAttemptID mapId,MapHost host,long bytes,long startMillis,long endMillis,MapOutput<K,V> output) throws IOException {// 清除该 Map 和 Host 的失败记录failureCounts.remove(mapId);hostFailures.remove(host.getHostName());int mapIndex = mapId.getTaskID().getId();if (!finishedMaps[mapIndex]) {output.commit(); // 真正提交数据finishedMaps[mapIndex] = true; // 标记为完成shuffledMapsCounter.increment(1);if (--remainingMaps == 0) {notifyAll(); // 如果全部完成,唤醒等待在 waitUntilDone 上的主线程}// ... 更新各种计数器和进度 ...} else {// 如果这个 Map 已经被其他 Fetcher 完成了,则丢弃本次结果LOG.warn("Aborting already-finished MapOutput for " + mapId);output.abort();}}
// ... existing code ...

此方法是线程安全的 (synchronized)。它会更新 finishedMaps 数组,递减 remainingMaps 计数,并更新进度。如果 remainingMaps 减到 0,则通过 notifyAll() 唤醒 Shuffle 的主线程。

这里的 output 是一个 MapOutput 对象,它代表了刚刚拉取到的数据。output.commit() 是一个关键调用。这个调用的作用就是将这份数据“固化”下来,使其对 MergeManager 可见。

  • 如果数据在内存中,commit() 会将这个内存块正式加入到 MergeManager 的 inMemoryMapOutputs 列表中,准备参与后续的内存到内存(in-memory)或内存到磁盘(on-disk)的合并。
  • 如果数据在磁盘上,commit() 会将这个磁盘文件正式加入到 MergeManager 的 onDiskMapOutputs 列表中,准备参与后续的磁盘到磁盘的合并。

所以,copySucceeded 执行后,输出结果(即拉取到的 Map 数据)被正式纳入了 MergeManager 的管理体系,存放在内存或本地磁盘中,等待最终的归并排序。

处理拉取失败 (copyFailed 和 penalize)

这是 ShuffleSchedulerImpl 最复杂的逻辑所在,体现了其鲁棒性。

// ... existing code ...public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,boolean readError, boolean connectExcpt) {// 增加对应 Map 和 Host 的失败次数// ...// 检查是否需要向 MRAppMaster 报告失败checkAndInformMRAppMaster(failures, mapId, readError, connectExcpt,hostFail);// 检查 Reducer 自身的健康状况checkReducerHealth();// 计算惩罚时间,失败次数越多,惩罚时间越长(指数增长)long delay = (long) (INITIAL_PENALTY *Math.pow(PENALTY_GROWTH_RATE, failures));// 对该主机进行惩罚penalize(host, Math.min(delay, maxPenalty));failedShuffleCounter.increment(1);}void penalize(MapHost host, long delay) {host.penalize(); // 设置主机状态为 PENALIZED// 将主机放入 DelayQueue,延迟 delay 毫秒后才能被 Referee 线程取出penalties.add(new Penalty(host, delay));}
// ... existing code ...

当拉取失败时:

  1. 增加失败计数。
  2. 调用 checkAndInformMRAppMaster,在特定条件下(如连接异常、读错误、或失败次数达到阈值)会通过 TaskStatus 将失败信息报告给 MRAppMaster。这可以让 MRAppMaster 知道某个 Map 输出可能存在问题。
  3. 调用 checkReducerHealth,这是一个健康自检机制。如果唯一失败的 Map 数量过多,或者任务进度停滞太久,它会主动调用 reporter.reportException 让整个 Reduce 任务失败,避免僵死。
  4. 调用 penalize,将失败的 MapHost 放入 penalties 这个 DelayQueue 中。Fetcher 将在一段时间内不会再从这个主机拉取数据。惩罚时间是指数增长的,但有上限。

失败次数限制

ShuffleSchedulerImpl 设计了一套相当完善的失败容忍和快速失败机制。从 ShuffleSchedulerImpl.java 的代码中,我们可以看到以下几个关键的限制:

  1. 单个 Map 拉取失败次数限制 (abortFailureLimit):

    // ... existing code ...abortFailureLimit = Math.max(30, totalMaps / 10);
    // ... existing code ...public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,boolean readError, boolean connectExcpt) {
    // ... existing code ...if (failures >= abortFailureLimit) {try {throw new IOException(failures + " failures downloading " + mapId);} catch (IOException ie) {reporter.reportException(ie);}}
    // ... existing code ...
    

    这个限制针对同一个 Map Attempt。如果从某个特定的 Map 任务拉取数据连续失败的次数超过了 abortFailureLimit(默认为 30 和总 Map 数的 1/10 中的较大值),调度器会认为这个 Map 输出存在严重问题,无法获取,于是会通过 reporter.reportException 抛出异常,导致整个 Reduce 任务失败。

  2. 单个主机(Host)失败次数限制 (maxHostFailures):

    // ... existing code ...this.maxHostFailures = job.getInt(MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
    // ... existing code ...public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,boolean readError, boolean connectExcpt) {
    // ... existing code ...//report failure if already retried maxHostFailures timesboolean hostFail = hostFailures.get(hostname).get() >getMaxHostFailures() ? true : false;checkAndInformMRAppMaster(failures, mapId, readError, connectExcpt,hostFail);
    // ... existing code ...
    

    这个限制由配置项 mapreduce.shuffle.fetch.host.failures.max 控制。当从同一个主机拉取数据(无论哪个 Map)累计失败的次数超过这个阈值时,hostFail 标志会变为 true。这会触发 checkAndInformMRAppMaster 机制,将这个主机的问题报告给 MRAppMaster。这通常意味着该节点可能存在网络或服务问题。

  3. 不同 Map 的失败总数限制 (maxFailedUniqueFetches):

    // ... existing code ...this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
    // ... existing code ...private void checkReducerHealth() {
    // ... existing code ...if ((failureCounts.size() >= maxFailedUniqueFetches ||failureCounts.size() == (totalMaps - doneMaps))&& !reducerHealthy&& (!reducerProgressedEnough || reducerStalled)) {LOG.error("Shuffle failed with too many fetch failures " +"and insufficient progress!");String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";reporter.reportException(new IOException(errorMsg));}}
    // ... existing code ...
    

    这个限制更加严格。failureCounts 记录了当前有哪些不同的 Map 任务正在经历失败。如果这个集合的大小达到了 maxFailedUniqueFetches(默认为 5 和总 Map 数中的较小值),并且满足其他健康检查条件(如进度停滞、失败率过高等),调度器就会认为 Shuffle 过程遇到了系统性问题,并主动让任务失败。这可以防止任务在多个 Map 输出都无法获取的情况下无限期地重试和等待。

综上所述,Hadoop 在 Shuffle 阶段设计了多层次、多维度的失败限制,以确保在面对各种故障时,任务既能有一定的容错能力,又能在问题严重时快速失败,保证了整个系统的健壮性。

总结

ShuffleSchedulerImpl 是一个高度复杂和健壮的调度器。它不仅仅是简单地分发任务,而是构建了一个完整的闭环系统,集任务分发、状态跟踪、失败重试、惩罚机制和健康监控于一体。

  • 核心机制:通过 pendingHostspenalties 队列和 referee 线程,实现了一套高效的、带退避策略(back-off)的调度循环。
  • 鲁棒性:通过对 Map 尝试、主机、整体进度的多维度失败计数和健康检查,能够容忍网络抖动、节点短暂失效等常见问题,同时也能在问题严重时快速失败,避免浪费集群资源。
  • 信息反馈:它不仅调度任务,还负责收集进度和失败信息,并通过 TaskStatus 和 Counters 将这些信息反馈给 MRAppMaster 和用户,提供了很好的可观测性。

理解 ShuffleSchedulerImpl 的工作原理,对于诊断和调优 MapReduce 作业中常见的 Shuffle 性能问题至关重要。

http://www.xdnf.cn/news/1364077.html

相关文章:

  • 【C++高并发内存池篇】ThreadCache 极速引擎:C++ 高并发内存池的纳秒级无锁革命!
  • 【目标跟踪】《FastTracker: Real-Time and Accurate Visual Tracking》论文阅读笔记
  • 论文阅读:Code as Policies: Language Model Programs for Embodied Control
  • uniapp中加载.urdf后缀的3D模型(three.js+urdf-loader)
  • 最新刀客IP地址信息查询系统源码_含API接口_首发
  • CAN总线详解(四)CANFD报文结构
  • 引脚电平异常?以下或许是原因
  • 十九、云原生分布式存储 CubeFS
  • dubbo源码之优雅关闭
  • 基于PyTorch深度学习遥感影像地物分类与目标检测、分割及遥感影像问题深度学习优化
  • 使用Docker配置Redis Stack集群的步骤
  • Redis常规指令及跳表
  • 电子之路(一)酒店门锁主板-主板接线图和原理-东方仙盟
  • 8.25学习日志
  • Portswigger靶场之Blind SQL injection with conditional errorsPRACTITIONERLAB
  • 36 NoSQL 注入
  • 大模型微调 Prompt Tuning与P-Tuning 的区别?
  • Java多态大冒险:当动物们开始“造反”
  • leetcode-hot-100 (二分查找)
  • 实用电脑小工具分享,守护电脑隐私与提升效率21/64
  • LengthFieldBasedFrameDecoder 详细用法
  • excel 破解工作表密码
  • 无锁队列的设计与实现
  • 记一次 element-plus el-table-v2 表格滚动卡顿问题优化
  • 【学习记录】CSS: clamp、@scope
  • 一键编译安装zabbix(centos)
  • Go编写的轻量文件监控器. 可以监控终端上指定文件夹内的变化, 阻止删除,修改,新增操作. 可以用于AWD比赛或者终端应急响应
  • go-redis库使用总结
  • 跨语言统一语义真理及其对NLP深层分析影响
  • 人体工学优化:握力环直径 / 重量设计与便携性、握持舒适度的协同分析