Hadoop之WordCount

发布时间:2023年12月27日

1. pom.xml 中的依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.3.5</version>
</dependency>

1.?WordCountMapper

package com.neuedu.demo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Mapper<LongWritable, Text,Text, IntWritable>
 *    1. 前两个 LongWritable, Text 为 map输入数据的类型,
 *      1.1 LongWritable 文本文件的偏移量
 *      1.2 Text 是读取一行的内容
 *    2. 后两个 Text, IntWritable 为 map输出数据的类型,map是一个key-value的数据结构
 *      2.1 Text 是key的数据类型
 *      2.2 IntWritable 是 value的数据类型
 */
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    /**
     *
     * @param key       map输入偏移量
     * @param value     map输入的内容,一行一行
     * @param context   Mapper.Context 可以使用此对象作为map的输出操作
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. 将输入的内容转换为java字符串
        String words = value.toString();
        //-2. 默认分割为 空格分割,如果使用其他符号作为分隔符,可以在StringTokenizer构造方法中进行设置
        StringTokenizer itr = new StringTokenizer(words);
        //-3. 遍历,获取每个分割得到的单词
        while(itr.hasMoreTokens()){
            //-4. 获取每一行中的每个单词
            String word = itr.nextToken();
            //-5. 封装map 阶段输出的key和value,如:Text, IntWritable
            Text outKey = new Text(word);
            IntWritable outValue = new IntWritable(1);
            //-6. 将读取的内容按需求进行输出(map阶段的输出)
            context.write(outKey,outValue);

        }
    }
}

2.?WordCountReduce

package com.neuedu.demo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;

/**
 * Reduce 阶段
 * Reducer<Text, IntWritable,Text,IntWritable>
 *     1. 前两个 Text, IntWritable 为map 输出 reduce 输入 数据类型
 *     1.1 Text 是 key的数据类型
 *     1.2 IntWritable 是 value数据类型
 *     2. 后两个 Text,IntWritable 为 reduce 输出数据类型,为key-value结构
 *
 */
public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
    /**
     *
     * @param key       shuffle 输出,reduce 输入的key
     * @param values    shuffle 输出 value的迭代器
     * @param context   Reduce.Context 可以利用此对象做 reduce 输出
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //-1. 单词数量累加和
        int sum = 0;
        //-2. 遍历迭代器
        for(IntWritable value:values){
            sum+=value.get();
        }
        //-3. 封装reduce输出的value
        IntWritable outValue = new IntWritable(sum);
        System.out.println(key.toString()+":"+sum);
        //-4. reduce 输出
        context.write(key,outValue);
    }
}

3. WordCountDemo 之 本地开发环境(直接在Idea工具上右键运行即可)

package com.neuedu.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

import java.util.IntSummaryStatistics;

/**
 *  测试方式:本地开发环境执行
 *
 */
public class WordCountDemo {
    public static void main(String[] args) throws Exception{
        //-1. 设置环境变量 hadoop 用户名 为 root
        System.setProperty("HADOOP_USER_NAME","root");
        //-2. 参数配置对象
        Configuration conf = new Configuration();
        //-3. 跨平台执行
        conf.set("mapreduce.app-submission.cross-platform","true");
        //-4. 本地运行
        conf.set("mapreduce.framework.name","local");
        //-5. 设置默认系统为 本地文件系统
        conf.set("fs.defaultFS","file:///");
        //-6. 声明Job对象,就是一个应用
        Job job = Job.getInstance(conf,"Word Count");
        //-7. 指定当前Job的驱动类(一般为当前类)
        job.setJarByClass(WordCount01.class);
        //-8. 指定当前Job的Mapper类
        job.setMapperClass(WordCountMapper.class);
        //-9. 指定当前Job的Combiner 注意:一定不能影响最终计算结果,否则 不使用
        job.setCombinerClass(IntSumReducer.class);
        //-10. 指定当前Job的Reduce类
        job.setReducerClass(WordCountReduce.class);
        //-11. 设置Map 输出 key的 数据类型
        job.setMapOutputKeyClass(Text.class);
        //-12. 设置Map 输出 value的 数据类型
        job.setMapOutputValueClass(IntWritable.class);
        //-13. 设置最终输出 key 的数据类型
        job.setOutputKeyClass(Text.class);
        //-14. 设置最终输出 value 的数据类型
        job.setOutputValueClass(IntWritable.class);
        //-15 定义 map 输入的路径 注意:该路径默认为hdfs路径
        FileInputFormat.addInputPath(job,new Path("D:/softtools/wcdata.txt"));
        //-16. 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
        Path dst = new Path("D:/softtools/result");
        //-17. 保护性代码,如果reduce输出目录已存在则删除
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(dst)){
            fs.delete(dst,true);
        }
        //-17.
        FileOutputFormat.setOutputPath(job,dst);
        //-18. 提交job

        System.exit(job.waitForCompletion(true)?0:1);

    }
}

文章来源:https://blog.csdn.net/majingbobo/article/details/134468109
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。