4.9 Partitioning and Sorting
Partitioner: partitions the intermediate results produced by the Mapper so that records belonging to the same group are handed to the same Reducer.
4.9.1 Data Partitioning
By default, partitioning uses HashPartitioner: the partition index is the key's hashCode modulo the number of reduce tasks, so a record goes to the partition whose index equals that remainder. Partition (and reducer) numbering starts at 0.
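For reference, the built-in HashPartitioner boils down to roughly the following (a simplified sketch of org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, not a new class for this exercise):
import org.apache.hadoop.mapreduce.Partitioner;
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the hash is non-negative, then take the remainder:
        // the remainder (0 .. numReduceTasks-1) is the partition/reducer index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}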
Task: separate the mobile phone numbers beginning with 138, 188, and 189 into different output files.
Original data
phone.txt
13862465004,tom
18923423212,jake
13832321324,zhansan
18988888888,lili
18832231321,briup
13809832333,kv
18832321333,join
Upload the file to the HDFS distributed file system:
hdfs dfs -put phone.txt /user/hdfs
Partitioner implementation (numbers starting with 138 go to partition 0, numbers starting with 188 go to partition 1, and everything else, including numbers starting with 189, goes to partition 2):
package com.briup.MR;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PhonePartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text text, Text text2, int numPartitions) {
        // KeyValueTextInputFormat delivers the phone number as the key and the name as the value;
        // route each record by its phone-number prefix.
        String phone = text.toString();
        if (phone.startsWith("138")) {
            return 0;
        } else if (phone.startsWith("188")) {
            return 1;
        }
        // All other prefixes (including 189) fall into partition 2.
        return 2;
    }
}
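The routing can be sanity-checked locally, outside of any MapReduce job, by calling getPartition() by hand. This is only an illustrative sketch (the class name PhonePartitionerTest is made up for this example):
package com.briup.MR;
import org.apache.hadoop.io.Text;
public class PhonePartitionerTest {
    public static void main(String[] args) {
        PhonePartitioner p = new PhonePartitioner();
        String[] phones = {"13862465004", "18832231321", "18988888888"};
        for (String phone : phones) {
            // The value (the name) plays no part in the partition decision.
            int part = p.getPartition(new Text(phone), new Text("x"), 3);
            System.out.println(phone + " -> partition " + part);
        }
        // Prints: 138... -> 0, 188... -> 1, 189... -> 2
    }
}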
Job (driver) implementation:
package com.briup.MR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class PartitionerMr extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String input = conf.get("input");
        String output = conf.get("output");
        // Tell KeyValueTextInputFormat to split each line at the comma:
        // the key becomes the phone number, the value the name.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("order");
        // No Mapper/Reducer is set, so the identity Mapper and Reducer are used;
        // the map output key and value are therefore both Text.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Custom partitioner plus three reducers -> three output files, one per prefix group.
        job.setPartitionerClass(PhonePartitioner.class);
        job.setNumReduceTasks(3);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) throws Exception {
        // ToolRunner parses the -D options (input/output) into the Configuration.
        System.exit(ToolRunner.run(new PartitionerMr(), args));
    }
}
Package and run the program:
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.PartitionerMr -D input=/user/hdfs/phone.txt -D output=/user/hdfs/phone_result
View the results:
hdfs dfs -cat phone_result/part-r-00000
hdfs dfs -cat phone_result/part-r-00001
hdfs dfs -cat phone_result/part-r-00002
4.9.2 Local Sorting
Partitioning uses the default HashPartitioner, and each reducer writes one output file. There is no ordering across files, but each file is sorted internally.
Task: sort the records by year in descending order.
Original data
wea.txt
1992,23
1993,22
1992,34
1993,26
1994,25
1995,21
1996,25
1997,31
1996,30
1995,33
1994,27
1993,29
1992,33
1993,28
1999,35
1998,34
1997,29
1996,35
1995,27
1994,36
Upload the file to the HDFS distributed file system:
hdfs dfs -put wea.txt /user/hdfs
Sort comparator implementation:
package com.briup.MR;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class Sort extends WritableComparator {
    public Sort() {
        // Register Text as the key type; 'true' lets the comparator create key instances
        // when deserializing them for comparison.
        super(Text.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Text y = (Text) a;
        Text y1 = (Text) b;
        // Negate the natural (ascending) order to sort the years in descending order.
        return -y.compareTo(y1);
    }
}
Job (driver) implementation:
package com.briup.MR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class OrderMR extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String input = conf.get("input");
        String output = conf.get("output");
        // Split each line at the comma: key = year, value = temperature.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("order");
        // Identity Mapper/Reducer are used, so map output key and value are both Text.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Sort the map output keys (years) with the descending comparator defined above.
        job.setSortComparatorClass(Sort.class);
        // Two reducers -> two output files, each sorted internally but not across files.
        job.setNumReduceTasks(2);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new OrderMR(), args));
    }
}
Package and run the program:
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.OrderMR -D input=/user/hdfs/wea.txt -D output=/user/hdfs/wea_result
View the results:
hdfs dfs -cat /user/hdfs/wea_result/part-r-00000
hdfs dfs -cat /user/hdfs/wea_result/part-r-00001
4.9.3 Secondary Sort
Task: sort by year in descending order and, within the same year, by temperature in ascending order. This takes three pieces working together: a composite key that carries both fields, a partitioner that sends all records of a year to the same reducer, and a grouping comparator that groups records by year only.
Original data
wea.txt
1992,23
1993,22
1992,34
1993,26
1994,25
1995,21
1996,25
1997,31
1996,30
1995,33
1994,27
1993,29
1992,33
1993,28
1999,35
1998,34
1997,29
1996,35
1995,27
1994,36
Upload the file to the HDFS distributed file system:
hdfs dfs -put wea.txt /user/hdfs
Build the composite-key class containing all of the sort fields:
package com.briup.MR;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class YearTmp implements WritableComparable<YearTmp> {
    private String year;
    private double temp;
    public YearTmp() {
    }
    public YearTmp(String year, double temp) {
        this.year = year;
        this.temp = temp;
    }
    public String getYear() {
        return year;
    }
    public void setYear(String year) {
        this.year = year;
    }
    public double getTemp() {
        return temp;
    }
    public void setTemp(double temp) {
        this.temp = temp;
    }
    @Override
    public String toString() {
        return "YearTmp{" + "year='" + year + '\'' + ", temp=" + temp + '}';
    }
    @Override
    public int compareTo(YearTmp o) {
        // Primary sort field: year, descending.
        int n = this.year.compareTo(o.getYear());
        if (n == 0) {
            // Secondary sort field: temperature, ascending.
            // Double.compare handles fractional differences correctly.
            return Double.compare(this.getTemp(), o.getTemp());
        }
        return -n;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeDouble(temp);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readUTF();
        this.temp = in.readDouble();
    }
}
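The composite key's ordering can be checked with a small local sketch (YearTmpOrderDemo is an illustrative name, not part of the job): it simply sorts a few YearTmp instances using the compareTo() defined above.
package com.briup.MR;
import java.util.Arrays;
import java.util.List;
public class YearTmpOrderDemo {
    public static void main(String[] args) {
        List<YearTmp> keys = Arrays.asList(
                new YearTmp("1992", 34), new YearTmp("1993", 22),
                new YearTmp("1992", 23), new YearTmp("1993", 26));
        // Passing null uses the natural ordering, i.e. YearTmp.compareTo:
        // year descending, then temperature ascending.
        keys.sort(null);
        // Expected order: 1993/22.0, 1993/26.0, 1992/23.0, 1992/34.0
        keys.forEach(System.out::println);
    }
}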
Partitioner: records with the same year go to the same partition:
package com.briup.MR;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondPartitioner extends Partitioner<YearTmp, DoubleWritable> {
    @Override
    public int getPartition(YearTmp yearTmp, DoubleWritable doubleWritable, int numPartitions) {
        // Partition by year only; masking the hash code keeps the partition index non-negative.
        return (yearTmp.getYear().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
Grouping comparator: records with the same year are treated as one group, so each year triggers a single reduce call:
package com.briup.MR;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class YearTmpGroup extends WritableComparator {
    public YearTmpGroup() {
        // Register YearTmp as the key type; 'true' lets the comparator instantiate keys.
        super(YearTmp.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        YearTmp y = (YearTmp) a;
        YearTmp y1 = (YearTmp) b;
        // Compare by year only, so all records of one year form a single reduce group.
        return y.getYear().compareTo(y1.getYear());
    }
}
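Why this yields one reduce call per year can also be seen locally (a small sketch; YearTmpGroupDemo is an illustrative name): keys that share a year compare as equal even when their temperatures differ, while keys from different years do not.
package com.briup.MR;
import org.apache.hadoop.io.WritableComparator;
public class YearTmpGroupDemo {
    public static void main(String[] args) {
        WritableComparator group = new YearTmpGroup();
        // Same year, different temperatures: compares as 0, so they land in the same group.
        System.out.println(group.compare(new YearTmp("1992", 23), new YearTmp("1992", 34)));
        // Different years: non-zero, so they form separate groups.
        System.out.println(group.compare(new YearTmp("1992", 23), new YearTmp("1993", 22)));
    }
}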
Job (driver) implementation:
package com.briup.MR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class SecondSort extends Configured implements Tool {
    static class SecondSortMapper extends Mapper<LongWritable, Text, YearTmp, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is "year,temperature"; emit the composite key plus the temperature.
            String[] strs = value.toString().split(",");
            context.write(new YearTmp(strs[0], Double.parseDouble(strs[1])),
                    new DoubleWritable(Double.parseDouble(strs[1])));
        }
    }
    static class SecondSortReduce extends Reducer<YearTmp, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(YearTmp key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // One reduce call per year (grouping comparator); values arrive in ascending
            // temperature order because the composite key sorts that way.
            for (DoubleWritable val : values) {
                context.write(new Text(key.getYear()), val);
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String input = conf.get("input");
        String output = conf.get("output");
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("secondSort");
        job.setMapperClass(SecondSortMapper.class);
        job.setMapOutputKeyClass(YearTmp.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Partition by year and group by year, so each year's temperatures reach
        // a single reduce call already sorted by YearTmp.compareTo.
        job.setPartitionerClass(SecondPartitioner.class);
        job.setGroupingComparatorClass(YearTmpGroup.class);
        job.setNumReduceTasks(2);
        job.setReducerClass(SecondSortReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        TextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) {
        try {
            System.exit(ToolRunner.run(new SecondSort(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Package and run the program:
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.SecondSort -D input=/user/hdfs/wea.txt -D output=/user/hdfs/total_result
View the results:
hdfs dfs -cat /user/hdfs/total_result/part-r-00000
hdfs dfs -cat /user/hdfs/total_result/part-r-00001