JobControl makes it easy to write jobs that depend on one another; such jobs typically form a directed graph. It lets you chain several MapReduce programs together and run them in the order you want.
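The core wiring pattern, in isolation, looks like this (a minimal sketch; jobA and jobB stand for any two already-configured Job instances, and the full runnable version appears in the job-flow code below):

// Wrap each Job in a ControlledJob, declare the dependency edge,
// then hand both to a JobControl, which submits them in dependency order.
static void runInOrder(Configuration conf, Job jobA, Job jobB)
        throws IOException, InterruptedException {
    ControlledJob a = new ControlledJob(conf);
    a.setJob(jobA);
    ControlledJob b = new ControlledJob(conf);
    b.setJob(jobB);
    b.addDependingJob(a);        // b will not start until a succeeds

    JobControl control = new JobControl("flow");
    control.addJob(a);
    control.addJob(b);
    new Thread(control).start(); // JobControl implements Runnable
    while (!control.allFinished()) {
        Thread.sleep(500);       // poll until both jobs are done
    }
    control.stop();              // let the JobControl thread exit
}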

  1. Raw data

    wea.txt

1992,23
1993,22
1992,34
1993,26
1994,25
1995,21
1996,25
1997,31
1996,30
1995,33
1994,27
1993,29
1992,33
1993,28
1999,35
1998,34
1997,29
1996,35
1995,27
1994,36

  2. Upload the file to the HDFS distributed file system

hdfs dfs -put wea.txt /user/hdfs
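To confirm the upload, you can list the target directory:

hdfs dfs -ls /user/hdfs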

Define a composite value type (a Writable) that stores the temperature sum and the record count.

  3. Code implementation:

package com.briup.MR;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Composite value: a record count plus a temperature sum.
// Implementing Writable lets Hadoop serialize it between map and reduce.
public class AvgSumTemp implements Writable {
    private int num;
    private Double sum_temp;

    public AvgSumTemp() {
    }

    public AvgSumTemp(int num, Double sum_temp) {
        this.num = num;
        this.sum_temp = sum_temp;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public Double getSum_temp() {
        return sum_temp;
    }

    public void setSum_temp(Double sum_temp) {
        this.sum_temp = sum_temp;
    }

    @Override
    public String toString() {
        return "AvgSumTemp{" +
                "num=" + num +
                ", sum_temp=" + sum_temp +
                '}';
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(num);
        out.writeDouble(sum_temp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read the fields in exactly the order write() emitted them.
        this.num = in.readInt();
        this.sum_temp = in.readDouble();
    }
}
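Since write and readFields must mirror each other field for field, a quick round-trip check outside Hadoop catches ordering mistakes early (an illustrative sketch, not part of the job; the class name AvgSumTempRoundTrip is made up for this test):

package com.briup.MR;

import java.io.*;

public class AvgSumTempRoundTrip {
    public static void main(String[] args) throws IOException {
        AvgSumTemp before = new AvgSumTemp(3, 90.0);

        // Serialize to an in-memory buffer, as Hadoop would during the shuffle.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bos));

        // Deserialize into a fresh instance and compare by eye.
        AvgSumTemp after = new AvgSumTemp();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(after); // AvgSumTemp{num=3, sum_temp=90.0}
    }
}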

First job: compute each year's temperature sum and record count

package com.briup.MR;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SumTemp {
    static class SumTempMapper
            extends Mapper<LongWritable, Text, Text, AvgSumTemp> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line looks like "1992,23": year, then temperature.
            String[] val = value.toString().split(",");
            AvgSumTemp as = new AvgSumTemp(1, Double.parseDouble(val[1]));
            context.write(new Text(val[0]), as);
        }
    }

    static class SumTempReduce
            extends Reducer<Text, AvgSumTemp, Text, AvgSumTemp> {
        @Override
        protected void reduce(Text key, Iterable<AvgSumTemp> values, Context context) throws IOException, InterruptedException {
            // Accumulate the per-year record count and temperature sum.
            int nums = 0;
            double sum_temp = 0;
            for (AvgSumTemp as : values) {
                nums += as.getNum();
                sum_temp += as.getSum_temp();
            }
            context.write(key, new AvgSumTemp(nums, sum_temp));
        }
    }
}
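For example, with the sample data above, the mapper turns the three 1992 records (23, 34, 33) into three (1992, AvgSumTemp{num=1, ...}) pairs, and the reducer folds them into (1992, AvgSumTemp{num=3, sum_temp=90.0}). Because the reducer's input and output types match and counts and sums are associative, SumTempReduce could also be registered as a combiner (job1.setCombinerClass(SumTemp.SumTempReduce.class)) to cut shuffle traffic.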

Second job: compute each year's average temperature

package com.briup.MR;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AvgTemp {
    // Map-only job: reads the (year, AvgSumTemp) pairs produced by the first
    // job and emits (year, average temperature).
    static class AvgTempMapper extends Mapper<Text, AvgSumTemp, Text, DoubleWritable> {
        @Override
        protected void map(Text key, AvgSumTemp value, Context context) throws IOException, InterruptedException {
            context.write(key, new DoubleWritable(value.getSum_temp() / value.getNum()));
        }
    }
}
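Continuing the example, this mapper reads (1992, AvgSumTemp{num=3, sum_temp=90.0}) from the SequenceFile and writes 1992 with 90.0 / 3 = 30.0. Because the job runs with zero reduce tasks (set in the driver below), the mapper's output goes straight to the output files, which is why the result files are named part-m-*.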

Job-flow implementation

package com.briup.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgWorkFlow extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Paths are supplied on the command line via -D input=... and friends.
        String input = conf.get("input");
        String temp_input = conf.get("temp_input");
        String output = conf.get("output");

        // Job 1: sum temperatures and count records per year.
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(this.getClass());
        job1.setJobName("sum_temp");

        job1.setMapperClass(SumTemp.SumTempMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(AvgSumTemp.class);

        job1.setReducerClass(SumTemp.SumTempReduce.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(AvgSumTemp.class);

        // Write the intermediate result as a SequenceFile so job 2 can read
        // the (Text, AvgSumTemp) pairs back without re-parsing text.
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(job1, new Path(input));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(temp_input));

        // Job 2: map-only job that turns (sum, count) into an average.
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setJobName("avg_temp");

        job2.setMapperClass(AvgTemp.AvgTempMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(DoubleWritable.class);

        job2.setNumReduceTasks(0);

        job2.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job2, new Path(temp_input));
        TextOutputFormat.setOutputPath(job2, new Path(output));

        // Wrap both jobs in ControlledJobs and declare that job 2 depends
        // on job 1, then let JobControl run them in dependency order.
        ControlledJob cjob1 = new ControlledJob(conf);
        cjob1.setJob(job1);
        ControlledJob cjob2 = new ControlledJob(conf);
        cjob2.setJob(job2);
        cjob2.addDependingJob(cjob1);

        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(cjob1);
        jobs.addJob(cjob2);

        // JobControl is a Runnable; run it on its own thread and poll
        // until every job in the flow has finished.
        Thread t = new Thread(jobs);
        t.start();
        while (!jobs.allFinished()) {
            for (ControlledJob c : jobs.getRunningJobList()) {
                c.getJob().monitorAndPrintJob();
            }
            Thread.sleep(500); // avoid busy-spinning while jobs are pending
        }
        jobs.stop(); // tell the JobControl thread to exit

        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner.run is static; it also parses generic options such as -D.
        System.exit(ToolRunner.run(new AvgWorkFlow(), args));
    }
}
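For a purely linear chain like this one, the same flow could also be written without JobControl by submitting the jobs one after the other; JobControl pays off when the dependency graph branches and independent jobs can run in parallel. A sketch, assuming the same job1 and job2 configuration as in run above:

// Sequential alternative: block on job1, then run job2 only if job1 succeeded.
if (!job1.waitForCompletion(true)) {
    return 1; // abort the flow if the first job fails
}
return job2.waitForCompletion(true) ? 0 : 1;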

  4. Package and run the program

yarn jar GX_hadoop-1.0-SNAPSHOT.jar com.briup.MR.AvgWorkFlow -D input=/user/hdfs/wea.txt -D temp_input=/user/hdfs/temp_input -D output=/user/hdfs/avg_result
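The -D key=value pairs are Hadoop generic options: ToolRunner passes them through GenericOptionsParser into the Configuration before run is called, which is why conf.get("input") sees them. Note the space after -D, the documented form for generic options (unlike the JVM's -Dkey=value system properties).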

  5. View the result

hdfs dfs -cat /user/hdfs/avg_result/part-m-00000
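With the sample data above, the per-year averages work out to 90/3, 105/4, 88/3, 81/3, 90/3, 60/2, 34/1, and 35/1, so the output should look like this (TextOutputFormat separates key and value with a tab; doubles are printed at full precision):

1992	30.0
1993	26.25
1994	29.333333333333332
1995	27.0
1996	30.0
1997	30.0
1998	34.0
1999	35.0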
