大数据----基于sogou.500w.utf8数据的MapReduce编程

发布时间:2023年12月24日

一、前言

最近在学习大数据的知识,需要做一些有关Hadoop MapReduce的实验。
实验内容是在sogou.500w.utf8数据的基础上进行的。
实现以下内容:

  • 1、统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录
  • 2、统计rank<3并且order>2的所有UID及数量
  • 3、上午7-9点之间,搜索过“赶集网”的用户UID
  • 4、通过Rank:点击排名 对数据进行排序

该实验是在已经搭建好Hadoop集群的基础上进行的,如果还没有搭建,请参考文末“四、参考”中的“集群搭建”一文进行集群搭建。

二、准备数据

数据的字段说明
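在这里插入图片描述
每行数据以制表符(\t)分隔,下文代码按如下下标使用各字段:[0] 访问时间(格式为yyyyMMddHHmmss)、[1] 用户UID、[2] 搜索关键字、[3] rank(点击排名)、[4] order(点击顺序);其后通常还有用户点击的URL字段。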
上传数据
创建目录

hdfs dfs -mkdir /homework

上传文件

hdfs dfs -put -p sogou.500w.utf8 /homework

三、编程实现

3.1、统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录

1、Mapper

package com.csust.homework1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for task 3.1: emits a (UID, keyword) pair for every input record whose
 * keyword field contains the phrase "仙剑奇侠传".
 *
 * <p>Each input line is split on tab characters; field index 1 is used as the UID
 * and field index 2 as the search keyword.
 */
public class TaskMapper1 extends Mapper<LongWritable, Text, Text, Text> {
    /** Minimum number of tab-separated fields a line needs so that index 2 exists. */
    private static final int MIN_FIELDS = 3;

    // Output objects are reused across map() calls to avoid one allocation per record.
    private final Text outputK = new Text();
    private final Text outputV = new Text();

    /**
     * Processes one input line.
     *
     * @param key     input key supplied by the input format (not used here)
     * @param value   one line of the input file
     * @param context used to emit the (UID, keyword) pair and to count skipped lines
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        if (words.length < MIN_FIELDS) {
            // A truncated line would otherwise raise ArrayIndexOutOfBoundsException
            // and fail the whole task; count it and move on instead.
            context.getCounter("TaskMapper1", "MALFORMED_LINES").increment(1);
            return;
        }

        String keyword = words[2];
        if (keyword.contains("仙剑奇侠传")) {
            outputK.set(words[1]);
            outputV.set(keyword);
            context.write(outputK, outputV);
        }
    }
}

2、Reducer

package com.csust.homework1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for task 3.1: for each UID, concatenates all keywords received for that
 * UID into a single value and writes (UID, concatenated keywords).
 *
 * <p>Every keyword is followed by a tab character, so the emitted value ends with
 * a trailing tab.
 */
public class TaskReducer1 extends Reducer<Text,Text,Text,Text> {
    // Reused for every reduce() call.
    Text outputV = new Text();

    /**
     * Writes one output record per key.
     *
     * @param key     the UID
     * @param values  all keywords emitted by the mapper for this UID
     * @param context used to write the result
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        outputV.set(tabTerminated(values));
        context.write(key, outputV);
    }

    /** Appends each keyword followed by a tab and returns the resulting string. */
    private static String tabTerminated(Iterable<Text> keywords) {
        StringBuilder joined = new StringBuilder();
        for (Text keyword : keywords) {
            joined.append(keyword.toString()).append("\t");
        }
        return joined.toString();
    }
}

3、Driver

package com.csust.homework1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for task 3.1: configures and submits the job that pairs
 * {@link TaskMapper1} with {@link TaskReducer1}.
 */
public class TaskDriver1 {

    /** Input path used when no command-line argument is given. */
    private static final String DEFAULT_INPUT = "hdfs://master:9000/homework";
    /** Output path used when fewer than two command-line arguments are given. */
    private static final String DEFAULT_OUTPUT = "hdfs://master:9000/homework1_out";

    /**
     * Builds the job, waits for it to finish and exits with 0 on success, 1 on failure.
     *
     * @param args optional; {@code args[0]} overrides the input path and
     *             {@code args[1]} overrides the output path. Missing arguments
     *             fall back to the built-in defaults.
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        // Create the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Lets Hadoop locate the jar by this class; needed when running from a packaged jar.
        job.setJarByClass(TaskDriver1.class);
        job.setMapperClass(TaskMapper1.class);
        job.setReducerClass(TaskReducer1.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit the job and block until it completes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4、程序运行
在这里插入图片描述

5、运行结果
在这里插入图片描述

3.2、统计rank<3并且order>2的所有UID及数量

1、Mapper

package com.csust.homework2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for task 3.2: emits (UID, 1) for every record whose rank (field index 3)
 * is below 3 and whose order (field index 4) is above 2.
 */
public class TaskMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Minimum number of tab-separated fields a line needs so that index 4 exists. */
    private static final int MIN_FIELDS = 5;

    // Reused across map() calls instead of allocating a new object per record.
    private final Text outK = new Text();
    private final IntWritable one = new IntWritable(1);

    /**
     * Processes one input line.
     *
     * @param key     input key supplied by the input format (not used here)
     * @param value   one line of the input file
     * @param context used to emit (UID, 1) and to count skipped lines
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        if (words.length < MIN_FIELDS) {
            // Truncated line: indexing words[3]/words[4] would throw and fail the task.
            context.getCounter("TaskMapper2", "MALFORMED_LINES").increment(1);
            return;
        }

        int rank;
        int order;
        try {
            rank = Integer.parseInt(words[3]);
            order = Integer.parseInt(words[4]);
        } catch (NumberFormatException e) {
            // Non-numeric rank/order: skip the record rather than abort the whole job.
            context.getCounter("TaskMapper2", "NON_NUMERIC_RANK_OR_ORDER").increment(1);
            return;
        }

        if (rank < 3 && order > 2) {
            outK.set(words[1]);
            context.write(outK, one);
        }
    }
}

2、Reducer

package com.csust.homework2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for task 3.2: adds up the counts received for each UID and writes
 * (UID, total).
 */
public class TaskReducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Writes the summed count for one UID.
     *
     * @param key     the UID
     * @param values  the counts emitted by the mapper for this UID
     * @param context used to write the result
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        context.write(key, new IntWritable(sumOf(values)));
    }

    /** Returns the sum of all values in {@code counts}. */
    private static int sumOf(Iterable<IntWritable> counts) {
        int sum = 0;
        for (IntWritable count : counts) {
            sum = sum + count.get();
        }
        return sum;
    }
}

3、Driver

package com.csust.homework2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for task 3.2: configures and submits the job that pairs
 * {@link TaskMapper2} with {@link TaskReducer2}.
 */
public class TaskDriver2 {

    /** Input path used when no command-line argument is given. */
    private static final String DEFAULT_INPUT = "hdfs://master:9000/homework";
    /** Output path used when fewer than two command-line arguments are given. */
    private static final String DEFAULT_OUTPUT = "hdfs://master:9000/homework2_out";

    /**
     * Builds the job, waits for it to finish and exits with 0 on success, 1 on failure.
     *
     * @param args optional; {@code args[0]} overrides the input path and
     *             {@code args[1]} overrides the output path. Missing arguments
     *             fall back to the built-in defaults.
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        // Create the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Lets Hadoop locate the jar by this class; needed when running from a packaged jar.
        job.setJarByClass(TaskDriver2.class);

        // Register the Mapper and Reducer implementations.
        job.setMapperClass(TaskMapper2.class);
        job.setReducerClass(TaskReducer2.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit the job and block until it completes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4、运行程序
在这里插入图片描述

5、运行结果
在这里插入图片描述

3.3、上午7-9点之间,搜索过“赶集网”的用户UID

1、Mapper

package com.csust.homework3;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * Mapper for task 3.3: emits (UID, keyword) for records whose keyword contains
 * "赶集网" and whose timestamp (field index 0, pattern {@code yyyyMMddHHmmss})
 * has an hour of 7 or 8.
 */
public class TaskMapper3 extends Mapper<LongWritable, Text, Text, Text> {
    /** Minimum number of tab-separated fields a line needs so that index 2 exists. */
    private static final int MIN_FIELDS = 3;

    // Output objects are reused across map() calls.
    private final Text outputK = new Text();
    private final Text outputV = new Text();

    // Built once per mapper instead of once per record. Both classes are not
    // thread-safe, so they are instance (not static) fields.
    // NOTE(review): assumes map() is never invoked concurrently on one instance.
    private final SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
    private final Calendar calendar = Calendar.getInstance();

    /**
     * Processes one input line.
     *
     * @param key     input key supplied by the input format (not used here)
     * @param value   one line of the input file
     * @param context used to emit (UID, keyword) and to count skipped lines
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        if (words.length < MIN_FIELDS) {
            // Truncated line: indexing words[2] would throw and fail the task.
            context.getCounter("TaskMapper3", "MALFORMED_LINES").increment(1);
            return;
        }

        // Cheap substring test first, so the timestamp is only parsed for candidate rows.
        String keyword = words[2];
        if (!keyword.contains("赶集网")) {
            return;
        }

        Date time;
        try {
            time = format.parse(words[0]);
        } catch (ParseException e) {
            // Count unparseable timestamps instead of printing a stack trace per row.
            context.getCounter("TaskMapper3", "BAD_TIMESTAMP").increment(1);
            return;
        }

        calendar.setTime(time);
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        if (hour >= 7 && hour < 9) {
            // Key is the UID.
            outputK.set(words[1]);
            outputV.set(keyword);
            context.write(outputK, outputV);
        }
    }
}

2、Reducer

package com.csust.homework3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for task 3.3: for each UID, concatenates every keyword received for
 * that UID and writes (UID, concatenated keywords).
 *
 * <p>A tab is appended after each keyword, so the emitted value ends with a tab.
 */
public class TaskReducer3 extends Reducer<Text,Text,Text,Text> {
    // Reused for every reduce() call.
    Text outputV = new Text();

    /**
     * Writes one output record per key.
     *
     * @param key     the UID
     * @param values  all keywords emitted by the mapper for this UID
     * @param context used to write the result
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        StringBuilder buffer = new StringBuilder();
        java.util.Iterator<Text> keywords = values.iterator();
        while (keywords.hasNext()) {
            String keyword = keywords.next().toString();
            buffer.append(keyword).append("\t");
        }
        String joined = buffer.toString();
        outputV.set(joined);
        context.write(key, outputV);
    }
}

3、Driver

package com.csust.homework3;



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for task 3.3: configures and submits the job that pairs
 * {@link TaskMapper3} with {@link TaskReducer3}.
 */
public class TaskDriver3 {

    /** Input path used when no command-line argument is given. */
    private static final String DEFAULT_INPUT = "hdfs://master:9000/homework";
    /** Output path used when fewer than two command-line arguments are given. */
    private static final String DEFAULT_OUTPUT = "hdfs://master:9000/homework3_out";

    /**
     * Builds the job, waits for it to finish and exits with 0 on success, 1 on failure.
     *
     * @param args optional; {@code args[0]} overrides the input path and
     *             {@code args[1]} overrides the output path. Missing arguments
     *             fall back to the built-in defaults.
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        // Create the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Lets Hadoop locate the jar by this class; needed when running from a packaged jar.
        job.setJarByClass(TaskDriver3.class);
        // Register the Mapper and Reducer implementations.
        job.setMapperClass(TaskMapper3.class);
        job.setReducerClass(TaskReducer3.class);
        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Submit the job and block until it completes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4、 运行程序
在这里插入图片描述

5、运行结果

在这里插入图片描述

3.4、通过Rank:点击排名 对数据进行排序

1、Mapper

package com.csust.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Mapper for task 3.4: uses the rank (field index 3) as the output key so the
 * shuffle sorts records by it, and emits the remaining fields, re-joined with
 * tabs, as the value.
 */
public class TaskMapper4 extends Mapper<LongWritable, Text, IntWritable, Text> {
    /** Position of the rank field within a tab-separated line. */
    private static final int RANK_INDEX = 3;

    // Reused across map() calls instead of allocating new objects per record.
    private final IntWritable outputK = new IntWritable();
    private final Text outputV = new Text();

    /**
     * Processes one input line.
     *
     * @param key     input key supplied by the input format (not used here)
     * @param value   one line of the input file
     * @param context used to emit (rank, remaining fields) and to count skipped lines
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        if (words.length <= RANK_INDEX) {
            // Truncated line: indexing the rank field would throw and fail the task.
            context.getCounter("TaskMapper4", "MALFORMED_LINES").increment(1);
            return;
        }

        int rank;
        try {
            rank = Integer.parseInt(words[RANK_INDEX]);
        } catch (NumberFormatException e) {
            // Non-numeric rank: skip the record rather than abort the whole job.
            context.getCounter("TaskMapper4", "NON_NUMERIC_RANK").increment(1);
            return;
        }
        outputK.set(rank);

        // Copy into a resizable list (Arrays.asList alone is fixed-size) and drop
        // the rank, since it is already carried by the key.
        List<String> list = new ArrayList<String>(Arrays.asList(words));
        list.remove(RANK_INDEX);
        outputV.set(String.join("\t", list));

        context.write(outputK, outputV);
    }
}

2、Reducer

package com.csust.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for task 3.4: swaps key and value back, writing one
 * (record fields, rank) pair for every value received under a rank.
 */
public class TaskReducer4 extends Reducer<IntWritable, Text, Text, IntWritable> {

    /**
     * Writes every record of one rank group.
     *
     * @param key     the rank shared by all values in this group
     * @param values  the remaining record fields emitted by the mapper
     * @param context used to write the result
     * @throws IOException          if writing an output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Reducer<IntWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        java.util.Iterator<Text> records = values.iterator();
        while (records.hasNext()) {
            Text record = records.next();
            context.write(record, key);
        }
    }
}

3、Driver

package com.csust.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * Driver for task 3.4: configures and submits the sort job that pairs
 * {@link TaskMapper4} with {@link TaskReducer4} and orders the keys with
 * {@link MyComparator}.
 */
public class TaskDriver4 {

    /** Input path used when no command-line argument is given. */
    private static final String DEFAULT_INPUT = "hdfs://master:9000/homework";
    /** Output path used when fewer than two command-line arguments are given. */
    private static final String DEFAULT_OUTPUT = "hdfs://master:9000/homework4_out";

    /**
     * Builds the job, waits for it to finish and exits with 0 on success, 1 on failure.
     *
     * @param args optional; {@code args[0]} overrides the input path and
     *             {@code args[1]} overrides the output path. Missing arguments
     *             fall back to the built-in defaults.
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        // Create the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Lets Hadoop locate the jar by this class; needed when running from a packaged jar.
        job.setJarByClass(TaskDriver4.class);
        // Register the Mapper and Reducer implementations.
        job.setMapperClass(TaskMapper4.class);
        job.setReducerClass(TaskReducer4.class);
        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Comparator that determines the order of the map output keys.
        job.setSortComparatorClass(MyComparator.class);
        // Pin a single reducer so the result is one globally ordered file even if the
        // cluster configuration sets a different default; with several reducers each
        // output file would only be sorted within itself.
        job.setNumReduceTasks(1);
        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit the job and block until it completes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4、Comparator

package com.csust.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
/**
 * Sort comparator that inverts the result of {@code IntWritable.Comparator},
 * so keys compared with it come out in the opposite of the parent's order.
 */
public class MyComparator extends IntWritable.Comparator{

    /**
     * Compares two deserialized keys.
     *
     * @return the negated result of the parent comparator
     */
    public int compare(WritableComparable a, WritableComparable b) {
        int parentOrder = super.compare(a, b);
        return -parentOrder;
    }

    /**
     * Compares two keys in their serialized byte form.
     *
     * @param b1 buffer holding the first key
     * @param s1 start offset of the first key in {@code b1}
     * @param l1 length of the first key
     * @param b2 buffer holding the second key
     * @param s2 start offset of the second key in {@code b2}
     * @param l2 length of the second key
     * @return the negated result of the parent comparator
     */
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int parentOrder = super.compare(b1, s1, l1, b2, s2, l2);
        return -parentOrder;
    }
}

5、运行程序
在这里插入图片描述

6、 运行结果
在这里插入图片描述

四、参考

集群搭建
MapReduce实现单词统计
MapReduce统计手机号

文章来源:https://blog.csdn.net/niulinbiao/article/details/135186910
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。