Preface
In Hadoop, sorting is the soul of MapReduce: both the MapTask and the ReduceTask sort their data by key. This is the framework's default behavior, whether or not your business logic actually needs it.
Technical points
The MapReduce framework mainly relies on two kinds of sorting: quicksort, and a heap-based priority queue (PriorityQueue).
Mapper phase
Records written from map() into the circular buffer are sorted (with the framework's modified quicksort) by partition and then by key. When the buffer reaches 80% capacity, its contents are spilled to disk as an IFile. After the map finishes, the IFiles are sort-merged into one large file (using a heap-based priority queue), from which each reducer pulls the data belonging to it.
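The (partition, key) ordering applied before each spill can be sketched in plain Java. This is an illustrative simplification, not Hadoop's internal classes; the record and method names are made up for the example:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SpillSortSketch {

    // One map-output record: the partition it is destined for, plus its key.
    static class Rec {
        final int partition;
        final String key;
        Rec(int partition, String key) { this.partition = partition; this.key = key; }
        public String toString() { return partition + ":" + key; }
    }

    // Before spilling, buffered records are sorted by (partition, key), so each
    // spill file is clustered per reducer and sorted by key within each cluster.
    static List<Rec> sortForSpill(List<Rec> buffer) {
        buffer.sort(Comparator.<Rec>comparingInt(r -> r.partition)
                              .thenComparing(r -> r.key));
        return buffer;
    }

    public static void main(String[] args) {
        List<Rec> buf = new ArrayList<>(List.of(
                new Rec(1, "math"), new Rec(0, "english"), new Rec(0, "algorithm")));
        System.out.println(sortForSpill(buf)); // [0:algorithm, 0:english, 1:math]
    }
}
```

Because all records for one reducer end up contiguous and key-sorted, the later merge step never has to re-partition anything.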
Reducer phase
The data fetched from the mapper side is already partially sorted, so the Reduce Task only needs a single merge sort to make it globally ordered. For efficiency, Hadoop overlaps the sort and reduce phases: during sort, the Reduce Task builds a min-heap over the in-memory and on-disk segments, keeps an iterator positioned at the heap's root, and repeatedly advances it so that records with equal keys are handed to reduce() in order. Advancing the iterator is really a sequence of heap adjustments (take the root element, re-sift the heap, take the root again, ...), which is what lets sort and reduce run in parallel.
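The heap-based merge described above can be sketched as follows. The class and method names are illustrative, and plain strings stand in for keys; real Hadoop merges serialized key-value segments:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class HeapMergeSketch {

    // Heap entry: the current head of one sorted segment plus its iterator.
    static class Head implements Comparable<Head> {
        String value;
        final Iterator<String> rest;
        Head(Iterator<String> it) { this.rest = it; this.value = it.next(); }
        public int compareTo(Head o) { return value.compareTo(o.value); }
    }

    // k-way merge of already-sorted segments via a min-heap: the heap root is
    // always the globally smallest remaining key.
    static List<String> merge(List<List<String>> sortedSegments) {
        PriorityQueue<Head> heap = new PriorityQueue<>();
        for (List<String> seg : sortedSegments)
            if (!seg.isEmpty()) heap.add(new Head(seg.iterator()));

        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();          // take the root (smallest key)
            out.add(h.value);
            if (h.rest.hasNext()) {        // advance that segment, re-sift heap
                h.value = h.rest.next();
                heap.add(h);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(
                List.of("algorithm", "math"),
                List.of("computer", "english"))));
        // [algorithm, computer, english, math]
    }
}
```

Each poll/re-add pair is exactly the "take the root, re-sift" cycle described above, which is why a consumer (reduce()) can start draining keys before the whole merge is finished.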
Grouped Top-N analysis
A common scenario in data processing: group table rows by some field, then pick the few largest records within each group. This grouped Top-N problem can be solved with tools such as Hive or MapReduce; below we implement it in MapReduce.
Sample data
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75
I. Data description
The number of fields per record is not fixed: the first field is the course name (four courses in total: computer, math, english, algorithm), the second is the student name, and the remaining fields are the scores of each exam.
II. Requirements
1. For each course, report the number of students taking it and the course average.
2. For each course, compute every student's average score and write each course into its own result file (one file per course), sorted by average score from high to low, keeping one decimal place.
3. For each course, find the top-scoring students: course, name, and average score.
Problem 1
CourseScoreMR1.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CourseScoreMR1 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR1.class);
        job.setMapperClass(CourseScoreMR1Mapper.class);
        job.setReducerClass(CourseScoreMR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_1");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        /**
         * Input fields: course, name, score...
         *
         * value == algorithm,huangzitao,85,86,41,75,93,42,85,75
         *
         * Output key   : course
         * Output value : avgScore (this student's average for the course)
         *
         * Related formatting APIs: NumberFormat, SimpleDateFormat
         */

        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];

            int sum = 0;
            int count = 0;

            // fields from index 2 onward are this student's exam scores
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
                count++;
            }

            double avgScore = 1D * sum / count;

            outKey.set(course);
            outValue.set(avgScore);
            context.write(outKey, outValue);
        }
    }

    public static class CourseScoreMR1Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

        Text outValue = new Text();

        /**
         * key    : course
         * values : the per-student averages, e.g. 98.7  87.6
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            double sum = 0;
            int count = 0;

            for (DoubleWritable dw : values) {
                sum += dw.get();
                count++;
            }

            double lastAvgScore = sum / count;

            outValue.set(count + "\t" + lastAvgScore);

            context.write(key, outValue);
        }
    }
}
Problem 2
CourseScoreMR2.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.pojo.CourseScore;
import com.ghgj.mr.exercise.ptn.CSPartitioner;

public class CourseScoreMR2 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR2.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
        // job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        // job.setOutputKeyClass(CourseScore.class);
        // job.setOutputValueClass(NullWritable.class);

        // one reduce task (and so one result file) per course
        job.setPartitionerClass(CSPartitioner.class);
        job.setNumReduceTasks(4);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_2");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            // fields from index 2 onward are exam scores
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
                count++;
            }

            double avgScore = 1D * sum / count;

            // CourseScore is a custom WritableComparable whose listing is not
            // shown in the original post; the setter here is assumed from context.
            cs.setCourseScore(course, name, avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        }
    }
}
CSPartitioner.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import com.ghgj.mr.exercise.pojo.CourseScore;

public class CSPartitioner extends Partitioner<CourseScore, NullWritable> {

    /**
     * Route each course to its own reduce task, and hence its own output file.
     */
    @Override
    public int getPartition(CourseScore key, NullWritable value, int numPartitions) {

        String course = key.getCourse();
        if (course.equals("math")) {
            return 0;
        } else if (course.equals("english")) {
            return 1;
        } else if (course.equals("computer")) {
            return 2;
        } else {
            return 3;
        }
    }
}
Problem 3
CourseScoreMR3.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.gc.CourseScoreGC;
import com.ghgj.mr.exercise.pojo.CourseScore;

public class CourseScoreMR3 {

    private static final int TOPN = 3;

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR3.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
        job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CourseScore.class);
        job.setOutputValueClass(NullWritable.class);

        // job.setPartitionerClass(CSPartitioner.class);
        // job.setNumReduceTasks(4);

        // set the grouping rule
        job.setGroupingComparatorClass(CourseScoreGC.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_3_last");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            // fields from index 2 onward are exam scores
            for (int i = 2; i < split.length; i++) {
                sum += Integer.parseInt(split[i]);
                count++;
            }

            double avgScore = 1D * sum / count;

            // setter on the custom CourseScore key, assumed from context
            cs.setCourseScore(course, name, avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        /**
         * reduce() is called once per group: each time a key is encountered that
         * differs (under the grouping comparator) from the previous one, a new
         * reduce() invocation starts.
         *
         * The values iterator can only be traversed once, and as it advances,
         * the value AND the key object it belongs to are updated in place.
         */
        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            int count = 0;

            // Within a group, records arrive sorted by average score descending,
            // so the first TOPN iterations see the course's top TOPN students.
            for (NullWritable nvl : values) {
                context.write(key, nvl);
                if (++count == TOPN) {
                    return;
                }
            }
        }
    }
}
CourseScoreGC.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import com.ghgj.mr.exercise.pojo.CourseScore;

/**
 * Grouping comparator: decides which map-output keys are handed to the same
 * reduce() call.
 */
public class CourseScoreGC extends WritableComparator {

    public CourseScoreGC() {
        // true: instantiate keys so compare() receives deserialized objects
        super(CourseScore.class, true);
    }

    /**
     * Compares the two candidate keys; a return value of 0 means they belong
     * to the same group and go to the same reduce() invocation.
     *
     * In this problem the grouping rule is the course alone, so all records
     * of one course form a single group.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        CourseScore cs1 = (CourseScore) a;
        CourseScore cs2 = (CourseScore) b;

        return cs1.getCourse().compareTo(cs2.getCourse());
    }
}
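The listings above import com.ghgj.mr.exercise.pojo.CourseScore but never show it. Below is a minimal sketch of what that key class must look like for the Top-N job to behave as described; the field names, the setCourseScore helper, and the exact compareTo ordering are assumptions inferred from the driver and the grouping comparator, not the author's original code:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CourseScore implements WritableComparable<CourseScore> {

    private String course;
    private String name;
    private double avgScore;

    public CourseScore() {}                      // required by Hadoop reflection

    public String getCourse() { return course; }

    // assumed convenience setter, matching the mappers above
    public void setCourseScore(String course, String name, double avgScore) {
        this.course = course;
        this.name = name;
        this.avgScore = avgScore;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeDouble(avgScore);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avgScore = in.readDouble();
    }

    @Override
    public int compareTo(CourseScore o) {
        int c = course.compareTo(o.course);      // keep courses together
        if (c != 0) return c;
        return Double.compare(o.avgScore, avgScore); // higher average first
    }

    @Override
    public String toString() {
        return course + "\t" + name + "\t" + avgScore;
    }
}
```

With this ordering, the shuffle sorts each course's records best-average-first, and CourseScoreGC then collapses each course into one reduce() group, which is exactly what the Top-N reducer relies on.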