4.11 Combiner
前提:每一个map都可能会产生大量的本地输出
Combiner功能:在map端对输出按key先做一次局部合并(相当于本地的reduce)。注意:框架可能执行Combiner零次、一次或多次,因此其输入输出类型必须与map的输出类型一致,且合并操作不能改变最终结果(需满足结合律)
目的:减少在map和reduce节点之间的数据传输量,以提高网络性能
案例1:计算每一年的平均气温。
原始数据
wea.txt
1992,23
1993,22
1992,34
1993,26
1994,25
1995,21
1996,25
1997,31
1996,30
1995,33
1994,27
1993,29
1992,33
1993,28
1999,35
1998,34
1997,29
1996,35
1995,27
1994,36
将文件上传到hdfs分布式文件系统中
hdfs dfs -put wea.txt /user/hdfs
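定义复合值类型(实现Writable,作为map输出的value而非key),存储温度之和与记录个数。原因:平均值的平均不等于总体平均值,Combiner不能直接求局部平均,只能先累加"总和"与"个数",最后在Reducer中相除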
package com.briup.MR;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite map-output value holding a running temperature total together with the
 * number of readings that contributed to it.
 *
 * <p>Carrying (count, sum) instead of a partial average lets a combiner merge partial
 * results without changing the final answer: sums and counts add associatively, whereas
 * an average of averages is wrong whenever the merged groups have different sizes.
 */
public class AvgSumTemp implements Writable {
    /** Number of temperature readings folded into {@code sum_temp}. */
    private int num;

    /**
     * Sum of the readings. Kept as a primitive so that a default-constructed instance
     * serializes as 0.0 instead of throwing a NullPointerException in {@link #write}.
     */
    private double sum_temp;

    /**
     * Creates an empty value (count 0, sum 0.0). A public no-arg constructor is kept so
     * the type can be instantiated reflectively and then populated via {@link #readFields}.
     */
    public AvgSumTemp() {
    }

    /**
     * Creates a value with the given count and sum.
     *
     * @param num      number of readings represented
     * @param sum_temp sum of those readings; must not be null
     * @throws NullPointerException if {@code sum_temp} is null (auto-unboxing)
     */
    public AvgSumTemp(int num, Double sum_temp) {
        this.num = num;
        this.sum_temp = sum_temp;
    }

    /** @return the number of readings represented by this value */
    public int getNum() {
        return num;
    }

    /** @param num the number of readings represented by this value */
    public void setNum(int num) {
        this.num = num;
    }

    /** @return the sum of the readings (boxed to keep the original accessor signature) */
    public Double getSum_temp() {
        return sum_temp;
    }

    /**
     * @param sum_temp the new sum of readings; must not be null
     * @throws NullPointerException if {@code sum_temp} is null (auto-unboxing)
     */
    public void setSum_temp(Double sum_temp) {
        this.sum_temp = sum_temp;
    }

    @Override
    public String toString() {
        return "AvgSumTemp{" +
                "num=" + num +
                ", sum_temp=" + sum_temp +
                '}';
    }

    /** Serializes the count as an int followed by the sum as a double; mirrors {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(num);
        out.writeDouble(sum_temp);
    }

    /** Reads the fields in exactly the order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.num = in.readInt();
        this.sum_temp = in.readDouble();
    }
}
代码实现
package com.briup.MR;
import com.briup.MR.Avg.AverageTemperatureByYear;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class AvgYearTempMR extends Configured implements Tool {
static class AvgYearMRTempMapper
extends Mapper<LongWritable, Text,Text,AvgSumTemp>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] val=value.toString().split(",");
AvgSumTemp as=new AvgSumTemp(1,Double.parseDouble(val[1]));
context.write(new Text(val[0]),as);
}
}
static class AvgYearTempMRCombiner
extends Reducer<Text,AvgSumTemp,Text,AvgSumTemp>{
@Override
protected void reduce(Text key, Iterable<AvgSumTemp> values, Context context) throws IOException, InterruptedException {
int nums=0;
double sum_temp=0;
for(AvgSumTemp as:values){
nums+=as.getNum();
sum_temp+=as.getSum_temp();
}
context.write(key,new AvgSumTemp(nums,sum_temp));
}
}
static class AvgYearMRTempReduce
extends Reducer<Text,AvgSumTemp,Text, DoubleWritable>{
@Override
protected void reduce(Text key, Iterable<AvgSumTemp> values, Context context) throws IOException, InterruptedException {
double sum_temp=0;
int nums=0;
for(AvgSumTemp as:values){
nums+=as.getNum();
sum_temp+=as.getSum_temp();
}
context.write(key,new DoubleWritable(sum_temp/nums));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf);
job.setJarByClass(AverageTemperatureByYear.class);
job.setJobName("Average Temperature By Year");
job.setMapperClass(AvgYearMRTempMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AvgSumTemp.class);
job.setCombinerClass(AvgYearTempMRCombiner.class);
job.setReducerClass(AvgYearMRTempReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job, new Path(conf.get("input")));
TextOutputFormat.setOutputPath(job, new Path(conf.get("output")));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
System.exit(
new ToolRunner().run(
new AvgYearTempMR(),args));
}
}
程序打包执行
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.AvgYearTempMR -D input=/user/hdfs/wea.txt -D output=/user/hdfs/avg_result
查询结果