3.MapReduce实践-单词统计

发布时间:2024年01月08日

概述

官网文档速递

MapReduce :分布式计算框架
通常情况下,一个 MR 作业是有 2 个部分构成:MapTask ReduceTask(可以没有)

在这里插入图片描述

MapReduce 核心进程

主要有三个:

  • MapTask
  • ReduceTask
  • MRAppMaster :负责整个 MR 作业的调度/协调工作

MapReduce 编程规范

与其说编程规范,编程模板更容易理解。

官方文档速递

  • MapReduce 作业的编程模板
    • 自定义 Mapper
      • extends Mapper<Object, Text, Text, IntWritable>
        • Object, Text : Mapper 的输入数据 Key 的类型,输入数据 Value 的类型
        • Text, IntWritable :Mapper 的输出数据 Key 的类型,输出数据 Value 的类型
      • 重写 public void map 方法:实现自己的业务逻辑
    • 自定义 Reducer
      • extends Reducer<Text,IntWritable,Text,IntWritable>
        • Text,IntWritable :Reducer 的输入数据 Key 的类型,输入数据 Value 的类型
        • 重写 public void reduce 实现自己的业务逻辑
    • 自定义 Driver
      • 通过 Driver 将整个 MR 作业串起来
        • 获取 Job
        • 设置作业的 Class
        • 设置自定义的 Mapper 类
        • 设置自定义的 Reducer 类
        • 设置自定义的 Mapper 类输出的 Key Value 类型
        • 设置自定义的 Reducer 类输出的 Key Value 类型
        • 设置作业的输入数据目录
        • 设置作业的输出数据目录
        • 作业的提交

单词统计案例

public class WordCount {

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String str = value.toString();
            String[] split = str.split(",");
            IntWritable ONE = new IntWritable(1);
            for (String word : split) {
                context.write(new Text(word), ONE);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count = count + value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }


    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
		// 可以是文件夹,那么会统计文件夹下所有的文件
        String sourcePath = "data/wc.data";
        String distPath = "downloadOut/wc-out.data";

        FileUtil.deleteIfExist(configuration, distPath);

        Job job = Job.getInstance(configuration, "word count");
        job.setJarByClass(WordCount.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(sourcePath));
        FileOutputFormat.setOutputPath(job, new Path(distPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
public static void deleteIfExist(Configuration configuration, String distPath) throws IOException {
    FileSystem fileSystem = FileSystem.get(configuration);
    if (fileSystem.exists(new Path(distPath))) {
        fileSystem.delete(new Path(distPath), true);
    }
    fileSystem.close();
}

源码

在这里插入图片描述

结束

至此 MapReduce实践-单词统计 就结束了,如有疑问,欢迎评论区留言。

文章来源:https://blog.csdn.net/2301_79691134/article/details/135393637
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。