• 前提:每一个map都可能会产生大量的本地输出

  • Combiner功能:对map端的输出先做一次合并

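  • 目的:减少在map和reduce节点之间的数据传输量,以提高网络性能

  • 注意:Combiner可能被执行零次、一次或多次,因此它的输入、输出类型都必须与map的输出类型一致,并且合并逻辑要满足结合律和交换律;求平均值时不能在Combiner中直接取平均(平均值的平均值不等于总体平均值),而应传递(总和,个数),最后在reduce端再相除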


案例1:计算每一年的平均气温。

  1. 原始数据

    wea.txt

1992,23

1993,22

1992,34

1993,26

1994,25

1995,21

1996,25

1997,31

1996,30

1995,33

1994,27

1993,29

1992,33

1993,28

1999,35

1998,34

1997,29

1996,35

1995,27

1994,36

  1. 将文件上传到hdfs分布式文件系统中

hdfs dfs -put wea.txt /user/hdfs
  1. 定义复合值类型(自定义Writable,作为map输出的value),存储温度的总和与个数

package com.briup.MR;


import org.apache.hadoop.io.Writable;


import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;


public class AvgSumTemp implements Writable {

    private int num;

    private Double sum_temp;


    public AvgSumTemp() {

    }


    public AvgSumTemp(int num, Double sum_temp) {

        this.num = num;

        this.sum_temp = sum_temp;

    }


    public int getNum() {

        return num;

    }


    public void setNum(int num) {

        this.num = num;

    }


    public Double getSum_temp() {

        return sum_temp;

    }


    public void setSum_temp(Double sum_temp) {

        this.sum_temp = sum_temp;

    }


    @Override

    public String toString() {

        return "AvgSumTemp{" +

                "num=" + num +

                ", sum_temp=" + sum_temp +

                '}';

    }


    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(num);

        out.writeDouble(sum_temp);

    }


    @Override

    public void readFields(DataInput in) throws IOException {

        this.num=in.readInt();

        this.sum_temp=in.readDouble();

    }

}

  1. 代码实现

package com.briup.MR;


import com.briup.MR.Avg.AverageTemperatureByYear;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


public class AvgYearTempMR extends Configured implements Tool {

    static class AvgYearMRTempMapper

            extends Mapper<LongWritable, Text,Text,AvgSumTemp>{

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] val=value.toString().split(",");

            AvgSumTemp as=new AvgSumTemp(1,Double.parseDouble(val[1]));

            context.write(new Text(val[0]),as);

        }

    }

    static class AvgYearTempMRCombiner

            extends Reducer<Text,AvgSumTemp,Text,AvgSumTemp>{

        @Override

        protected void reduce(Text key, Iterable<AvgSumTemp> values, Context context) throws IOException, InterruptedException {

            int nums=0;

            double sum_temp=0;

            for(AvgSumTemp as:values){

                nums+=as.getNum();

                sum_temp+=as.getSum_temp();

            }

            context.write(key,new AvgSumTemp(nums,sum_temp));

        }

    }

    static class AvgYearMRTempReduce

            extends Reducer<Text,AvgSumTemp,Text, DoubleWritable>{

        @Override

        protected void reduce(Text key, Iterable<AvgSumTemp> values, Context context) throws IOException, InterruptedException {

            double sum_temp=0;

            int nums=0;

            for(AvgSumTemp as:values){

               nums+=as.getNum();

               sum_temp+=as.getSum_temp();

            }

            context.write(key,new DoubleWritable(sum_temp/nums));

        }

    }


    @Override

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();

        Job job = Job.getInstance(conf);

        job.setJarByClass(AverageTemperatureByYear.class);

        job.setJobName("Average Temperature By Year");


        job.setMapperClass(AvgYearMRTempMapper.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(AvgSumTemp.class);


        job.setCombinerClass(AvgYearTempMRCombiner.class);


        job.setReducerClass(AvgYearMRTempReduce.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(DoubleWritable.class);



        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);


        TextInputFormat.addInputPath(job, new Path(conf.get("input")));

        TextOutputFormat.setOutputPath(job, new Path(conf.get("output")));


        return (job.waitForCompletion(true) ? 0 : 1);

    }


    public static void main(String[] args) throws Exception {

        System.exit(

                new ToolRunner().run(

                        new AvgYearTempMR(),args));

    }

}

  1. 程序打包执行

yarn jar  GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.AvgYearTempMR -D input=/user/hdfs/wea.txt -D output=/user/hdfs/avg_result
  1. 查询结果

hdfs dfs -cat /user/hdfs/avg_result/part-r-00000


最后修改: 2023年12月28日 星期四 16:31