Join two files on a common column so that matching rows are merged into a single line of output.

Data files

artist.txt

00001,Moon Light,1973
00002,Scarborough Fair,1970
00003,Yesenia,1980
00004,Don't Cry For Me Argentina,1975

user_artist.txt

00001,201501,50
00002,201501,100
00003,201502,70
00004,201502,28
00005,201502,29

4.10.1 Map-Side Join

A map-side join performed with CompositeInputFormat merges the records before the map function runs. It requires the following conditions:

  • Both datasets are large, so the distributed-cache (side-file) approach is not an option.
  • Both datasets are sorted by the same join key.
  • Both datasets have the same number of partitions, all records with a given key sit in the same partition, and each file must not be splittable (each file is read as a single split).

1. Before the join, run a preparation job that sorts each dataset by the join key and writes it out as a single non-splittable file (the job below gzip-compresses its output, which makes the file unsplittable).

package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class FirstStage extends Configured implements Tool {

    // Emit the first comma-separated field (the id) as the key and the whole line as the value.
    static class FirstStageMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(",");
            context.write(new Text(strs[0]), value);
        }
    }

    // The shuffle sorts the records by id; the reducer simply writes each original line back out.
    static class FirstStageReduce extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(NullWritable.get(), val);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String input = conf.get("input");
        String output = conf.get("output");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("zipmap");

        job.setMapperClass(FirstStageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FirstStageReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        TextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        // Gzip the output: a .gz file cannot be split, which satisfies the map-side join precondition.
        TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner.run is a static method; call it statically.
        System.exit(ToolRunner.run(new FirstStage(), args));
    }
}

2. Package and run the program
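The jar name used below suggests a standard Maven project; assuming that layout, build the jar first (a sketch, your build setup may differ):

mvn clean package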

yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.FirstStage -D input=/user/hdfs/artist.txt -D output=/user/hdfs/one

 yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.FirstStage -D input=/user/hdfs/user_artist.txt -D output=/user/hdfs/two
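Each preparation run should leave a single id-sorted, gzip-compressed part file in its output directory. As a quick sanity check (the file name below assumes the default single reduce task), list the directory and print the decompressed content:

hdfs dfs -ls /user/hdfs/one
hdfs dfs -text /user/hdfs/one/part-r-00000.gz | head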

3. Map-side join of the two files, code implementation:

package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/*
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.MergeMap -D one=/user/hdfs/one -D two=/user/hdfs/two -D output=/user/hdfs/merge
 */
public class MergeMap extends Configured implements Tool {

    // CompositeInputFormat hands each map call a key plus a TupleWritable containing
    // the matching value from every joined input.
    static class MergeMapMapper extends Mapper<Text, TupleWritable, Text, Text> {
        @Override
        protected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Writable val : value) {
                sb.append(val.toString()).append(",");
            }
            sb.setLength(sb.length() - 1);  // drop the trailing comma
            context.write(key, new Text(sb.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String one = conf.get("one");
        String two = conf.get("two");
        String output = conf.get("output");
        // KeyValueTextInputFormat splits each line into key/value at the first comma.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        // Compose the inner-join expression over the two prepared inputs and hand it to the job.
        String exp = CompositeInputFormat.compose("inner",
                KeyValueTextInputFormat.class, one, two);
        conf.set("mapreduce.join.expr", exp);

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("mergemap");

        job.setMapperClass(MergeMapMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);  // map-only job: the join happens before the map function runs

        job.setInputFormatClass(CompositeInputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MergeMap(), args));
    }
}

4. Package and run the program

yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.MergeMap -D one=/user/hdfs/one  -D two=/user/hdfs/two -D output=/user/hdfs/merge
5. View the result

hdfs dfs -cat merge/part-m-00000
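For the sample data above, the inner join keeps only the ids present in both inputs (00005 has no match in artist.txt and is dropped), so the merged output should look roughly like this, with the id and the merged fields separated by a tab:

00001	Moon Light,1973,201501,50
00002	Scarborough Fair,1970,201501,100
00003	Yesenia,1980,201502,70
00004	Don't Cry For Me Argentina,1975,201502,28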

4.10.2 Reduce-Side Join

The task and the data files (artist.txt and user_artist.txt) are the same as in 4.10.1: join the two files on the id column into one output line per match.

1. Build the composite key

package com.briup.MR.Merge.reduce;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TextTuple implements WritableComparable<TextTuple> {

    private String uid;
    // flag: 0 means the record comes from artist.txt, 1 means it comes from user_artist.txt
    private int flag;

    @Override
    public String toString() {
        return "TextTuple{" +
                "uid='" + uid + '\'' +
                ", flag=" + flag +
                '}';
    }

    public TextTuple() {
    }

    public TextTuple(String uid, int flag) {
        this.uid = uid;
        this.flag = flag;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeInt(flag);
    }

    public void readFields(DataInput in) throws IOException {
        this.uid = in.readUTF();
        this.flag = in.readInt();
    }

    // Sort by uid first; for the same uid the artist record (flag 0) sorts before the
    // user_artist records (flag 1), so the reducer always sees the artist line first.
    public int compareTo(TextTuple o) {
        int n = this.getUid().compareTo(o.getUid());
        if (n != 0) {
            return n;
        }
        return this.getFlag() - o.getFlag();
    }
}

2. Partition on the uid in the composite key, so that all records with the same key go to the same reduce task

package com.briup.MR.Merge.reduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MergeReducePartitioner extends Partitioner<TextTuple, Text> {

    @Override
    public int getPartition(TextTuple textTuple, Text text, int numPartitions) {
        // Partition on uid only (the flag is ignored), so the artist record and the
        // user_artist records of one id reach the same reducer. Masking with
        // Integer.MAX_VALUE keeps the result non-negative even if the hash arithmetic overflows.
        return (textTuple.getUid().hashCode() * 127 & Integer.MAX_VALUE) % numPartitions;
    }
}

3. Group on the uid in the composite key, so that records with the same key fall into the same group (one reduce() call)

package com.briup.MR.Merge.reduce;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MergeReduceGroup extends WritableComparator {

    public MergeReduceGroup() {
        super(TextTuple.class, true);  // true: create key instances for deserialization
    }

    // Group only by uid: the artist and user_artist records of one id share a reduce() call.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TextTuple t = (TextTuple) a;
        TextTuple t1 = (TextTuple) b;
        return t.getUid().compareTo(t1.getUid());
    }
}
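Sorting and grouping deliberately use different comparators: TextTuple.compareTo also compares the flag, so the two record types of one uid stay ordered (artist first), while MergeReduceGroup ignores the flag so they land in the same reduce() call. A hypothetical standalone check (not part of the job) illustrating the difference:

package com.briup.MR.Merge.reduce;

// Hypothetical check class: sorting distinguishes the two records of one uid,
// while the grouping comparator treats them as equal.
public class GroupingCheck {
    public static void main(String[] args) {
        TextTuple artistKey = new TextTuple("00001", 0);  // from artist.txt
        TextTuple playKey = new TextTuple("00001", 1);    // from user_artist.txt
        System.out.println(artistKey.compareTo(playKey));                       // -1: artist sorts first
        System.out.println(new MergeReduceGroup().compare(artistKey, playKey)); //  0: same reduce group
    }
}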

4. MapReduce job implementation

package com.briup.MR.Merge.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

/*
artist.txt      -> TextInputFormat
user_artist.txt -> KeyValueTextInputFormat
 */
public class MergeReduceMR extends Configured implements Tool {

    // Reads artist.txt: key = TextTuple(id, 0), value = the remaining fields joined by commas.
    static class ArtistMapper extends Mapper<LongWritable, Text, TextTuple, Text> {
        StringBuffer sb = new StringBuffer();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] str = value.toString().split(",");
            for (int i = 1; i < str.length; i++) {
                sb.append(str[i]).append(",");
            }
            sb.setLength(sb.length() - 1);  // drop the trailing comma
            TextTuple k = new TextTuple(str[0], 0);  // flag 0: artist record
            context.write(k, new Text(sb.toString()));
            sb.setLength(0);
        }
    }
    /*
    ArtistMapper ->
    key                     value
    TextTuple(00001,0)      Moon Light,1973
    TextTuple(00002,0)      Scarborough Fair,1970
     */

    // Reads user_artist.txt: KeyValueTextInputFormat already splits each line at the first comma.
    static class UserArtistMapper extends Mapper<Text, Text, TextTuple, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            TextTuple k = new TextTuple(key.toString(), 1);  // flag 1: user_artist record
            context.write(k, value);
        }
    }
    /*
    UserArtistMapper ->
    key                     value
    TextTuple(00001,1)      201501,50
    TextTuple(00002,1)      201501,100
     */

    static class MergeReduceMRReduce extends Reducer<TextTuple, Text, Text, Text> {
        @Override
        protected void reduce(TextTuple key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Thanks to the secondary sort, the first value in the group is always the artist record.
            Iterator<Text> iter = values.iterator();
            String artist = iter.next().toString();
            while (iter.hasNext()) {
                String user_artist = iter.next().toString();
                context.write(new Text(key.getUid()),
                        new Text(artist + "#" + user_artist));
            }
        }
    }
    /*
    reduce ->
    key         value
    00001       Moon Light,1973#201501,50
     */

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String one = conf.get("one");
        String two = conf.get("two");
        String output = conf.get("output");
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("reduceMerge");

        // A different mapper (and input format) for each input path.
        MultipleInputs.addInputPath(job, new Path(one),
                TextInputFormat.class, ArtistMapper.class);
        MultipleInputs.addInputPath(job, new Path(two),
                KeyValueTextInputFormat.class, UserArtistMapper.class);

        job.setMapOutputKeyClass(TextTuple.class);
        job.setMapOutputValueClass(Text.class);

        job.setPartitionerClass(MergeReducePartitioner.class);
        job.setGroupingComparatorClass(MergeReduceGroup.class);

        job.setNumReduceTasks(2);

        job.setReducerClass(MergeReduceMRReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MergeReduceMR(), args));
    }
}

5. Package and run the program

yarn jar untitled-1.0-SNAPSHOT.jar com.briup.MR.Merge.reduce.MergeReduceMR -D one=/user/hdfs/artist.txt -D two=/user/hdfs/user_artist.txt -D output=/user/hdfs/reduce_result
6. View the result

hdfs dfs -cat /user/hdfs/reduce_result/part-r-*
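The job runs two reduce tasks, so the result is split across part-r-00000 and part-r-00001 and is only sorted within each file. Concatenated, the join of the sample data should look roughly like this (id and merged value separated by a tab):

00001	Moon Light,1973#201501,50
00002	Scarborough Fair,1970#201501,100
00003	Yesenia,1980#201502,70
00004	Don't Cry For Me Argentina,1975#201502,28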

