【大数据】MapReduce 编程-- PageRank--网页排名算法,用于衡量网页“重要性”-排序网页
PageRank 是 Google 创始人拉里·佩奇(Larry Page)和谢尔盖·布林(Sergey Brin)在 1998 年提出的一种网页排名算法,用于衡量网页“重要性”的一种方式。它是搜索引擎中用于排序网页的一种基础算法
一个网页越是被其他重要网页链接,它就越重要
PageRank 的计算流程
-
初始化:假设总共 N 个网页,每个网页初始 PR 值为 1/N。
-
迭代计算:通过 MapReduce 不断迭代更新 PR 值,直到值趋于稳定。
-
结果输出:PR 值越大,说明该网页越重要,排名越靠前
A 0.25 B C D
B 0.25 A D
C 0.25 C
D 0.25 B C
-
第一列:网页编号(如 A)
-
第二列:初始 PageRank 值(例如 0.25)
-
后续列:该网页链接到的其他网页
迭代的计算PageRank
值,每次MapReduce 的输出要和输入的格式是一样的,这样才能使得Mapreduce 的输出用来作为下一轮MapReduce 的输入
Map过程
解析输入行,提取:
-
当前网页 ID
-
当前网页的 PR 值
-
当前网页链接的其他网页列表
计算出要链接到的其他网友的个数,然后求出当前网页对其他网页的贡献值。
第一种输出的< key ,value>
中的key
表示其他网页,value
表示当前网页对其他网页的贡献值
为了区别这两种输出
出链网页贡献值(标记为 @):<出链网页, @贡献值>
第二种输出的< key ,value>
中的key
表示当前网页,value
表示所有其他网页。
网页链接列表(标记为 &):<当前网页, &链接网页列表>
B @0.0833
C @0.0833
D @0.0833
A &B C D
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*map过程*/
public class MyMapper extends Mapper<Object,Text,Text,Text>{ private String id;private float pr; private int count;private float average_pr; public void map(Object key,Text value,Context context)throws IOException,InterruptedException{ StringTokenizer str = new StringTokenizer(value.toString());//对value进行解析id =str.nextToken();//id为解析的第一个词,代表当前网页pr = Float.parseFloat(str.nextToken());//pr为解析的第二个词,转换为float类型,代表PageRank值count = str.countTokens();//count为剩余词的个数,代表当前网页的出链网页个数average_pr = pr/count;//求出当前网页对出链网页的贡献值String linkids ="&";//下面是输出的两类,分别有'@'和'&'区分while(str.hasMoreTokens()){String linkid = str.nextToken();context.write(new Text(linkid),new Text("@"+average_pr));//输出的是<出链网页,获得的贡献值>linkids +=" "+ linkid;} context.write(new Text(id), new Text(linkids));//输出的是<当前网页,所有出链网页>}
}
输入数据格式(value):网页ID PageRank值 出链网页1 出链网页2 ...
输出键值对:
-
<出链网页ID, "@贡献值">
(表示这个网页从别的网页获得了多少贡献) -
<当前网页ID, "& 出链网页列表">
(保留网页结构)
String id; // 当前网页ID
float pr; // 当前网页的PageRank值
int count; // 出链网页的数量
float average_pr; // 当前网页对每个出链网页的平均贡献值
StringTokenizer str = new StringTokenizer(value.toString());是把整行字符串(比如 "A 1.0 B C D")按照空格分割成一个个小单元(token)
id = str.nextToken(); // 第一个token是当前网页ID------取出第一个单词(比如 A
),表示当前正在处理的网页 ID,赋值给 id
pr = Float.parseFloat(str.nextToken()); // 第二个token是当前网页的PageRank值
取出第二个单词(比如 "1.0"
),将其转为 float
类型,就是当前网页的 PageRank 值,赋值给 pr
count = str.countTokens();// 剩下的token是出链网页数量----
统计剩余 token 的数量
average_pr = pr / count; //把当前网页的 PageRank 值平均分配给所有它链接的网页
贡献值输出:
while(str.hasMoreTokens()) {String linkid = str.nextToken(); // B, 然后 C, 然后 Dcontext.write(new Text(linkid), new Text("@" + average_pr));linkids += linkid + " "; // 把 B、C、D 加入 linkids 中
}
str.hasMoreTokens() 只要还有未读取的 token(即还有出链网页没处理完),就继续执行循环体
网页结构输出(带 &
开头):
String linkids记录当前网页的所有出链网页 ID
context.write(new Text(id), new Text(linkids));
Shuffle 是指 Map 阶段输出的数据按照 key 进行分组,并将具有相同 key 的数据发送到同一个 Reduce 任务中处理的过程
每个网页 Map 阶段都会:
-
向它出链的网页发 PageRank 贡献(加@前缀)
-
自己保留一份出链结构
Shuffle 阶段:按网页ID归并聚合
-
对 Map 输出的 key(网页 ID)进行排序
-
将相同 key 的所有 value 合并成一个列表
Reducer 接收到的格式为:<网页ID, [贡献值, 出链结构]>
<网页ID, 列表[@贡献1, @贡献2, ..., &出链结构]>
Reduce过程
-
求每个网页的新 PageRank 值
-
保留该网页的出链结构
-
输出格式为:
网页ID 新的PR值 出链网页列表
shuffule
的输出也即是reduce
的输入。
reduce
输入的key
直接作为输出的key
对reduce
输入的value
进行解析,它是一个列表
a.若列表里的值里包含`@`,就把该值`@`后面的字符串转化成`float`型加起来
b.若列表里的值里包含`&`,就把该值`&`后面的字符串提取出来
c.把所有贡献值的加总,和提取的字符串进行连接,作为`reduce`的输出`value`
public class MyReducer extends Reducer<Text,Text,Text,Text>{
继承 Hadoop 提供的 Reducer
类,泛型参数说明:
-
Text, Text
:输入的 key 和 value 类型 -
Text, Text
:输出的 key 和 value 类型
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
为每一个网页 key
传入一个 values
列表,里面是 Shuffle 过程收集到的所有值
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** Reduce过程:计算每个网页的新PageRank值,并保留出链网页结构。* 输入:<网页ID, [@贡献值, @贡献值, ..., &出链网页列表]>* 输出:<网页ID, 新PageRank值 + 出链网页列表>*/
public class MyReducer extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {String lianjie = ""; // 用于保存当前网页的出链网页列表(结构信息)float pr = 0; // 用于累加当前网页从其他网页获得的PageRank贡献值// 遍历所有传入的值:包含两类信息,分别通过首字符判断for (Text val : values) {String strVal = val.toString(); // 当前值转换为字符串if (strVal.substring(0, 1).equals("@")) {// 以@开头,表示这是从其他网页传来的PageRank贡献值// 取出@后面的数值并累加pr += Float.parseFloat(strVal.substring(1));} else if (strVal.substring(0, 1).equals("&")) {// 以&开头,表示这是本网页的出链结构信息// 将&后面的网页列表保留下来lianjie += strVal.substring(1); // 注意可能是多个网页用空格分隔}}// 平滑处理(加入跳转因子d = 0.8)// 假设网页总数为4,(1 - d) / N = 0.2 * 0.25 = 0.05// 新PageRank = d * 贡献值总和 + (1 - d)/Npr = 0.8f * pr + 0.2f * 0.25f;// 构造输出字符串:新PR值 + 出链网页列表String result = pr + lianjie;// 输出结果:<当前网页ID, 新的PageRank值 + 出链网页列表>context.write(key, new Text(result));}
}
遍历所有值,分类处理
pr += Float.parseFloat(val.toString().substring(1));
如果是 @
开头,就从第 1 个字符开始截取字符串(去掉 @
),再把它转换成浮点数,并累加到 pr
中
lianjie += val.toString().substring(1);
如果是 &
开头,就把 &
后面的出链网页字符串加到变量 lianjie
中
-
以
@
开头:表示来自其他网页的 PageRank 贡献值,提取并累加。 -
以
&
开头:表示这是该网页自身的 出链网页结构,保留下来。
pr = 0.8f * pr + 0.2f * 0.25f;
PageRank 中的阻尼系数模型:
-
0.8f
:阻尼系数 d(表示 80% 用户点击链接) -
0.2f
:1 - d,有 20% 用户会随机跳转 -
0.25f
:假设网页总数是 4 个,随机跳转概率均分为 0.25
PR(A) = d × 所有贡献值之和 + (1 - d) / N
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;public class MyRunner {public static void main(String[] args)throws IOException, ClassNotFoundException, InterruptedException {// 创建 Hadoop 配置对象Configuration conf = new Configuration();// 使用控制台输入获取初始输入路径和输出路径Scanner sc = new Scanner(System.in);System.out.print("inputPath:");String inputPath = sc.next(); // 第一次输入的 HDFS 输入路径,如:/pagerank/inputSystem.out.print("outputPath:");String outputPath = sc.next(); // 第一次输出的 HDFS 路径,如:/pagerank/output// 进行 PageRank 的迭代计算,这里迭代 5 次for (int i = 1; i <= 5; i++) {// 创建新的 MapReduce 作业Job job = Job.getInstance(conf);// 设置 Job 的主类,用于打包 Jarjob.setJarByClass(MyRunner.class);// 设置 Map 和 Reduce 的处理类job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);// 设置 Map 阶段输出键值对类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 设置 Reduce 阶段输出键值对类型(最终输出)job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置输入数据路径(每轮迭代输入路径是上一轮的输出)FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000" + inputPath));// 设置输出数据路径(每轮迭代输出不同路径)FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));// 更新下一轮迭代的输入输出路径inputPath = outputPath; // 当前输出变为下一轮的输入outputPath = outputPath + i; // 每次输出加上数字以区分路径(如 output1, output2,...)// 提交作业并等待执行完成job.waitForCompletion(true);}// 读取最终输出文件内容并打印到控制台try {// 获取 Hadoop 文件系统FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());// 拼接最终输出文件的路径(最后一轮输出的 part-r-00000)Path srcPath = new Path(outputPath.substring(0, outputPath.length() - 1) + "/part-r-00000");// 打开输出文件FSDataInputStream is = fs.open(srcPath);// 打印最终结果到控制台System.out.println("Results:");while (true) {String line = is.readLine(); // 读取一行结果if (line == null) break; // 如果到文件末尾,结束循环System.out.println(line); // 打印当前行}is.close(); // 关闭输入流} catch (Exception e) {e.printStackTrace(); // 如果读取输出失败,打印错误}}
}