Hadoop Learning Path (20): Top N with MapReduce
Published: 2019-06-20

Preface

In Hadoop, sorting is the soul of MapReduce: both the MapTask and the ReduceTask sort data by key. This is the framework's default behavior, whether or not your business logic actually needs it.

Technical Points

The MapReduce framework mainly uses two kinds of sorting: quicksort, and a heap-based priority queue (PriorityQueue).

Mapper Phase

Map output written to the circular buffer is sorted (with an improved quicksort inside the MR framework) by partition and then by key. When the buffer reaches 80% capacity, the data is spilled to disk as IFile files. After the map task finishes, the IFile files are merge-sorted into one large file (using a heap-based priority queue), from which the different reducers pull their respective data.

Reducer Phase

The data pulled back from the mapper side is already partially sorted, so the reduce task only needs a single merge sort to make it fully ordered. To improve efficiency, Hadoop overlaps the sort and reduce phases: during sort, the reduce task builds a min-heap over the in-memory data and on-disk files, keeps an iterator pointing at the heap root, and keeps advancing that iterator so that records with the same key are handed, in order, to the reduce() function. Advancing the iterator is really just repeatedly adjusting the min-heap (build heap → take root → re-heapify → take root → ...). In this way, sort and reduce run in parallel.
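The heap-based merge described above can be illustrated with a standalone sketch (plain Java for illustration only; this is not Hadoop's actual Merger implementation): each sorted spill segment is represented by an iterator, and a min-heap always exposes the globally smallest key.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

    // Merge several individually sorted lists into one sorted list,
    // the same way the sort phase merges sorted spill segments with a min-heap.
    static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // heap entries: {value, runIndex}; ordered by value (min-heap)
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        List<Iterator<Integer>> its = new ArrayList<>();
        for (int r = 0; r < sortedRuns.size(); r++) {
            Iterator<Integer> it = sortedRuns.get(r).iterator();
            its.add(it);
            if (it.hasNext()) heap.add(new int[]{it.next(), r});
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();            // take the heap root (smallest key)
            out.add(top[0]);
            Iterator<Integer> it = its.get(top[1]);
            if (it.hasNext()) heap.add(new int[]{it.next(), top[1]}); // re-adjust the heap
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = List.of(List.of(1, 4, 9), List.of(2, 3, 8), List.of(5, 7));
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 7, 8, 9]
    }
}
```

Each poll-and-refill step touches only the heap, which is why the framework can hand the next smallest key to reduce() without waiting for the whole merge to finish.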

Grouped Top N Analysis

A common scenario in data processing: group table rows by some field, then find the top few records within each group. This grouped Top N problem can be solved with tools such as Hive or MapReduce; below we implement it with MapReduce.

Scenario

computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

I. Data Description

The number of fields per record is not fixed:

The first field is the course name; there are four courses in total: computer, math, english, algorithm.
The second field is the student name; the remaining fields are the scores of each exam.

II. Requirements

1. For each course, count the number of participating students and the course's average score.

2. For each course, compute every participating student's average score and store the results in separate result files by course (one result file per course), sorted by average score from high to low, with scores kept to one decimal place.

3. For each course, find the information of the highest-scoring student: course, name, and average score.
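Before looking at the MapReduce version, the core computation behind requirement 1 can be sanity-checked in plain Java (a local sketch, not part of the original post): per line, average the scores after the first two fields, then aggregate those per-student averages by course.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CourseAvgSketch {

    // Per course: number of students, and the average of their per-student averages,
    // mirroring what the MR1 mapper/reducer pair below computes.
    static Map<String, String> courseStats(String[] lines) {
        Map<String, double[]> acc = new LinkedHashMap<>(); // course -> {sumOfAvgs, studentCount}
        for (String line : lines) {
            String[] f = line.split(",");
            double sum = 0;
            for (int i = 2; i < f.length; i++) sum += Integer.parseInt(f[i]);
            double avg = sum / (f.length - 2);            // this student's average
            double[] a = acc.computeIfAbsent(f[0], k -> new double[2]);
            a[0] += avg;
            a[1]++;
        }
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            int count = (int) e.getValue()[1];
            out.put(e.getKey(), count + "\t" + (e.getValue()[0] / count));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] lines = {
            "computer,huangbo,80,90",
            "computer,xuzheng,60,70",
            "math,liutao,100"
        };
        // prints each course's student count and average of student averages
        System.out.println(courseStats(lines));
    }
}
```

Note one modeling choice the original code also makes: the course average is the average of the per-student averages, not the average over all raw scores.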

Question 1

CourseScoreMR1.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CourseScoreMR1 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR1.class);
        job.setMapperClass(CourseScoreMR1Mapper.class);
        job.setReducerClass(CourseScoreMR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_1");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        /**
         * The record's fields: course, name, score...
         *
         * value == algorithm,huangzitao,85,86,41,75,93,42,85,75
         *
         * Output key   : course
         * Output value : avgScore
         *
         * APIs for formatting numeric values: NumberFormat, SimpleDateFormat
         */
        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];

            int sum = 0;
            int count = 0;
            // (reconstructed: the rest of this method was lost in extraction)
            // sum this student's scores and emit the per-student average
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
                count++;
            }
            double avgScore = 1D * sum / count;

            outKey.set(course);
            outValue.set(avgScore);
            context.write(outKey, outValue);
        }
    }

    public static class CourseScoreMR1Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

        Text outValue = new Text();

        /**
         * key    : course
         * values : per-student averages, e.g. 98.7 87.6
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            double sum = 0;
            int count = 0;

            for (DoubleWritable dw : values) {
                sum += dw.get();
                count++;
            }

            double lastAvgScore = sum / count;

            outValue.set(count + "\t" + lastAvgScore);

            context.write(key, outValue);
        }
    }
}

Question 2

CourseScoreMR2.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.pojo.CourseScore;
import com.ghgj.mr.exercise.ptn.CSPartitioner;

public class CourseScoreMR2 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR2.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
//        job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
//        job.setOutputKeyClass(CourseScore.class);
//        job.setOutputValueClass(NullWritable.class);

        job.setPartitionerClass(CSPartitioner.class);
        job.setNumReduceTasks(4);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_2");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;
            // (reconstructed: the rest of this method was lost in extraction;
            //  the setter names below are assumed from the CourseScore pojo, which is not shown)
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
                count++;
            }
            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setAvgScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // unused: setReducerClass is commented out above, so the default
            // (identity) reducer writes out the keys already sorted by the shuffle
        }
    }
}
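The CourseScore POJO referenced by the imports is never listed in the post. For requirement 2 to work, its compareTo must order records by course and then by average score descending; a minimal sketch of that ordering, with hypothetical field names (the real class would implement Hadoop's WritableComparable with write()/readFields() as well):

```java
import java.util.Arrays;

public class CourseScoreSketch implements Comparable<CourseScoreSketch> {
    // hypothetical fields mirroring the missing pojo.CourseScore
    String course;
    String name;
    double avgScore;

    CourseScoreSketch(String course, String name, double avgScore) {
        this.course = course;
        this.name = name;
        this.avgScore = avgScore;
    }

    @Override
    public int compareTo(CourseScoreSketch o) {
        int byCourse = this.course.compareTo(o.course);   // keep courses together
        if (byCourse != 0) return byCourse;
        return Double.compare(o.avgScore, this.avgScore); // higher average first
    }

    @Override
    public String toString() { return course + "," + name + "," + avgScore; }

    public static void main(String[] args) {
        CourseScoreSketch[] a = {
            new CourseScoreSketch("math", "liutao", 56.0),
            new CourseScoreSketch("computer", "huangbo", 65.25),
            new CourseScoreSketch("math", "huanglei", 74.43)
        };
        Arrays.sort(a);
        // computer first, then the math entries with the higher average ahead
        System.out.println(Arrays.toString(a));
    }
}
```

With this ordering, the shuffle alone delivers each course's records best-first to its reduce task, so the reducer never has to sort anything itself.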

CSPartitioner.java

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import com.ghgj.mr.exercise.pojo.CourseScore;

public class CSPartitioner extends Partitioner<CourseScore, NullWritable> {

    /**
     * Route each course to its own reduce task (and hence its own output file).
     */
    @Override
    public int getPartition(CourseScore key, NullWritable value, int numPartitions) {

        String course = key.getCourse();
        if (course.equals("math")) {
            return 0;
        } else if (course.equals("english")) {
            return 1;
        } else if (course.equals("computer")) {
            return 2;
        } else {
            return 3;
        }
    }
}

Question 3

CourseScoreMR3.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.gc.CourseScoreGC;
import com.ghgj.mr.exercise.pojo.CourseScore;

public class CourseScoreMR3 {

    private static final int TOPN = 3;

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR3.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
        job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CourseScore.class);
        job.setOutputValueClass(NullWritable.class);

//        job.setPartitionerClass(CSPartitioner.class);
//        job.setNumReduceTasks(4);

        // specify the grouping rule
        job.setGroupingComparatorClass(CourseScoreGC.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_3_last");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;
            // (reconstructed: the rest of this method was lost in extraction;
            //  the setter names below are assumed from the CourseScore pojo, which is not shown)
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
                count++;
            }
            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setAvgScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        /**
         * reduce() is called once per group of key-values that the grouping
         * comparator considers equal -- here, once per course.
         *
         * The values iterator can only be traversed once, and while iterating,
         * the key object is updated in step with each value, so within one group
         * the key exposes each record in sorted order (highest average first).
         */
        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            int count = 0;

            // keys arrive sorted by average descending within the course group,
            // so the first TOPN records are the answer
            for (NullWritable nvl : values) {
                context.write(key, nvl);
                if (++count == TOPN) {
                    return;
                }
            }
        }
    }
}

CourseScoreGC.java

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import com.ghgj.mr.exercise.pojo.CourseScore;

/**
 * Specifies the grouping rule for the reducer.
 */
public class CourseScoreGC extends WritableComparator {

    public CourseScoreGC() {
        super(CourseScore.class, true);
    }

    /**
     * Decides whether two map-output keys belong to the same reduce group.
     * The return type is int: a return value of 0 means the two keys,
     * after comparison, are treated as the same group.
     *
     * In this exercise the grouping rule is the course field only.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        CourseScore cs1 = (CourseScore) a;
        CourseScore cs2 = (CourseScore) b;

        return cs1.getCourse().compareTo(cs2.getCourse());
    }
}
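Putting the pieces together: map-output keys reach the reducer sorted by course and, within a course, by descending average, while the grouping comparator compares only the course. Each reduce() call therefore sees one course's records best-first, and taking the first TOPN values yields the answer. That selection can be sketched standalone (plain Java, assuming records are already in shuffle order):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopNPerGroupSketch {

    // records are already sorted the way the shuffle delivers them:
    // by course, and within a course by average score descending.
    static Map<String, List<String>> topN(String[][] sortedRecords, int n) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (String[] rec : sortedRecords) {             // rec = {course, name}
            List<String> group = out.computeIfAbsent(rec[0], k -> new ArrayList<>());
            if (group.size() < n) group.add(rec[1]);     // keep only the first n per group
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] sorted = {
            {"computer", "liutao"}, {"computer", "huangjiaju"}, {"computer", "huangzitao"},
            {"computer", "huangxiaoming"},
            {"math", "huangxiaoming"}, {"math", "huangjiaju"}
        };
        System.out.println(topN(sorted, 3));
        // {computer=[liutao, huangjiaju, huangzitao], math=[huangxiaoming, huangjiaju]}
    }
}
```

This is exactly why the MR3 reducer can stop after TOPN iterations: the sort comparator has already done the ranking, and the grouping comparator only marks where one course ends and the next begins.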

 

