当前位置: 首页 > news >正文

【大数据】MapReduce 编程--WordCount

API 是“Application Programming Interface”的缩写,即“应用程序编程接口”

Hadoop 提供了一套 基于 Java 的 API,用于开发 MapReduce 程序、访问 HDFS、控制作业等

MapReduce 是一种 分布式并行计算模型,主要用于处理 大规模数据集。它将计算任务分为两个阶段:

阶段功能程序类名(通常)
Map 阶段对输入数据进行处理和拆分,生成键值对(key-value)Mapper 类
Reduce 阶段汇总 Map 输出的 key-value 数据,对相同 key 的数据进行合并处理Reducer 类

MapReduce应用程序至少包含 3 个部分:一个 Map 函数、一个 Reduce 函数和一个 main 函数。在运行一个mapreduce计算任务时候,任务过程被分为两个阶段:map阶段和reduce阶段,每个阶段都是用键值对(key/value)作为输入(input)和输出(output) 



Mapper实现

public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{//重写map这个方法//mapreduce框架每读一行数据就调用一次该方法protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value//key是这一行数据的起始偏移量,value是这一行的文本内容}
}

WcMap继承了 Hadoop 提供的 Mapper 类,并且指定了四个泛型参数

Mapper 的泛型Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

  • KEYIN: 输入键的类型(Map函数输入)--

  • VALUEIN: 输入值的类型

  • KEYOUT: 输出键的类型(Map函数输出)

  • VALUEOUT: 输出值的类型

参数类型含义
LongWritable输入键一行文本的起始偏移量(比如第几字节开始)
Text输入值一整行文本内容
Text输出键单词(比如:"Hello")
LongWritable输出值单词的计数(一般为 1)

Map过程:并行读取文本,对读取的单词进行map操作,每个词都以<key,value>形式生成。

一个有三行文本的文件进行MapReduce操作。

读取第一行Hello World Bye World ,分割单词形成Map。

<Hello,1> <World,1> <Bye,1> <World,1>

读取第二行Hello Hadoop Bye Hadoop,分割单词形成Map。

<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>

读取第三行Bye Hadoop Hello Hadoop,分割单词形成Map。

<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>

MapReduce 中的 Mapper 实现,用于实现“词频统计”(WordCount)功能

按行读取文本,把每个单词都变成 <word, 1> 输出,为后续的词频统计做准备

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**** * @author Administrator* 1:4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的值*       KEYOUT是输入的key的类型,VALUEOUT是输入的value的值 * 2:map和reduce的数据输入和输出都是以key-value的形式封装的。* 3:默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value* 4:key-value数据是在网络中进行传递,节点和节点之间互相传递,在网络之间传输就需要序列化,但是jdk自己的序列化很冗余*    所以使用hadoop自己封装的数据类型,而不要使用jdk自己封装的数据类型;*    Long--->LongWritable*    String--->Text    */
public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{//重写map这个方法//mapreduce框架每读一行数据就调用一次该方法@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value//key是这一行数据的起始偏移量,value是这一行的文本内容//1:String str = value.toString();//2:切分单词,空格隔开,返回切分开的单词String[] words = StringUtils.split(str," ");//3:遍历这个单词数组,输出为key-value的格式,将单词发送给reducefor(String word : words){//输出的key是Text类型的,value是LongWritable类型的context.write(new Text(word), new LongWritable(1));}}
}

继承 Hadoop Mapper 的类---------------- public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>

重写的 map 方法---MapReduce 框架自动调用,对文件的每一行都会调用一次

@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
参数作用
key当前行在文件中的偏移量(不重要)
value当前这一行的文本内容(例如:"Hello Hadoop Bye")
contextHadoop 框架提供的对象,用于输出结果

把 Text 类型转换成普通字符串:String str = value.toString();

按空格切分字符串,获取单词数组:String[] words = StringUtils.split(str," ");例如:"Hello Hadoop Bye" => ["Hello", "Hadoop", "Bye"]-----Apache Commons 的 StringUtils.split 方法

遍历所有单词

for(String word : words){context.write(new Text(word), new LongWritable(1));
}

每拿到一个单词,就创建一对键值对 <单词, 1> 输出出去。这些结果会传给 Reduce 处理 

Hadoop 的 MapReduce 任务运行在集群中,key-value 是在网络上传输的, 不能使用 Java 原生类型(如 String、long),必须使用 Hadoop 提供的可序列化数据类型
Java类型Hadoop类型
StringText
longLongWritable
intIntWritable
floatFloatWritable



 

<Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1>不是我们写的代码做的,而是 MapReduce 框架自动完成的工作,叫做 Shuffle + 分组过程(有时也叫分区、排序、合并)

 MapReduce 的完整流程

 Mapper 阶段

输入的 3 行:

Hello World Bye World
Hello Hadoop Bye Hadoop
Bye Hadoop Hello Hadoop

 Mapper 输出

<Hello,1> <World,1> <Bye,1> <World,1>
<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>
<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>

 Combiner 阶段(局部归约)

如果配置了 Combiner(它的逻辑通常和 Reducer 是一样的),那在每台 Map 机器本地就会先对相同的 key 做一次合并

<Hello,1> <Hello,1> <Hello,1>合成:<Hello,3>

Combiner 是可选的优化步骤,有助于减少网络传输数据量--:Combiner 需要配置,它不是自动就起作用的

Shuffle + Sort + Grouping(框架自动做)

这一步发生在 Mapper 和 Reducer 之间,称为 洗牌过程(Shuffle),是 MapReduce 的精髓

步骤解释
Shuffle把相同 key 的数据发送给同一个 Reducer(网络传输)
Sort对 key 进行排序(Hadoop 默认会对 key 排序)
Group把相同 key 的多个 value 放在一个集合中

 最终 Reducer 接收到的数据

<Hello, [1,1,1]>
<World, [1,1]>
<Bye, [1,1,1]>
<Hadoop, [1,1,1,1]>

 假设你原始文本是:Hello World Hello

Map 阶段输出:<Hello,1> <World,1>  <Hello,1>
Reduce 阶段输入(自动分组):key = Hello, values = [1, 1]   key = World, values = [1]
Reduce 输出:<Hello,2>   <World,1>


 Reducer 实现

WcReduce 是 MapReduce 中的 Reducer 实现类

public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{//继承Reducer之后重写reduce方法//第一个参数是key,第二个参数是集合。//框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {}
}

继承了 Hadoop 提供的 Reducer 类 

泛型参数类型含义
KEYINText单词
VALUEINLongWritable数字
KEYOUTTextReduce 的输出 key(还是单词)
VALUEOUTLongWritableReduce 的输出 value(统计的总次数)

@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
reduce 方法就处理每一组(key 相同)Map 输出的结果 

  • key: 单词,例如 "Hello""Hadoop"

  • values: 一个可遍历的对象,包含所有这个单词对应的值,例如 <"Hello", [1, 1, 1]>

  • context: 用来把结果写出去

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**** * @author Administrator* 1:reduce的四个参数,第一个key-value是map的输出作为reduce的输入,第二个key-value是输出单词和次数,所以*      是Text,LongWritable的格式;*/
public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{//继承Reducer之后重写reduce方法//第一个参数是key,第二个参数是集合。//框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法//<hello,{1,1,1,1,1,1.....}>@Overrideprotected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {//将values进行累加操作,进行计数long count = 0;//遍历value的list,进行累加求和for(LongWritable value : values){count += value.get();}//输出这一个单词的统计结果//输出放到hdfs的某一个目录上面,输入也是在hdfs的某一个目录context.write(key, new LongWritable(count));}
}

累加单词出现次数

long count = 0;
for(LongWritable value : values){count += value.get();
}

LongWritable 是 Hadoop 提供的可序列化的 long 类型(因为 MapReduce 要在网络中传输数据,不能用普通的 Java 类型) 

定义一个变量 count,用来统计每个单词出现的总次数

遍历传进来的所有 value 值(也就是 1)

context.write(key, new LongWritable(count));
 



执行MapReduce任务

在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。

设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成CombineReduce过程中的处理

设置了Map过程和Reduce过程的输出类型:key的类型为Textvalue的类型为IntWritable

任务的输出和输入路径则由命令行参数指定,并由FileInputFormatFileOutputFormat分别设定。

完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
/**** 1:用来描述一个特定的作业*       比如,该作业使用哪个类作为逻辑处理中的map,那个作为reduce* 2:还可以指定该作业要处理的数据所在的路径*        还可以指定改作业输出的结果放到哪个路径* @author Administrator**/
public class WcRunner{public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//创建配置文件Configuration conf = new Configuration();//获取一个作业Job job = Job.getInstance(conf);//设置整个job所用的那些类在哪个jar包job.setJarByClass(WcRunner.class);//本job使用的mapper和reducer的类job.setMapperClass(WcMap.class);job.setReducerClass(WcReduce.class);//指定reduce的输出数据key-value类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//指定mapper的输出数据key-value类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);Scanner sc = new Scanner(System.in);System.out.print("inputPath:");String inputPath = sc.next();System.out.print("outputPath:");String outputPath = sc.next();//指定要处理的输入数据存放路径FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));//指定处理结果的输出数据存放路径FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));//将job提交给集群运行 job.waitForCompletion(true);//输出结果try {FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());Path srcPath = new Path(outputPath+"/part-r-00000");FSDataInputStream is = fs.open(srcPath);System.out.println("Results:");while(true) {String line = is.readLine();if(line == null) {break;}System.out.println(line);}is.close();}catch(Exception e) {e.printStackTrace();}} 
}

HDFS 读取输出文件 part-r-00000,然后打印文件的每一行,直到文件内容读取完为止。其核心操作是通过 FileSystem 获取 HDFS 文件系统,然后使用 FSDataInputStream 读取文件的内容

---------- 

FileSystem.get() 方法获取 HDFS 文件系统的实例 

URI: 这是 HDFS 文件系统的 URI,指定了 HDFS 的访问路径。"hdfs://master:9000" 表示 HDFS 在 master 主机的 9000 端口上运行

Configuration: 这是 Hadoop 的配置对象,包含了关于 Hadoop 系统的各种设置。new Configuration() 会创建一个默认的配置

outputPath 是用户在输入时提供的输出路径,这里使用 outputPath + "/part-r-00000" 来构建文件路径---part-r-00000 是默认的输出文件名

FSDataInputStream 对象是用来读取文件内容的

is.readLine() 方法逐行读取文件的内容。

  • 如果 readLine() 返回 null,表示文件已经读完。

  • 每读取一行,就将其打印出来。

http://www.xdnf.cn/news/406081.html

相关文章:

  • AI赋能:构建个性化智能学习规划系统
  • Android 中 Handler (创建时)内存泄漏问题及解决方案
  • PDFMathTranslate:科学 PDF 文件翻译及双语对照工具
  • Web4X:站在Web4.0时代的起点,定义AI商业新生态
  • 专业知识的检索过程 stepbystep - 样例
  • ARM-CortexM固件升级相关问题研究
  • 采用AI神经网络降噪算法的通信语音降噪(ENC)模组性能测试和应用
  • 学习笔记:Conda 环境共享
  • 2025年SDK游戏盾技术深度解析:AI赋能下的DDoS/CC攻击防御革命
  • Html5新特性_js 给元素自定义属性_json 详解_浅克隆与深克隆
  • 模型上下文协议(MCP):AI的“万能插座”
  • Halcon案例(一):C#联合Halcon识别路由器上的散热孔
  • 【Vue3】使用vite创建Vue3工程、Vue3基本语法讲解
  • Windows 添加 hosts 映射
  • 零碳园区能源系统-多能互补体系
  • 星海智算云平台部署GPT-SoVITS模型教程
  • 傲云源墅:以五傲价值重构北京主城别墅格局
  • Spring MVC 和 Spring Boot 是如何访问静态资源的?
  • MySQL数据库表的约束
  • 反弹shell再入门
  • MySQL查询优化100条军规
  • 深度解析RagFlow:本地大模型驱动的高效知识库应用搭建指南
  • Java MVC
  • nRF5_SDK_17.1.0_ddde560之ble_app_uart_c 出错
  • [Java实战]Spring Boot 整合 Session 共享(十七)
  • LintCode第42题-最大子数组 II
  • 《Vuejs设计与实现》第 5 章(非原始值响应式方案) 中
  • OpenCV 的 CUDA 模块中用于将一个多通道 GpuMat 图像拆分成多个单通道图像的函数split()
  • 【AI News | 20250512】每日AI进展
  • 一键生成达梦、Oracle、MySQL 数据库 ER 图!解锁高效数据库设计!