问题分析
求工资比公司平均工资要高的员工姓名及工资,需要得到公司的平均工资和所有员工工资,通过比较得出工资比平均工资高的员工姓名及工资。这个问题可以分两个作业进行解决,先求出公司的平均工资,然后与所有员工进行比较得到结果;也可以在一个作业进行解决,这里就得使用作业setNumReduceTasks方法,设置Reduce任务数为1,保证每次运行一个reduce任务,在该例子中,只需要一个reduce任务就可以处理完数据,从而能先求出平均工资,然后进行比较得出结果。
在Mapper阶段输出两份所有员工数据,其中一份key为0、value为该员工工资,另外一份key为1、value为”该员工姓名 ,员工工资”;然后在Shuffle阶段把传过来数据按照key进行归组,在该任务中有key值为0和1两组数据;最后在Reduce中对key值0的所有员工求工资总数和员工数,获得平均工资;对key值1,比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q6HigherThanAveSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] kv = value.toString().split(",");

            // 输出<0,每个人的工资> 用于计算平均工资
            context.write(new IntWritable(0), new Text(kv[5]));
            // 输出<1,该员工姓名+员工工资> 用于和平均工资比较
            context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5]));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {

        private long allSalary = 0;
        private int allEmpCount = 0;
        private long aveSalary = 0;

        private long empSalary = 0;

        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            for (Text val : values) {
                // 得到工资总数
                if (0 == key.get()) {
                    allSalary += Long.parseLong(val.toString());
                    allEmpCount++;
                    System.out.println("allEmpCount = " + allEmpCount);
                } else if (1 == key.get()) {
                    if (aveSalary == 0) {
                        aveSalary = allSalary / allEmpCount; // 计算平均工资
                        context.write(new Text("Average Salary = "), new Text("" + aveSalary));
                        context.write(new Text("Following employees have salarys higher than Average:"), new Text(""));
                    }
                    // System.out.println("Employee salary = " + val.toString());
                    // aveSalary = allSalary / allEmpCount;

                    empSalary = Long.parseLong(val.toString().split(",")[1]);
                    if (empSalary > aveSalary) {
                        context.write(new Text(val.toString().split(",")[0]), new Text("" + empSalary));
                    }
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        Job job = new Job(getConf(), "Q6HigherThanAveSalary");
        job.setJobName("Q6HigherThanAveSalary");

        job.setJarByClass(Q6HigherThanAveSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);
        System.exit(res);
    }
}


用于计算的基础数据请参考:http://itxm.cn/post/17840.html

发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

使用MapReduce列出名字以J开头的员工姓名及其所属部门名称详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。