4.12 Job Flow
JobControl makes it easy to write jobs that depend on one another; such jobs typically form a directed acyclic graph. It lets you chain several MapReduce programs together and run them in the order you want.
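As a minimal sketch of the pattern (the full driver for this example appears later in this section; jobA and jobB are placeholder Job objects), each configured Job is wrapped in a ControlledJob, the dependency is declared with addDependingJob, and a JobControl instance submits the jobs from a background thread:

// Minimal sketch of the JobControl pattern; jobA/jobB are placeholders.
ControlledJob stepA = new ControlledJob(conf);
stepA.setJob(jobA);
ControlledJob stepB = new ControlledJob(conf);
stepB.setJob(jobB);
stepB.addDependingJob(stepA);      // stepB starts only after stepA succeeds

JobControl flow = new JobControl("flow");
flow.addJob(stepA);
flow.addJob(stepB);
new Thread(flow).start();          // jobs are submitted as their dependencies complete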
Raw data
wea.txt
1992,23
1993,22
1992,34
1993,26
1994,25
1995,21
1996,25
1997,31
1996,30
1995,33
1994,27
1993,29
1992,33
1993,28
1999,35
1998,34
1997,29
1996,35
1995,27
1994,36
Upload the file to the HDFS distributed file system:
hdfs dfs -put wea.txt /user/hdfs
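To confirm the upload, the file can be listed and printed back with the standard hdfs dfs commands, for example:
hdfs dfs -ls /user/hdfs
hdfs dfs -cat /user/hdfs/wea.txt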
Define a composite value type that stores the temperature sum and the record count (it is used as a map/reduce value, so implementing Writable is sufficient).
Code:
package com.briup.MR;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Composite value type: carries the number of records (num) and the
// temperature sum (sum_temp) between the mapper and the reducer.
public class AvgSumTemp implements Writable {
    private int num;
    private double sum_temp;

    // Writable requires a no-argument constructor so the framework can
    // instantiate the object before calling readFields().
    public AvgSumTemp() {
    }

    public AvgSumTemp(int num, double sum_temp) {
        this.num = num;
        this.sum_temp = sum_temp;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public double getSum_temp() {
        return sum_temp;
    }

    public void setSum_temp(double sum_temp) {
        this.sum_temp = sum_temp;
    }

    @Override
    public String toString() {
        return "AvgSumTemp{" +
                "num=" + num +
                ", sum_temp=" + sum_temp +
                '}';
    }

    // Serialization: fields must be written and read back in the same order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(num);
        out.writeDouble(sum_temp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.num = in.readInt();
        this.sum_temp = in.readDouble();
    }
}
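A quick way to sanity-check a custom Writable is to serialize it and read it back with plain Java data streams. The following standalone snippet is only an illustration and is not part of the job; the class name WritableRoundTrip is made up, and it is assumed to sit in the same package as AvgSumTemp:

package com.briup.MR;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical standalone check (not part of the MapReduce job): write an
// AvgSumTemp to a byte array and read it back through the Writable methods.
public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        AvgSumTemp original = new AvgSumTemp(3, 90.0);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        AvgSumTemp copy = new AvgSumTemp();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);   // prints AvgSumTemp{num=3, sum_temp=90.0}
    }
}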
Job 1: compute the temperature sum and the number of records for each year
package com.briup.MR;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SumTemp {

    // Mapper: each input line looks like "1992,23"; emit the year as the key
    // and an AvgSumTemp(count = 1, sum = temperature) as the value.
    static class SumTempMapper
            extends Mapper<LongWritable, Text, Text, AvgSumTemp> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] val = value.toString().split(",");
            AvgSumTemp as = new AvgSumTemp(1, Double.parseDouble(val[1]));
            context.write(new Text(val[0]), as);
        }
    }

    // Reducer: accumulate the record count and the temperature sum per year.
    static class SumTempReduce
            extends Reducer<Text, AvgSumTemp, Text, AvgSumTemp> {
        @Override
        protected void reduce(Text key, Iterable<AvgSumTemp> values, Context context)
                throws IOException, InterruptedException {
            int nums = 0;
            double sum_temp = 0;
            for (AvgSumTemp as : values) {
                nums += as.getNum();
                sum_temp += as.getSum_temp();
            }
            context.write(key, new AvgSumTemp(nums, sum_temp));
        }
    }
}
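Since the per-year count and sum are both simple additions, the same reducer class could also be registered as a combiner to cut down shuffle traffic. This is an optional tweak that is not part of the driver shown below; it would be a single extra line when configuring job1:

// Optional (not in the original driver): reuse the reducer as a combiner.
job1.setCombinerClass(SumTemp.SumTempReduce.class);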
Job 2: compute the average temperature for each year. This is a map-only job that reads the (year, AvgSumTemp) pairs written by job 1 as a SequenceFile, which is why the mapper's input key/value types are Text and AvgSumTemp.
package com.briup.MR;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AvgTemp {

    // Map-only job: for each (year, AvgSumTemp) pair produced by job 1,
    // emit (year, average temperature).
    static class AvgTempMapper extends Mapper<Text, AvgSumTemp, Text, DoubleWritable> {
        @Override
        protected void map(Text key, AvgSumTemp value, Context context)
                throws IOException, InterruptedException {
            context.write(key, new DoubleWritable(value.getSum_temp() / value.getNum()));
        }
    }
}
Job flow driver code
package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgWorkFlow extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Paths are passed on the command line via -D input=..., -D temp_input=..., -D output=...
        String input = conf.get("input");
        String temp_input = conf.get("temp_input");
        String output = conf.get("output");

        // Job 1: sum the temperatures and count the records per year,
        // writing the intermediate result as a SequenceFile.
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(this.getClass());
        job1.setJobName("sum_temp");
        job1.setMapperClass(SumTemp.SumTempMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(AvgSumTemp.class);
        job1.setReducerClass(SumTemp.SumTempReduce.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(AvgSumTemp.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(job1, new Path(input));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(temp_input));

        // Job 2: map-only job that reads job 1's SequenceFile output and
        // computes the average temperature per year.
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setJobName("avg_temp");
        job2.setMapperClass(AvgTemp.AvgTempMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(DoubleWritable.class);
        job2.setNumReduceTasks(0);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job2, new Path(temp_input));
        TextOutputFormat.setOutputPath(job2, new Path(output));

        // Wrap each Job in a ControlledJob and declare the dependency:
        // job2 may only start after job1 has completed successfully.
        ControlledJob ctrlJob1 = new ControlledJob(conf);
        ctrlJob1.setJob(job1);
        ControlledJob ctrlJob2 = new ControlledJob(conf);
        ctrlJob2.setJob(job2);
        ctrlJob2.addDependingJob(ctrlJob1);

        // JobControl runs in its own thread and submits jobs as their
        // dependencies are satisfied.
        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(ctrlJob1);
        jobs.addJob(ctrlJob2);
        Thread t = new Thread(jobs);
        t.start();
        while (!jobs.allFinished()) {
            for (ControlledJob c : jobs.getRunningJobList()) {
                c.getJob().monitorAndPrintJob();
            }
            Thread.sleep(500);
        }
        // Stop the JobControl thread, then report success or failure.
        jobs.stop();
        return jobs.getFailedJobList().isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AvgWorkFlow(), args));
    }
}
Package the program and run it
yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.AvgWorkFlow -D input=/user/hdfs/wea.txt -D temp_input=/user/hdfs/temp_input -D output=/user/hdfs/avg_result
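The -D name=value pairs on the command line are parsed by ToolRunner's GenericOptionsParser and placed into the job Configuration, which is why the driver can read them back with conf.get("input"), conf.get("temp_input") and conf.get("output").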
View the results
hdfs dfs -cat /user/hdfs/avg_result/part-m-00000
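Because job 2 runs with zero reduce tasks, its output files are named part-m-*. Each line should contain a year and its average temperature separated by a tab; for example, 1992 appears three times in wea.txt (23, 34, 33), so its average works out to 90 / 3 = 30.0.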