InputFormatOutputFormat
本文的主要目的是从源码级别讲解Hadoop中InputFormat和OutputFormat部分,首先简介InputFormat和OutputFormat,然后介绍两个重要的组件,RecordWriter和RecordReader,再以FileInputFormat和FileOutputFormat为例,介绍一组InputFormat和OutputFormat的实现细节,最后以SqoopInputFormat和SqoopOutputFormat为例,体会一下InputFormat和OutputFormat的灵活应用。
InputFormat和OutputFormat简介
InputFormat规定了MR框架,如何解析输入文件,比如说TextOutputFormat内部实现了一个LineRecordReader,从LineRecordReader#nextKeyValue方法可以看出,它的key是将要读的这一行数据的起始字节位置,value是这一行的内容。
OutputFormat规定了MR框架,最后输出的文件的内容格式,比如说SequenceFileOutputFormat,其getRecordWriter方法返回一个RecordWriter的内部类,具体的操作在SequenceFile类里write相关的方法中。SequenceFile的具体实现只从类来看比价复杂,其一条记录可以简单理解成如下的格式[record length][key length][key][value]。
RecordReader和RecordWriter简介
上边提到了RecordReader和RecordWriter,这里简单介绍一下,RecordWriter和RecordReader在MR框架每一个输入输出的地方用到,读写操作调用的都是RecordReader和RecordWriter提供的接口,比如说我们在Mapper#map和Reducer#reduce的结束通常都会写这样一行代码context.write(key,value),这行代码实际调用的就是RecordWriter#write方法,RecordReader是MR框架调用的。
一个InputFormat对应一个RecordReader,一个OutputFormat对应一个RecordWriter。这样在后边的DIY的时候,也知道应该写些什么了。
FileInputFormat和FileOutputFormat
这里以FileInputFormat,FileOutputFormat和其对应的子类TextInputFormat和TextOutputFormat为例,分析一套InputFormat和OutputFormat的具体实现。
1. InputFormat
InputFormat抽象类中有两个方法:
InputFormat#getSplits(JobContext context)
对输入文件进行逻辑上的分片
InputFormat#createRecordReader(InputSplitsplit,TaskAttemptContext context)
返回一个RecordReader对象,该对象可将输入的InputSplit解析成若干个key/value对。
2. FileInputFormat
所有需要处理文件的InputFormat的基类,从其实现可以看出,FileInputFormat主要是实现InputFormat#getSplit(JobContext context)方法,由于每个子类对应不同的输入格式,所以解析InputSplit的方法InputFormat#createRecordReader(InputSplit split,TaskAttemptContextcontext)由各个子类自己实现。
这里我们先分析FileInputFormat#getSplit(JobContext context)方法。
/*** Generate the list of files and make them into FileSplits.* @param job the job context* @throws IOException*/public List<InputSplit> getSplits(JobContext job) throwsIOException {long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generatesplitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);//获取输入的所有文件for (FileStatus file: files) {//一次循环处理一个文件Path path = file.getPath();long length = file.getLen();if (length != 0) {//文件不是空的FileSystem fs =path.getFileSystem(job.getConfiguration());BlockLocation[] blkLocations =fs.getFileBlockLocations(file, 0, length);//获取一个文件的所有的Block的带位置的信息if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize =computeSplitSize(blockSize, minSize, maxSize);//计算一个FileSplit的大小,计算过程如下,Math.max(minSize,Math.min(maxSize, blockSize))long bytesRemaining =length;//这个while循环就是根据上边的准备的信息,不停的读文件,产生FileSplit,一直到文件末尾。while (((double)bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path,length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts()));bytesRemaining -= splitSize;}//处理最后不足splitSize大小的数据if (bytesRemaining != 0) {int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path,length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts()));}} else { // not splitable不能分割的话,文件只有一块blkLocations[0]就是这一块splits.add(makeSplit(path, 0, length,blkLocations[0].getHosts()));}} else {//Create empty hosts array for zerolength filessplits.add(makeSplit(path, 0, length,new String[0]));}}// Save the number of input files for metrics/loadgenjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());LOG.debug("Total # of splits: " + splits.size());return splits;}
3. TextInputFormat
TextInputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。
TextInputFormat#createRecordReader方法逻辑比较简单,最后返回一个LineRecordReader对象,下面主要看这个方法LineRecordReader#nextKeyValue。
public boolean nextKeyValue() throws IOException {if (key == null) {key = newLongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;// We always read oneextra line, which lies outside the upper// split limit i.e. (end -1)
while (getFilePosition() <= end) {
//这一句就是读数据的地方了newSize =in.readLine(value, maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength));if (newSize == 0) {break;}pos += newSize;if (newSize <maxLineLength) {break;}// line too long. tryagainLOG.info("Skippedline of size " + newSize + " at pos " +(pos -newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}
4. OutputFormat
OutputFormat# getRecordWriter (TaskAttemptContext context)
返回一个RecordWriter对象
OutputFormat# checkOutputSpecs (JobContext context)
判断输出文件存不存在,写过MR程序的人应该很熟悉了,如果输出路径已经存在话,会抛出一个Output directory " + outDir + " already exists"错误,就是这个方法抛出的。
OutputFormat# getOutputCommitter (TaskAttemptContext context)
MR运行过程中,会产生很多中间文件,比如Mapper的输出,推测式执行Task时产生的文件等等,这个方法负责在任务执行完成后,处理这些中间文件。顺便说下,OutputCommitter对象里的方法都是回调方法,MR自动调用。
5. FileOutputFormat
FileOutputFormat对上边提到的3个方法中的后两个提供了通用的实现,OutputFormat# getRecordWriter (TaskAttemptContext context)方法需要子类自己实现。
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException{// Ensure that the outputdirectory is set and not already therePath outDir =getOutputPath(job);if (outDir == null) {throw newInvalidJobConfException("Output directory not set.");}// get delegation tokenfor outDir's file systemTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { outDir },job.getConfiguration());if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {throw newFileAlreadyExistsException("Output directory " + outDir +" already exists");}}public synchronizedOutputCommittergetOutputCommitter(TaskAttemptContext context)throws IOException {if (committer == null) {Path output =getOutputPath(context);committer = newFileOutputCommitter(output, context);}return committer;}
6. TextOutputFormat
TextOutputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。
TextOutputFormat#createRecordWriter方法逻辑比较简单,最后返回一个LineRecordWriter对象,下面主要看这个方法LineRecordReader#write(K key,V Value)。
public synchronized void write(K key, V value)throws IOException {boolean nullKey = key ==null || key instanceof NullWritable;boolean nullValue =value == null || value instanceof NullWritable;if (nullKey &&nullValue) {return;}//这个地方是最主要的逻辑if (!nullKey) {writeObject(key);//写出key}if (!(nullKey ||nullValue)) {out.write(keyValueSeparator);//写出key/value的分隔符}if (!nullValue) {writeObject(value);//写出value}out.write(newline);//写出行结束符
}
SqoopInputFormat和SqoopOutputFormat
下面的分析基于Sqoop1.99.3
在Sqoop中,InputFormat和OutputFormat的子类有3个,分别是:
public class SqoopInputFormat extendsInputFormat<SqoopSplit, NullWritable>
public class SqoopFileOutputFormat extendsFileOutputFormat<Data, NullWritable>
public class SqoopNullOutputFormat extendsOutputFormat<Data, NullWritable>
下面一个个分析:
1. SqoopInputFormat
从泛型传入的类型可以看出,SqoopInputFormat的key是SqoopSplit,value是NullWritable。直接把整个InputSplit作为key,传到Mapper中,SqoopInputFormat不对InputSplit做任何的解析操作。
2. SqoopFileOutputFormat
从泛型传入的类型来看,跟SqoopInputFormat类似,只是用KEY部分保存信息。SqoopFileOutputFormat只重写了父类FileOutputFormat的getRecordWriter方法和getOutputCommitter方法,checkOutputSpec方法使用的父类的。
getRecordWriter方法调用SqoopOutputFormatLoadExecutor#getRecordWriter,这个方法在返回一个SqoopRecordWriter的同时,开启一个消费者线程,SqoopRecordWriter是生产者线程。[后面接着分析Sqoop源码时细说]
3. SqoopNullOutputFormat
SqoopNullOutputFormat将OutputFormat的3个方法都重写了。SqoopNullOutputFormat#getRecordWriter方法同样是调用SqoopOutputFormatLoadExecutor#getRecordWriter。