MR实战:统计总分与平均分

发布时间:2023年12月26日

一、实战概述

  • 任务:使用MapReduce框架计算学生总分与平均分

  • 今天,我将向你们展示如何使用Apache Hadoop的MapReduce框架来计算每个学生的总分和平均分。我们有一个包含六个字段(姓名、语文、数学、英语、物理、化学)的成绩表,共有五条记录。

  • 在大数据处理领域,Apache Hadoop的MapReduce框架以其强大的分布式计算能力备受瞩目。本次演示我们将通过实际操作,展示如何利用MapReduce来处理和分析学生成绩数据,计算每个学生的总分与平均分。此过程涵盖了数据准备、Maven项目搭建、Mapper和Reducer实现以及作业运行等多个关键步骤,旨在深入理解并掌握MapReduce的工作原理和应用实践。

  • 以下是我们将要进行的步骤:

  1. 准备数据:

    • 启动Hadoop服务。
    • 在虚拟机上创建一个名为score.txt的文本文件,其中每列之间故意隔两个空格。
    • 在HDFS上创建/calcscore/input目录,并将score.txt文件上传到这个目录。
  2. 实现步骤:

    • 创建一个名为CalcScore的Maven项目,并在pom.xml文件中添加hadoopjunit依赖。
    • resources目录下创建log4j.properties文件,用于日志配置。
    • 创建一个名为ScoreMapper的映射器类,该类负责读取输入文件中的每行数据,然后将姓名和对应的科目成绩作为键值对输出。
    • 创建一个名为ScoreDriver的驱动器类,该类负责设置作业的配置、输入和输出路径,以及调用Mapper和Reducer。
    • 创建一个名为ScoreReducer的归并器类,该类负责接收Mapper输出的键值对,计算每个学生的总分和平均分,并将结果作为新的键值对输出。
  • 接下来,我们将详细讲解每个类的实现细节。

  • ScoreMapper类中,我们重写了map方法。这个方法首先获取输入行的内容,然后按照空格拆分得到字段数组。我们获取姓名,并遍历各科成绩,将每对<姓名, 成绩>写入上下文。

  • ScoreDriver类中,我们首先创建一个配置对象,并设置数据节点主机名属性。然后,我们获取作业实例,设置作业启动类、Mapper类和map任务的输出键值类型。我们定义了输入和输出目录的URI字符串,并创建了相应的Path对象。我们删除输出目录(如果已存在),然后给作业添加输入目录和设置输出目录。最后,我们等待作业完成并输出统计结果。

  • ScoreReducer类中,我们重写了reduce方法。这个方法接收Mapper输出的键值对,计算每个学生的总分和平均分,然后将结果作为新的键值对输出。

  • 最后,我们修改ScoreDriver类,设置Reducer类及其输出键值类型,然后运行ScoreDriver类。我们可以通过HDFS Shell命令查看结果文件内容。

  • 以上就是使用MapReduce框架计算学生总分与平均分的全过程。

二、提出任务

  • 成绩表,包含六个字段(姓名、语文、数学、英语、物理、化学),有五条记录
    在这里插入图片描述
  • 利用MR框架,计算每个同学的总分与平均分
    在这里插入图片描述

三、完成任务

(一)准备数据

  • 启动hadoop服务
    在这里插入图片描述

1、在虚拟机上创建文本文件

  • 创建calcscore目录,在里面创建score.txt文件
    在这里插入图片描述
  • 注意:每列之间故意隔两个空格

2、上传文件到HDFS指定目录

  • 创建/calcscore/input目录,执行命令:hdfs dfs -mkdir -p /calcscore/input
    在这里插入图片描述
  • 将文本文件score.txt,上传到HDFS的/calcscore/input目录
    在这里插入图片描述

(二)实现步骤

1、创建Maven项目

  • Maven项目 - CalcScore
    在这里插入图片描述

  • 单击【Finish】按钮
    在这里插入图片描述

2、添加相关依赖

  • pom.xml文件里添加hadoopjunit依赖
    在这里插入图片描述
<dependencies>                                      
    <!--hadoop客户端-->                                
    <dependency>                                    
        <groupId>org.apache.hadoop</groupId>        
        <artifactId>hadoop-client</artifactId>      
        <version>3.3.4</version>                    
    </dependency>                                   
    <!--单元测试框架-->                                   
    <dependency>                                    
        <groupId>junit</groupId>                    
        <artifactId>junit</artifactId>              
        <version>4.13.2</version>                   
    </dependency>                                   
</dependencies>                                     

3、创建日志属性文件

  • resources目录里创建log4j.properties文件
    在这里插入图片描述
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/calcscore.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4、创建成绩映射器类

  • net.hw.mr里创建ScoreMapper
    在这里插入图片描述
package net.hw.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 功能:成绩映射器类
 * 作者:华卫
 * 日期:2022年12月17日
 */
public class ScoreMapper extends Mapper <LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        // 获取行内容
        String line = value.toString();
        // 按空格拆分得到字段数组
        String[] fields = line.split(" ");
        // 获取姓名
        String name = fields[0].trim();
        // 遍历各科成绩
        for (int i = 1; i < fields.length; i++) {
            // 获取成绩
            int score = Integer.parseInt(fields[i].trim());
            // 写入<姓名,成绩>键值对
            context.write(new Text(name), new IntWritable(score));
        }
    }
}

5、创建成绩驱动器类

  • net.hw.mr包里创建ScoreDriver
    在这里插入图片描述
package net.hw.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * 功能:成绩驱动器类
 * 作者:华卫
 * 日期:2022年12月17日
 */
public class ScoreDriver {
    public static void main(String[] args) throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        // 设置数据节点主机名属性
        conf.set("dfs.client.use.datanode.hostname", "true");

        // 获取作业实例
        Job job = Job.getInstance(conf);
        // 设置作业启动类
        job.setJarByClass(ScoreDriver.class);

        // 设置Mapper类
        job.setMapperClass(ScoreMapper.class);
        // 设置map任务输出键类型
        job.setMapOutputKeyClass(Text.class);
        // 设置map任务输出值类型
        job.setMapOutputValueClass(IntWritable.class);

        // 定义uri字符串
        String uri = "hdfs://master:9000";
        // 创建输入目录
        Path inputPath = new Path(uri + "/calcscore/input");
        // 创建输出目录
        Path outputPath = new Path(uri + "/calcscore/output");

        // 获取文件系统
        FileSystem fs =  FileSystem.get(new URI(uri), conf);
        // 删除输出目录(第二个参数设置是否递归)
        fs.delete(outputPath, true);

        // 给作业添加输入目录(允许多个)
        FileInputFormat.addInputPath(job, inputPath);
        // 给作业设置输出目录(只能一个)
        FileOutputFormat.setOutputPath(job, outputPath);

        // 等待作业完成
        job.waitForCompletion(true);

        // 输出统计结果
        System.out.println("======统计结果======");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        for (int i = 1; i < fileStatuses.length; i++) {
            // 输出结果文件路径
            System.out.println(fileStatuses[i].getPath());
            // 获取文件系统数据字节输入流
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            // 将结果文件显示在控制台
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}

6、启动成绩驱动器类,查看结果

  • 运行ScoreDriver
    在这里插入图片描述

7、创建成绩归并器类

  • net.hw.mr包里创建ScoreReducer
    在这里插入图片描述
package net.hw.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.DecimalFormat;

/**
 * 功能:成绩归并器类
 * 作者:华卫
 * 日期:2022年12月17日
 */
public class ScoreReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {
        // 声明变量
        int count = 0; // 科目数
        int sum = 0; // 总分
        double avg = 0; // 平均分
        // 遍历迭代器计算总分
        for (IntWritable value : values) {
            count++; // 科目数累加
            sum += value.get(); // 总分累加
        }
        // 计算平均分
        avg = sum * 1.0 / count;
        // 创建小数格式对象
        DecimalFormat df = new DecimalFormat("#.#");
        // 拼接每个学生总分与平均分成绩信息
        String scoreInfo = "(" + key + "," + sum + "," + df.format(avg) + ")";
        // 写入键值对
        context.write(new Text(scoreInfo), NullWritable.get());
    }
}

8、修改成绩驱动器类

  • 设置Reducer类及其输出键值类型
    在这里插入图片描述

9、启动成绩驱动器列,查看结果

  • 运行ScoreDriver
    在这里插入图片描述
  • 利用HDFS Shell命令查看结果文件内容
    在这里插入图片描述
文章来源:https://blog.csdn.net/howard2005/article/details/135213545
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。