【MapReduce】对员工数据按照部门分区并对每个分区排序

发布时间:2024年01月17日

? ? ? ? 员工信息全部存储在emp.csv文件中,员工的属性有:员工id、名称、职位、领导id、雇佣时间、工资、奖金、部门号。

????????在MapReduce中想要使用员工的信息,需要对员工进行序列化处理。因为MapReduce是一个分布式框架数据会在不同节点之间进行传输,所以需要将对象转换成字节序列以便于存储或传输。并且如果对象不序列化程序会出错。

一、主类

主类作用:在主类中设置MapReduce中的map类和reduce类,指定分区规则类、设置启动reduce的数量,设置map阶段和reduce阶段的输入输出类型。上传文件。


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        //设置主类
        job.setJarByClass(EmployeeMain.class);
        //设置Map类
        job.setMapperClass(EmployeeMapper.class);
        //设置Reduce类
        job.setReducerClass(SalaryTotalReducer.class);

        //指定分区规则
        job.setPartitionerClass(DeptnoPartitioner.class);
        //设置启动reduce数量
        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(IntWritable.class);// map阶段的输出的key
        job.setMapOutputValueClass(Employee.class);// map阶段的输出的value

        job.setOutputKeyClass(IntWritable.class);// reduce阶段的输出的key
        job.setOutputValueClass(Employee.class);// reduce阶段的输出的value
        //Windows本地路径
        FileInputFormat.setInputPaths(job, new Path("./src/main/java/serialSortPartitioner/emp.csv"));
        FileOutputFormat.setOutputPath(job, new Path("./src/main/java/serialSortPartitioner/output"));
        System.out.println("计算开始---------------");
        boolean res = job.waitForCompletion(true);
        System.out.println("计算结束---------------");
    }

}

二、员工类


import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class Employee implements Writable{
    //员工id
    private int empno;
    //员工名称
    private String ename;
    //员工职位
    private String job;
    //直接领导的员工id
    private int mgr;
    //雇佣时间
    private String hiredate;
    //工资
    private int sal;
    //奖金
    private int comm;
    //部门号
    private int deptno;

    public Employee(){}


    //序列化
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.empno);
        out.writeUTF(this.ename);
        out.writeUTF(this.job);
        out.writeInt(this.mgr);
        out.writeUTF(this.hiredate);
        out.writeInt(this.sal);
        out.writeInt(this.comm);
        out.writeInt(this.deptno);
    }

    //反序列化
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.mgr = in.readInt();
        this.hiredate = in.readUTF();
        this.sal = in.readInt();
        this.comm = in.readInt();
        this.deptno = in.readInt();

    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public Integer getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "empno=" + empno +
                ", ename='" + ename + '\'' +
                ", job='" + job + '\'' +
                ", mgr=" + mgr +
                ", hiredate='" + hiredate + '\'' +
                ", sal=" + sal +
                ", comm=" + comm +
                ", deptno=" + deptno +
                '}';
    }
}

三、map类

map类主要作用是输入员工的数据到MapReduce中。


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        //get values string
        String v1string = v1.toString();
        //spile string
        String words[] = v1string.split(",");
        //map out key/value
        //System.out.println("display this turn <key,1> ");
        Employee e = new Employee();
        //员工号
        e.setEmpno(Integer.parseInt(words[0]));
        //姓名
        e.setEname(words[1]);
        //职位
        e.setJob(words[2]);
        //老板号
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception e1) {
            //没有老板号
            e.setMgr(-1);
        }
        //入职日期
        e.setHiredate(words[4]);
        //工资
        e.setSal(Integer.parseInt(words[5]));
        //奖金
        try {
            e.setComm(Integer.parseInt(words[6]));

        } catch (Exception e2) {
            e.setComm(0);
        }
        //部门号
        e.setDeptno(Integer.parseInt(words[7]));
//        System.out.println("map   " + e.toString());
        //根据部门号作为关键字,进行默认排序,也可以设置为空
        context.write(new IntWritable(e.getDeptno()), e);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        super.run(context);
    }
}

三、分区类

根据部门号进行分区


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class DeptnoPartitioner extends Partitioner<IntWritable, Employee> {

    //根据部门号设置分区
    @Override
    public int getPartition(IntWritable k2, Employee v2, int numPartitions) {
        // TODO Auto-generated method stub
        if (v2.getDeptno() <= 10) {
            return 0;
        } else if (v2.getDeptno() <= 20) {
            return 1;
        } else return 2;
    }
}

四、reduce类

设置的分区数量和启动的reduce数量相同(在主类中设置启动数量),在reduce类中进行排序就可以实现每个分区进行自定义排序。


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;

public class SalaryTotalReducer extends Reducer<IntWritable, Employee, NullWritable, Employee> {
    Comparator comparetor = new compareclass();//TreeMap对象可以使用降序的比较器
    private TreeMap<Integer, Employee> repToRecordMap =
            new TreeMap<Integer, Employee>(comparetor); //如果参数为空是默认升序比较器

    protected void reduce(IntWritable k3, Iterable<Employee> v3,
                          Context context)
            throws IOException, InterruptedException {

        //在这里自定义排序
        for (Employee e : v3) {
            repToRecordMap.put(e.getSal(),e);
        }
        //在这里获取排序后的结果
        for (Integer e : repToRecordMap.keySet()) {
            //在这里工资数据会改变(原因未知),需要重新设置为原来的工资
            repToRecordMap.get(e).setSal(e);
            context.write(NullWritable.get(),repToRecordMap.get(e));
        }
    }
}


class compareclass implements Comparator<Integer> {
    //返回一个基本类型的整型,谁大谁排后面(升序).
    //返回负数表示:o1 小于o2
    //返回0表示:表示:o1和o2相等
    //返回正数表示:o1大于o2。
    //默认用o1-o2,创建TreeMap对象时可以不用这个继承类,但是要降序,必须修改compare里面的逻辑o2-o1
    //谁大谁排在前面(降序)用o2-o1
    @Override
    //排序
    public int compare(Integer o1, Integer o2) {
        // TODO Auto-generated method stub
        return o1 - o2;
    }
}

运行结果

文章来源:https://blog.csdn.net/m0_62332728/article/details/135659818
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。