1. pom.xml 中的依赖
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.5</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.5</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.3.5</version> </dependency>
1.?WordCountMapper
package com.neuedu.demo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* Mapper<LongWritable, Text,Text, IntWritable>
* 1. 前两个 LongWritable, Text 为 map输入数据的类型,
* 1.1 LongWritable 文本文件的偏移量
* 1.2 Text 是读取一行的内容
* 2. 后两个 Text, IntWritable 为 map输出数据的类型,map是一个key-value的数据结构
* 2.1 Text 是key的数据类型
* 2.2 IntWritable 是 value的数据类型
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
/**
*
* @param key map输入偏移量
* @param value map输入的内容,一行一行
* @param context Mapper.Context 可以使用此对象作为map的输出操作
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1. 将输入的内容转换为java字符串
String words = value.toString();
//-2. 默认分割为 空格分割,如果使用其他符号作为分隔符,可以在StringTokenizer构造方法中进行设置
StringTokenizer itr = new StringTokenizer(words);
//-3. 遍历,获取每个分割得到的单词
while(itr.hasMoreTokens()){
//-4. 获取每一行中的每个单词
String word = itr.nextToken();
//-5. 封装map 阶段输出的key和value,如:Text, IntWritable
Text outKey = new Text(word);
IntWritable outValue = new IntWritable(1);
//-6. 将读取的内容按需求进行输出(map阶段的输出)
context.write(outKey,outValue);
}
}
}
2.?WordCountReduce
package com.neuedu.demo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reduce 阶段
* Reducer<Text, IntWritable,Text,IntWritable>
* 1. 前两个 Text, IntWritable 为map 输出 reduce 输入 数据类型
* 1.1 Text 是 key的数据类型
* 1.2 IntWritable 是 value数据类型
* 2. 后两个 Text,IntWritable 为 reduce 输出数据类型,为key-value结构
*
*/
public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
/**
*
* @param key shuffle 输出,reduce 输入的key
* @param values shuffle 输出 value的迭代器
* @param context Reduce.Context 可以利用此对象做 reduce 输出
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//-1. 单词数量累加和
int sum = 0;
//-2. 遍历迭代器
for(IntWritable value:values){
sum+=value.get();
}
//-3. 封装reduce输出的value
IntWritable outValue = new IntWritable(sum);
System.out.println(key.toString()+":"+sum);
//-4. reduce 输出
context.write(key,outValue);
}
}
3. WordCountDemo 之 本地开发环境(直接在Idea工具上右键运行即可)
package com.neuedu.demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import java.util.IntSummaryStatistics;
/**
* 测试方式:本地开发环境执行
*
*/
public class WordCountDemo {
public static void main(String[] args) throws Exception{
//-1. 设置环境变量 hadoop 用户名 为 root
System.setProperty("HADOOP_USER_NAME","root");
//-2. 参数配置对象
Configuration conf = new Configuration();
//-3. 跨平台执行
conf.set("mapreduce.app-submission.cross-platform","true");
//-4. 本地运行
conf.set("mapreduce.framework.name","local");
//-5. 设置默认系统为 本地文件系统
conf.set("fs.defaultFS","file:///");
//-6. 声明Job对象,就是一个应用
Job job = Job.getInstance(conf,"Word Count");
//-7. 指定当前Job的驱动类(一般为当前类)
job.setJarByClass(WordCount01.class);
//-8. 指定当前Job的Mapper类
job.setMapperClass(WordCountMapper.class);
//-9. 指定当前Job的Combiner 注意:一定不能影响最终计算结果,否则 不使用
job.setCombinerClass(IntSumReducer.class);
//-10. 指定当前Job的Reduce类
job.setReducerClass(WordCountReduce.class);
//-11. 设置Map 输出 key的 数据类型
job.setMapOutputKeyClass(Text.class);
//-12. 设置Map 输出 value的 数据类型
job.setMapOutputValueClass(IntWritable.class);
//-13. 设置最终输出 key 的数据类型
job.setOutputKeyClass(Text.class);
//-14. 设置最终输出 value 的数据类型
job.setOutputValueClass(IntWritable.class);
//-15 定义 map 输入的路径 注意:该路径默认为hdfs路径
FileInputFormat.addInputPath(job,new Path("D:/softtools/wcdata.txt"));
//-16. 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
Path dst = new Path("D:/softtools/result");
//-17. 保护性代码,如果reduce输出目录已存在则删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(dst)){
fs.delete(dst,true);
}
//-17.
FileOutputFormat.setOutputPath(job,dst);
//-18. 提交job
System.exit(job.waitForCompletion(true)?0:1);
}
}