4.10 join
Join two files on a common column, merging the matching records from each file into a single output line.
Data files
artist.txt
00001,Moon Light,1973
00002,Scarborough Fair,1970
00003,Yesenia,1980
00004,Don't Cry For Me Argentina,1975
user_artist.txt
00001,201501,50
00002,201501,100
00003,201502,70
00004,201502,28
00005,201502,29
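For example, joining on the first column should put the artist.txt record and the matching user_artist.txt record for key 00001 on one output line, roughly:
00001  Moon Light,1973  201501,50
The exact delimiters depend on the implementation below; key 00005 appears only in user_artist.txt, so an inner join drops it.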
4.10.1 Map-side join
A map-side join with CompositeInputFormat (the merge happens before the map phase) must meet the following conditions:
Both datasets are large, so the distributed-cache (side-file) approach does not apply.
Both datasets are sorted by the same join key.
Both datasets have the same number of partitions, all records with a given key sit in the same partition, and each result file must be non-splittable.
The inputs therefore need a preparation pass before they can be joined: the FirstStage job below sorts each file by its first column and writes Gzip-compressed (hence non-splittable) output; both runs use the default single reducer, so the partition counts match.
1. Preparation job: sort each input file by the join key.
package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class FirstStage extends Configured implements Tool {

    // Emit the first column as the key so the shuffle sorts the file by the join key.
    static class FirstStageMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(",");
            context.write(new Text(strs[0]), value);
        }
    }

    // Write the original lines back out, now ordered by the join key.
    static class FirstStageReduce extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(NullWritable.get(), val);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String input = conf.get("input");
        String output = conf.get("output");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("zipmap");

        job.setMapperClass(FirstStageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FirstStageReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        TextInputFormat.addInputPath(job, new Path(input));
        TextOutputFormat.setOutputPath(job, new Path(output));
        // Gzip-compressed output is not splittable, which the map-side join requires.
        TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FirstStage(), args));
    }
}
2. Package and run the preparation job, once for each input file
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.FirstStage -D input=/user/hdfs/artist.txt -D output=/user/hdfs/one
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.FirstStage -D input=/user/hdfs/user_artist.txt -D output=/user/hdfs/two
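After both runs finish, it is worth a quick sanity check that each output directory holds a Gzip-compressed part file (non-splittable, as required above), for example:
hdfs dfs -ls /user/hdfs/one /user/hdfs/two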
Map-side join implementation (merging the two prepared datasets):
package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/*
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.MergeMap -D one=/user/hdfs/one -D two=/user/hdfs/two -D output=/user/hdfs/merge
*/
public class MergeMap extends Configured implements Tool {

    // CompositeInputFormat hands the mapper one TupleWritable per joined key,
    // holding the value from each input; concatenate them into one output line.
    static class MergeMapMapper extends Mapper<Text, TupleWritable, Text, Text> {
        @Override
        protected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Writable val : value) {
                sb.append(val.toString()).append(",");
            }
            sb.setLength(sb.length() - 1);  // drop the trailing comma
            context.write(key, new Text(sb.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String one = conf.get("one");
        String two = conf.get("two");
        String output = conf.get("output");

        // KeyValueTextInputFormat splits each line into key/value at the first comma.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        // Inner join of the two sorted inputs on their keys.
        String exp = CompositeInputFormat.compose("inner",
                KeyValueTextInputFormat.class, one, two);
        conf.set("mapreduce.join.expr", exp);

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("mergemap");

        job.setMapperClass(MergeMapMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);  // map-only job

        job.setInputFormatClass(CompositeInputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MergeMap(), args));
    }
}
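The compose expression above requests an inner join. CompositeInputFormat also understands other operations; a minimal sketch (reusing the one/two variables from run() above) of an outer join, which would keep keys such as 00005 that appear in only one dataset:
// sketch: "outer" keeps keys that appear in only one of the inputs
String exp = CompositeInputFormat.compose("outer",
        KeyValueTextInputFormat.class, one, two);
conf.set("mapreduce.join.expr", exp);
Because TupleWritable only iterates over the values actually present, MergeMapMapper would still work unchanged, simply emitting fewer fields for unmatched keys.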
Package and run the join job
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.MergeMap -D one=/user/hdfs/one -D two=/user/hdfs/two -D output=/user/hdfs/merge
View the results
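The join runs entirely in the map phase (no reducers), so the merged lines end up in the map output files; to inspect them, something like:
hdfs dfs -cat /user/hdfs/merge/part-*
Each line should hold the join key followed by the concatenated values from both files, roughly 00001 then Moon Light,1973,201501,50.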
4.10.2 Reduce-side join
The task and the data files (artist.txt, user_artist.txt) are the same as in 4.10.1.
1. Build the composite key (the join key plus a flag identifying the source file)
package com.briup.MR.Merge.reduce;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Composite key: the join key (uid) plus a flag marking which file the record came from.
public class TextTuple implements WritableComparable<TextTuple> {
    private String uid;
    // flag: 0 means the record comes from artist.txt, 1 means it comes from user_artist.txt
    private int flag;

    public TextTuple() {
    }

    public TextTuple(String uid, int flag) {
        this.uid = uid;
        this.flag = flag;
    }

    @Override
    public String toString() {
        return "TextTuple{" +
                "uid='" + uid + '\'' +
                ", flag=" + flag +
                '}';
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeInt(flag);
    }

    public void readFields(DataInput in) throws IOException {
        this.uid = in.readUTF();
        this.flag = in.readInt();
    }

    // Sort by uid first, then by flag, so for each uid the artist.txt record (flag 0)
    // is always seen before the user_artist.txt records (flag 1).
    public int compareTo(TextTuple o) {
        // e.g. this 00001 vs. o 00002
        int n = this.getUid().compareTo(o.getUid());
        if (n != 0) {
            return n;
        }
        return this.getFlag() - o.getFlag();
    }
}
2. Partition on the uid inside the composite key, so that all records with the same join key go to the same reducer
package com.briup.MR.Merge.reduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MergeReducePartitioner extends Partitioner<TextTuple, Text> {
    @Override
    public int getPartition(TextTuple textTuple, Text text, int numPartitions) {
        // Partition only on the uid (ignoring the flag); mask off the sign bit so the
        // result is always a valid, non-negative partition number.
        return (textTuple.getUid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
3. Group on the uid inside the composite key, so that all records with the same join key fall into the same reduce call
package com.briup.MR.Merge.reduce;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MergeReduceGroup extends WritableComparator {

    public MergeReduceGroup() {
        // true: create TextTuple instances so compare() receives deserialized keys
        super(TextTuple.class, true);
    }

    // Compare only the uid, so keys that differ only in their flag land in one reduce group.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TextTuple t = (TextTuple) a;
        TextTuple t1 = (TextTuple) b;
        return t.getUid().compareTo(t1.getUid());
    }
}
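To see how the composite key, the partitioner, and the grouping comparator fit together, here is a small local sketch (a hypothetical helper, not part of the job) that sorts a few TextTuple keys with compareTo: within one uid the flag-0 key sorts first, which is exactly what the reducer below relies on when it treats the first value of a group as the artist record.
package com.briup.MR.Merge.reduce;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical local check, not part of the MapReduce job.
public class TextTupleOrderingDemo {
    public static void main(String[] args) {
        List<TextTuple> keys = new ArrayList<>();
        keys.add(new TextTuple("00001", 1));  // from user_artist.txt
        keys.add(new TextTuple("00002", 1));
        keys.add(new TextTuple("00001", 0));  // from artist.txt
        keys.add(new TextTuple("00002", 0));

        // Same ordering the shuffle applies: by uid, then by flag.
        Collections.sort(keys);
        for (TextTuple t : keys) {
            System.out.println(t);
        }
        // Expected order: (00001,0), (00001,1), (00002,0), (00002,1).
        // The grouping comparator then merges (00001,0) and (00001,1) into one
        // reduce group, with the artist.txt value arriving first.
    }
}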
4. MapReduce implementation
package com.briup.MR.Merge.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

/*
artist.txt       TextInputFormat
user_artist.txt  KeyValueTextInputFormat
*/
public class MergeReduceMR extends Configured implements Tool {

    // Reads artist.txt: key = uid (first column) with flag 0, value = remaining columns.
    static class ArtistMapper extends Mapper<LongWritable, Text, TextTuple, Text> {
        StringBuffer sb = new StringBuffer();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] str = value.toString().split(",");
            for (int i = 1; i < str.length; i++) {
                sb.append(str[i]).append(",");
            }
            sb.setLength(sb.length() - 1);  // drop the trailing comma
            TextTuple k = new TextTuple(str[0], 0);
            context.write(k, new Text(sb.toString()));
            sb.setLength(0);  // reset the buffer for the next record
        }
    }
    /*
    ArtistMapper ->
    key                   value
    TextTuple(00001,0)    Moon Light,1973
    TextTuple(00002,0)    Scarborough Fair,1970
    */

    // Reads user_artist.txt: KeyValueTextInputFormat already splits the line at the
    // first comma, so key = uid (flag 1) and value = the rest of the line.
    static class UserArtistMapper extends Mapper<Text, Text, TextTuple, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            TextTuple k = new TextTuple(key.toString(), 1);
            context.write(k, value);
        }
    }
    /*
    UserArtistMapper ->
    key                   value
    TextTuple(00001,1)    201501,50
    TextTuple(00002,1)    201501,100
    */

    // Thanks to the secondary sort on flag, the first value in each group is the
    // artist.txt record; every following value is a user_artist.txt record.
    static class MergeReduceMRReduce extends Reducer<TextTuple, Text, Text, Text> {
        @Override
        protected void reduce(TextTuple key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();
            String artist = iter.next().toString();
            while (iter.hasNext()) {
                String user_artist = iter.next().toString();
                context.write(new Text(key.getUid()),
                        new Text(artist + "#" + user_artist));
            }
        }
    }
    /*
    reduce ->
    key      value
    00001    Moon Light,1973#201501,50
    */

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String one = conf.get("one");
        String two = conf.get("two");
        String output = conf.get("output");
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("reduceMerge");

        // Register a different mapper (and input format) for each input path.
        MultipleInputs.addInputPath(job, new Path(one),
                TextInputFormat.class, ArtistMapper.class);
        MultipleInputs.addInputPath(job, new Path(two),
                KeyValueTextInputFormat.class, UserArtistMapper.class);

        job.setMapOutputKeyClass(TextTuple.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(MergeReducePartitioner.class);
        job.setGroupingComparatorClass(MergeReduceGroup.class);
        job.setNumReduceTasks(2);

        job.setReducerClass(MergeReduceMRReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        TextOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MergeReduceMR(), args));
    }
}
Package and run
yarn jar untitled-1.0-SNAPSHOT.jar com.briup.MR.Merge.reduce.MergeReduceMR -D one=/user/hdfs/artist.txt -D two=/user/hdfs/user_artist.txt -D output=/user/hdfs/reduce_result
View the results
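The job was configured with two reduce tasks, so the joined rows are spread over two part files; to inspect them, something like:
hdfs dfs -cat /user/hdfs/reduce_result/part-r-*
Each line should look roughly like 00001 followed by Moon Light,1973#201501,50; key 00005 has no matching artist.txt record, so the reducer emits nothing for it.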