Partitioner: partitions the intermediate output of the Mapper so that records belonging to the same group are processed by the same Reducer.


4.9.1 Data Partitioning

The default partitioner is HashPartitioner. It divides the key's hashCode by the number of reduce tasks and uses the remainder as the partition number: a remainder of n sends the record to reducer n, and reducers are numbered starting from 0.
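
In essence this is all HashPartitioner does. The sketch below (the class name DefaultHashPartitionerSketch is only for illustration) masks off the sign bit so the hash is never negative before taking the remainder:

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the default hash partitioning logic: keep the hash non-negative,
// then take the remainder modulo the number of reduce tasks.
public class DefaultHashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}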

Goal: separate the mobile phone numbers beginning with 138, 188, and 189 into different output files.

  1. Raw data

    phone.txt

13862465004,tom
18923423212,jake
13832321324,zhansan
18988888888,lili
18832231321,briup
13809832333,kv
18832321333,join

  2. Upload the file to the HDFS distributed file system

hdfs dfs -put phone.txt /user/hdfs

  3. Partitioner implementation

package com.briup.MR;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhonePartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text text, Text text2, int numPartitions) {
        // The key produced by KeyValueTextInputFormat is the phone number
        // (the part of the line before the first comma).
        String phone = text.toString();
        if (phone.startsWith("138")) {
            return 0;               // numbers starting with 138 -> reducer 0
        } else if (phone.startsWith("188")) {
            return 1;               // numbers starting with 188 -> reducer 1
        }
        return 2;                   // everything else (e.g. 189) -> reducer 2
    }
}
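
With the key/value separator set to "," in the job below, KeyValueTextInputFormat splits every line at the first comma, so the key handed to getPartition is always the phone number. A quick trace of one record (for illustration only):

// input line:    "13862465004,tom"
// map input:     key = Text("13862465004"), value = Text("tom")
// getPartition:  "13862465004".startsWith("138") -> partition 0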

  4. Job (driver) implementation

package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionerMr extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // "input" and "output" are supplied on the command line via -D input=... -D output=...
        String input = conf.get("input");
        String output = conf.get("output");
        // KeyValueTextInputFormat splits each line into key/value at the first ","
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("phonePartitioner");

        // No Mapper/Reducer class is set, so the default identity Mapper and
        // Reducer pass each (phone, name) pair through unchanged.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // custom partitioner with three partitions, so three reduce tasks are needed
        job.setPartitionerClass(PhonePartitioner.class);
        job.setNumReduceTasks(3);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner.run() is a static method; call it directly
        System.exit(ToolRunner.run(new PartitionerMr(), args));
    }
}

  5. Package and run the program

 yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.PartitionerMr -D input=/user/hdfs/phone.txt -D output=/user/hdfs/phone_result
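
Because the driver is launched through ToolRunner, the -D key=value pairs on the command line are parsed by GenericOptionsParser and placed into the job Configuration before run() is invoked, which is why conf.get("input") and conf.get("output") return the paths supplied above.
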
  6. View the results

hdfs dfs -cat /user/hdfs/phone_result/part-r-00000

hdfs dfs -cat /user/hdfs/phone_result/part-r-00001

hdfs dfs -cat /user/hdfs/phone_result/part-r-00002
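
With the sample data above, the three files should contain roughly the following (key and value separated by a tab; keys appear in ascending order within each file because the default sort is untouched):

part-r-00000
13809832333	kv
13832321324	zhansan
13862465004	tom

part-r-00001
18832231321	briup
18832321333	join

part-r-00002
18923423212	jake
18988888888	lili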

4.9.2 Partial Sorting

Partitioning uses the default HashPartitioner. Each reducer writes one result file; every file is sorted internally, but there is no ordering between the files.

Goal: sort the records by year in descending order.

  1. Raw data

    wea.txt

1992,23
1993,22
1992,34
1993,26
1994,25
1995,21
1996,25
1997,31
1996,30
1995,33
1994,27
1993,29
1992,33
1993,28
1999,35
1998,34
1997,29
1996,35
1995,27
1994,36

  2. Upload the file to the HDFS distributed file system

hdfs dfs -put wea.txt /user/hdfs
  3. Sort comparator implementation

package com.briup.MR;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class Sort extends WritableComparator {
    public Sort() {
        // register Text as the key type and let the comparator create instances
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Text y = (Text) a;
        Text y1 = (Text) b;
        // negate the natural (ascending) order of the year keys -> descending order
        return -y.compareTo(y1);
    }
}
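
A minimal local check (a hypothetical SortCheck class, not part of the job) makes the effect of the negation visible:

package com.briup.MR;

import org.apache.hadoop.io.Text;

public class SortCheck {
    public static void main(String[] args) {
        Sort cmp = new Sort();
        // Positive result: "1992" is ordered after "1999", i.e. years descend.
        System.out.println(cmp.compare(new Text("1992"), new Text("1999")));
    }
}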

  4. Job (driver) implementation

package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class OrderMR extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String input = conf.get("input");
        String output = conf.get("output");
        // split each "year,temperature" line into key=year, value=temperature
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("order");

        // identity Mapper/Reducer are used; only the shuffle sort order changes
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // sort keys with the custom comparator -> years in descending order
        job.setSortComparatorClass(Sort.class);

        // two reducers -> two output files, each sorted internally
        job.setNumReduceTasks(2);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new OrderMR(), args));
    }
}

  5. Package and run the program

yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.OrderMR -D input=/user/hdfs/wea.txt -D output=/user/hdfs/wea_result

  6. View the results

hdfs dfs -cat /user/hdfs/wea_result/part-r-00000
hdfs dfs -cat /user/hdfs/wea_result/part-r-00001

4.9.3 Secondary Sort

Goal: sort by year in descending order and, within each year, by temperature in ascending order.

  1. Raw data

    wea.txt

1992,23
1993,22
1992,34
1993,26
1994,25
1995,21
1996,25
1997,31
1996,30
1995,33
1994,27
1993,29
1992,33
1993,28
1999,35
1998,34
1997,29
1996,35
1995,27
1994,36

  2. Upload the file to the HDFS distributed file system (skip this step if wea.txt is already there from the previous example)

hdfs dfs -put wea.txt /user/hdfs
  3. Build an entity class (the composite key) that contains all of the sort fields

package com.briup.MR;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class YearTmp implements WritableComparable<YearTmp> {
    private String year;
    private double temp;

    public YearTmp() {
    }

    public YearTmp(String year, double temp) {
        this.year = year;
        this.temp = temp;
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public double getTemp() {
        return temp;
    }

    public void setTemp(double temp) {
        this.temp = temp;
    }

    @Override
    public String toString() {
        return "YearTmp{" + "year='" + year + '\'' + ", temp=" + temp + '}';
    }

    // Sort order used during the shuffle: year descending, temperature ascending.
    @Override
    public int compareTo(YearTmp o) {
        int n = this.year.compareTo(o.getYear());
        if (n == 0) {
            // same year: ascending by temperature
            // (Double.compare avoids the truncation bug of casting the difference to int)
            return Double.compare(this.temp, o.getTemp());
        }
        // different years: reverse the natural order -> descending
        return -n;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeDouble(temp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readUTF();
        this.temp = in.readDouble();
    }
}
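
Two hypothetical comparisons (not part of the job code) illustrate the resulting order:

// different years: "1993".compareTo("1992") is positive, so -n is negative
// -> new YearTmp("1993", 22).compareTo(new YearTmp("1992", 34)) < 0
//    (1993 is placed before 1992: years descend)
// same year: temperatures are compared ascending
// -> new YearTmp("1992", 23).compareTo(new YearTmp("1992", 34)) < 0
//    (23 is placed before 34)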

  4. Partitioner: records with the same year go to the same partition

package com.briup.MR;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondPartitioner extends Partitioner<YearTmp, DoubleWritable> {
    @Override
    public int getPartition(YearTmp yearTmp, DoubleWritable doubleWritable, int numPartitions) {
        // Partition only on the year, so all records of one year reach the same
        // reducer. The mask keeps the hash non-negative (hashCode() may be
        // negative, and a negative partition number is illegal).
        return (yearTmp.getYear().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

  5. Grouping comparator: records with the same year are treated as one group

package com.briup.MR;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class YearTmpGroup extends WritableComparator {
    public YearTmpGroup() {
        super(YearTmp.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        YearTmp y = (YearTmp) a;
        YearTmp y1 = (YearTmp) b;
        // Compare only on the year: keys with the same year are considered equal,
        // so all of their values are fed to a single reduce() call.
        return y.getYear().compareTo(y1.getYear());
    }
}
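
Taken together, YearTmp.compareTo puts each year's records in ascending temperature order during the shuffle, and YearTmpGroup merges all records of one year into a single reduce() call. With the sample data, the reduce call for 1993 should therefore see something like:

// reduce key:    YearTmp{year='1993', temp=22.0}   (the first key of the group)
// reduce values: 22.0, 26.0, 28.0, 29.0            (ascending temperatures)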

  6. Job (driver) implementation

package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class SecondSort extends Configured implements Tool {

    static class SecondSortMapper extends Mapper<LongWritable, Text, YearTmp, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // "1992,23" -> key = YearTmp("1992", 23.0), value = 23.0
            String[] strs = value.toString().split(",");
            context.write(new YearTmp(strs[0], Double.parseDouble(strs[1])),
                    new DoubleWritable(Double.parseDouble(strs[1])));
        }
    }

    static class SecondSortReduce extends Reducer<YearTmp, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(YearTmp key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // values already arrive in ascending temperature order (YearTmp.compareTo)
            for (DoubleWritable val : values) {
                context.write(new Text(key.getYear()), val);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String input = conf.get("input");
        String output = conf.get("output");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("secondSort");

        job.setMapperClass(SecondSortMapper.class);
        job.setMapOutputKeyClass(YearTmp.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // partition by year, group by year; the composite key drives the sorting
        job.setPartitionerClass(SecondPartitioner.class);
        job.setGroupingComparatorClass(YearTmpGroup.class);
        job.setNumReduceTasks(2);

        job.setReducerClass(SecondSortReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        TextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) {
        try {
            System.exit(ToolRunner.run(new SecondSort(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  7. Package and run the program

yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.SecondSort -D input=/user/hdfs/wea.txt -D output=/user/hdfs/total_result
  8. View the results

hdfs dfs -cat /user/hdfs/total_result/part-r-00000
hdfs dfs -cat /user/hdfs/total_result/part-r-00001
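
Each output file should list its years in descending order, with the temperatures of a given year appearing in ascending order; which years land in which file depends on the hash of the year string, so there is no global order across the two files.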

