当前位置: 首页 > news >正文

InputFormatOutputFormat

本文的主要目的是从源码级别讲解Hadoop中InputFormat和OutputFormat部分,首先简介InputFormat和OutputFormat,然后介绍两个重要的组件,RecordWriter和RecordReader,再以FileInputFormat和FileOutputFormat为例,介绍一组InputFormat和OutputFormat的实现细节,最后以SqoopInputFormat和SqoopOutputFormat为例,体会一下InputFormat和OutputFormat的灵活应用。

InputFormat和OutputFormat简介

InputFormat规定了MR框架,如何解析输入文件,比如说TextOutputFormat内部实现了一个LineRecordReader,从LineRecordReader#nextKeyValue方法可以看出,它的key是将要读的这一行数据的起始字节位置,value是这一行的内容。

OutputFormat规定了MR框架,最后输出的文件的内容格式,比如说SequenceFileOutputFormat,其getRecordWriter方法返回一个RecordWriter的内部类,具体的操作在SequenceFile类里write相关的方法中。SequenceFile的具体实现只从类来看比价复杂,其一条记录可以简单理解成如下的格式[record length][key length][key][value]。

RecordReader和RecordWriter简介

上边提到了RecordReader和RecordWriter,这里简单介绍一下,RecordWriter和RecordReader在MR框架每一个输入输出的地方用到,读写操作调用的都是RecordReader和RecordWriter提供的接口,比如说我们在Mapper#map和Reducer#reduce的结束通常都会写这样一行代码context.write(key,value),这行代码实际调用的就是RecordWriter#write方法,RecordReader是MR框架调用的。

一个InputFormat对应一个RecordReader,一个OutputFormat对应一个RecordWriter。这样在后边的DIY的时候,也知道应该写些什么了。

FileInputFormat和FileOutputFormat

这里以FileInputFormat,FileOutputFormat和其对应的子类TextInputFormat和TextOutputFormat为例,分析一套InputFormat和OutputFormat的具体实现。

1.      InputFormat

InputFormat抽象类中有两个方法:

InputFormat#getSplits(JobContext context)

对输入文件进行逻辑上的分片

InputFormat#createRecordReader(InputSplitsplit,TaskAttemptContext context)

返回一个RecordReader对象,该对象可将输入的InputSplit解析成若干个key/value对。

2.      FileInputFormat

所有需要处理文件的InputFormat的基类,从其实现可以看出,FileInputFormat主要是实现InputFormat#getSplit(JobContext context)方法,由于每个子类对应不同的输入格式,所以解析InputSplit的方法InputFormat#createRecordReader(InputSplit split,TaskAttemptContextcontext)由各个子类自己实现。

这里我们先分析FileInputFormat#getSplit(JobContext context)方法。

/*** Generate the list of files and make them into FileSplits.* @param job the job context* @throws IOException*/public List<InputSplit> getSplits(JobContext job) throwsIOException {long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generatesplitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);//获取输入的所有文件for (FileStatus file: files) {//一次循环处理一个文件Path path = file.getPath();long length = file.getLen();if (length != 0) {//文件不是空的FileSystem fs =path.getFileSystem(job.getConfiguration());BlockLocation[] blkLocations =fs.getFileBlockLocations(file, 0, length);//获取一个文件的所有的Block的带位置的信息if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize =computeSplitSize(blockSize, minSize, maxSize);//计算一个FileSplit的大小,计算过程如下,Math.max(minSize,Math.min(maxSize, blockSize))long bytesRemaining =length;//这个while循环就是根据上边的准备的信息,不停的读文件,产生FileSplit,一直到文件末尾。while (((double)bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path,length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts()));bytesRemaining -= splitSize;}//处理最后不足splitSize大小的数据if (bytesRemaining != 0) {int blkIndex =getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path,length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts()));}} else { // not splitable不能分割的话,文件只有一块blkLocations[0]就是这一块splits.add(makeSplit(path, 0, length,blkLocations[0].getHosts()));}} else {//Create empty hosts array for zerolength filessplits.add(makeSplit(path, 0, length,new String[0]));}}// Save the number of input files for metrics/loadgenjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());LOG.debug("Total # of splits: " + splits.size());return splits;}

3.      TextInputFormat

TextInputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。

TextInputFormat#createRecordReader方法逻辑比较简单,最后返回一个LineRecordReader对象,下面主要看这个方法LineRecordReader#nextKeyValue。

public boolean nextKeyValue() throws IOException {if (key == null) {key = newLongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;// We always read oneextra line, which lies outside the upper// split limit i.e. (end -1)
while (getFilePosition() <= end) {
//这一句就是读数据的地方了newSize =in.readLine(value, maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength));if (newSize == 0) {break;}pos += newSize;if (newSize <maxLineLength) {break;}// line too long. tryagainLOG.info("Skippedline of size " + newSize + " at pos " +(pos -newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}

4.      OutputFormat

OutputFormat# getRecordWriter (TaskAttemptContext context)

返回一个RecordWriter对象

OutputFormat# checkOutputSpecs (JobContext context)

判断输出文件存不存在,写过MR程序的人应该很熟悉了,如果输出路径已经存在话,会抛出一个Output directory " + outDir + " already exists"错误,就是这个方法抛出的。

OutputFormat# getOutputCommitter (TaskAttemptContext context)

MR运行过程中,会产生很多中间文件,比如Mapper的输出,推测式执行Task时产生的文件等等,这个方法负责在任务执行完成后,处理这些中间文件。顺便说下,OutputCommitter对象里的方法都是回调方法,MR自动调用。

5.      FileOutputFormat

FileOutputFormat对上边提到的3个方法中的后两个提供了通用的实现,OutputFormat# getRecordWriter (TaskAttemptContext context)方法需要子类自己实现。

public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException{// Ensure that the outputdirectory is set and not already therePath outDir =getOutputPath(job);if (outDir == null) {throw newInvalidJobConfException("Output directory not set.");}// get delegation tokenfor outDir's file systemTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] { outDir },job.getConfiguration());if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {throw newFileAlreadyExistsException("Output directory " + outDir +" already exists");}}public synchronizedOutputCommittergetOutputCommitter(TaskAttemptContext context)throws IOException {if (committer == null) {Path output =getOutputPath(context);committer = newFileOutputCommitter(output, context);}return committer;}

6.      TextOutputFormat

TextOutputFormat直接使用了FileInputFormat的getSplits方法,自己实现了createRecordReader方法。

TextOutputFormat#createRecordWriter方法逻辑比较简单,最后返回一个LineRecordWriter对象,下面主要看这个方法LineRecordReader#write(K key,V Value)。

public synchronized void write(K key, V value)throws IOException {boolean nullKey = key ==null || key instanceof NullWritable;boolean nullValue =value == null || value instanceof NullWritable;if (nullKey &&nullValue) {return;}//这个地方是最主要的逻辑if (!nullKey) {writeObject(key);//写出key}if (!(nullKey ||nullValue)) {out.write(keyValueSeparator);//写出key/value的分隔符}if (!nullValue) {writeObject(value);//写出value}out.write(newline);//写出行结束符
}

SqoopInputFormat和SqoopOutputFormat

下面的分析基于Sqoop1.99.3

在Sqoop中,InputFormat和OutputFormat的子类有3个,分别是:

public class SqoopInputFormat extendsInputFormat<SqoopSplit, NullWritable>
public class SqoopFileOutputFormat extendsFileOutputFormat<Data, NullWritable>
public class SqoopNullOutputFormat extendsOutputFormat<Data, NullWritable>

下面一个个分析:

1.      SqoopInputFormat

从泛型传入的类型可以看出,SqoopInputFormat的key是SqoopSplit,value是NullWritable。直接把整个InputSplit作为key,传到Mapper中,SqoopInputFormat不对InputSplit做任何的解析操作。

2.      SqoopFileOutputFormat

从泛型传入的类型来看,跟SqoopInputFormat类似,只是用KEY部分保存信息。SqoopFileOutputFormat只重写了父类FileOutputFormat的getRecordWriter方法和getOutputCommitter方法,checkOutputSpec方法使用的父类的。

getRecordWriter方法调用SqoopOutputFormatLoadExecutor#getRecordWriter,这个方法在返回一个SqoopRecordWriter的同时,开启一个消费者线程,SqoopRecordWriter是生产者线程。[后面接着分析Sqoop源码时细说]

3.      SqoopNullOutputFormat

SqoopNullOutputFormat将OutputFormat的3个方法都重写了。SqoopNullOutputFormat#getRecordWriter方法同样是调用SqoopOutputFormatLoadExecutor#getRecordWriter。

http://www.xdnf.cn/news/799507.html

相关文章:

  • [正则表达式]文本框输入内容控制=限制仅仅只给输入数字;中文;英文;符号。
  • android一些细节问题
  • Windows XP任务计划不能执行的解决的方法
  • BigWorld
  • DOTA系列 食尸鬼(小狗)攻略
  • 一份较详细的MS服务
  • CheckBox复选框
  • 关于自己项目(听书系统)的简介
  • Windows Media Player 播放.WMV文件破解许可证
  • SID310S/D/Q-10MHz, 低噪声, 轨至轨, CMOS 运算放大器替代SGM722
  • Windows新版算号器技术原理
  • 柏睿网络-建设机房的必要性
  • 什么是Google PR值? 如何提高PR值
  • Bitmap recycle()
  • Linux操作系统各版本ISO镜像下载(包括oracle linux\redhat\centos\ubuntu\debian等)
  • 智能影视站系统 光线 CMS1.5 正式版
  • ArrayList的遍历方式
  • 输送带的设计
  • Java中的日志记录
  • qvod(快播)电影批量下载器(轻松下载连续剧)
  • 【2025RAG最新进展】
  • 卡巴斯基KAV/KIS 6.0/7.0 授权许可文件永久免费更新方法
  • 计算机组中电脑无法访问,【工作组计算机无法访问】解决方法
  • Windows安装pyav报错:ERROR: Failed building wheel for av.Failed to build av. ERROR: Could not build wheel
  • 权威汇总 | 2023年交通运输工程类国际会议
  • HgzVip1.2.rar
  • 动态规划- 【气球游戏】
  • 注册表 Run、RunOnce 键值解析
  • PB常用函数
  • 国外10个最佳和最流行的酷站推荐网站