Yarn-Tool接口定义
1. Tool 接口概述
`Tool` 接口位于 `org.apache.hadoop.util` 包中,是一个用于构建 Hadoop 命令行工具的标准接口。它继承自 `Configurable` 接口,允许工具获取和设置 Hadoop 配置:
java
/**
 * Standard contract for a Hadoop command-line tool.
 *
 * <p>Because it extends {@code Configurable}, an implementation can be given a
 * Hadoop configuration and asked to hand it back.
 */
public interface Tool extends Configurable {

    /**
     * Entry point of the tool.
     *
     * @param args command-line arguments passed to the tool
     * @return the exit status; by convention 0 means success and any non-zero value means failure
     * @throws Exception if the tool cannot complete
     */
    int run(String[] args) throws Exception;
}
- 核心方法:`run(String[] args)` 是工具的入口点,接收命令行参数并返回执行状态码(通常 0 表示成功,非 0 表示失败)。
2. ToolRunner 类:简化 Tool 的执行
`ToolRunner` 是一个辅助类,用于简化 `Tool` 接口的实现和执行。它提供了静态方法 `run()` 来处理配置初始化、参数解析和工具执行:
java
/**
 * Helper class for launching {@code Tool} implementations.
 *
 * <p>This is an API summary only: the method bodies are intentionally omitted.
 * Both overloads of {@code run} take care of configuration initialisation,
 * argument parsing and the actual execution of the tool.
 */
public class ToolRunner {

    /**
     * Runs the given tool without an explicitly supplied configuration.
     *
     * @param tool the tool to execute
     * @param args raw command-line arguments
     * @return the exit status produced by the tool
     * @throws Exception if the tool fails
     */
    public static int run(Tool tool, String[] args) throws Exception;

    /**
     * Runs the given tool with the supplied configuration.
     *
     * @param conf the Hadoop configuration to use for the tool
     * @param tool the tool to execute
     * @param args raw command-line arguments
     * @return the exit status produced by the tool
     * @throws Exception if the tool fails
     */
    public static int run(Configuration conf, Tool tool, String[] args) throws Exception;
}
典型用法:
java
/**
 * Program entry point: runs {@code MyTool} through {@code ToolRunner} and exits
 * the JVM with the status code the tool returned.
 *
 * @param args raw command-line arguments
 * @throws Exception propagated from {@code ToolRunner.run}, which declares it;
 *                   without this clause the method would not compile
 */
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new MyTool(), args);
    System.exit(exitCode);
}
3. 实现 Tool 接口的步骤
步骤 1:实现 Tool 接口
创建一个类实现 `Tool` 接口,并实现 `run()` 方法:
java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class MyTool implements Tool {private Configuration conf;@Overridepublic void setConf(Configuration conf) {this.conf = conf;}@Overridepublic Configuration getConf() {return conf;}@Overridepublic int run(String[] args) throws Exception {// 解析命令行参数if (args.length < 2) {System.err.println("Usage: MyTool <input-path> <output-path>");return -1;}String inputPath = args[0];String outputPath = args[1];// 使用配置和参数执行具体业务逻辑// ...return 0; // 返回执行状态码}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new Configuration(), new MyTool(), args);System.exit(exitCode);}
}
步骤 2:处理命令行参数
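通常使用 `GenericOptionsParser` 或 Apache Commons CLI 解析参数。注意:通过 `ToolRunner.run()` 启动时,通用选项(如 `-D`、`-fs`、`-libjars`)已经由 `GenericOptionsParser` 解析并从参数中移除,`run()` 收到的只是剩余的自定义参数: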
java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;public class MyTool extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {// 解析标准Hadoop参数(如-hdfs,-libjars等)Configuration conf = getConf();GenericOptionsParser parser = new GenericOptionsParser(conf, args);String[] remainingArgs = parser.getRemainingArgs();// 处理自定义参数if (remainingArgs.length < 2) {System.err.println("Usage: MyTool <input> <output>");return -1;}String input = remainingArgs[0];String output = remainingArgs[1];// 执行作业逻辑// ...return 0;}
}
4. 与 YARN 交互的典型场景
场景 1:提交 MapReduce 作业到 YARN
java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class WordCountTool implements Tool {private Configuration conf;@Overridepublic void setConf(Configuration conf) {this.conf = conf;}@Overridepublic Configuration getConf() {return conf;}@Overridepublic int run(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: WordCountTool <input-path> <output-path>");return -1;}Job job = Job.getInstance(conf, "WordCount");job.setJarByClass(WordCountTool.class);// 设置Mapper和Reducer类job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 设置输出键值类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 设置输入输出路径FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 提交作业并等待完成return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new Configuration(), new WordCountTool(), args);System.exit(exitCode);}
}
场景 2:直接与 YARN ResourceManager 交互
使用 `YarnClient` API 获取集群信息或提交应用:
java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class YarnStatusTool implements Tool {private Configuration conf;@Overridepublic void setConf(Configuration conf) {this.conf = conf;}@Overridepublic Configuration getConf() {return conf;}@Overridepublic int run(String[] args) throws Exception {YarnConfiguration yarnConf = new YarnConfiguration(conf);YarnClient yarnClient = YarnClient.createYarnClient(yarnConf);yarnClient.init(yarnConf);yarnClient.start();try {// 获取所有应用List<ApplicationReport> apps = yarnClient.getApplications();for (ApplicationReport app : apps) {System.out.println("Application: " + app.getApplicationId() + ", Name: " + app.getName() + ", State: " + app.getYarnApplicationState());}} catch (YarnException e) {e.printStackTrace();return 1;} finally {yarnClient.stop();}return 0;}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new Configuration(), new YarnStatusTool(), args);System.exit(exitCode);}
}