问题分析
求工资比上司高的员工姓名及工资,需要得到上司工资及上司所有下属员工,通过比较他们工资高低得到比上司工资高的员工。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为员工编号、value为”M,该员工工资”,员工对应经理表数据key为经理编号、value为”E,该员工姓名,该员工工资”;然后在Shuffle阶段把传过来的经理数据和员工对应经理表数据进行归组,如编号为7698员工,value中标志M为自己工资,value中标志E为其下属姓名及工资;最后在Reduce中遍历比较员工与经理工资高低,输出工资高于经理的员工。

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q5EarnMoreThanManager extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] kv = value.toString().split(",");
            // 输出所有员工的工资键值对,以自己的编号为key,以工资为value
            context.write(new Text(kv[0].toString()), new Text("M," + kv[5]));
            // System.out.println("<"+kv[0].toString()+"---"+"M," + kv[5]+">");
            if (null != kv[3] && !"".equals(kv[3].toString())) {
                // 输出有直属上司的雇员的工资,以其上司的编号为key值,自己的工资信息为value
                context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5]));
                // System.out.println("<"+kv[3].toString()+"---"+"E," + kv[1] + "," + kv[5]+">");
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        // 在同一个reduce任务中 只处理一个键值对,因此在一个reduce中存放的是指定的上司和属于他的直属 下属
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            String empName;
            long empSalary = 0;
            HashMap<String, Long> empMap = new HashMap<String, Long>();

            long mgrSalary = 0;

            // 将职员和上司的工资信息分开存放,利于比较普通下属与上司的工资比较,找出符合条件的,否则该reduce任务不输出结果
            for (Text val : values) {
                if (val.toString().startsWith("E")) {
                    empName = val.toString().split(",")[1];
                    empSalary = Long.parseLong(val.toString().split(",")[2]);
                    empMap.put(empName, empSalary);
                } else {
                    mgrSalary = Long.parseLong(val.toString().split(",")[1]);
                }
            }

            for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) {
                if (entry.getValue() > mgrSalary) {
                    context.write(new Text(entry.getKey()), new Text("" + entry.getValue()));
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        Job job = new Job(getConf(), "Q5EarnMoreThanManager");
        job.setJobName("Q5EarnMoreThanManager");

        job.setJarByClass(Q5EarnMoreThanManager.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);
        System.exit(res);
    }
}


用于计算的基础数据请参考:http://itxm.cn/post/17840.html

发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

使用MapReduce列出工资比公司平均工资要高的员工姓名及其工资详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。