查看数据集格式
明确需求
明确步骤
读取文件
抽取需要的列
以年月为基础,进行 reduceByKey 统计Dongsi地区的PM
排序
获取结果
编码
拷贝数据集
data.rar(已上传资源——SparkCore阶段练习数据集)
创建类
编写代码
运行测试
@Test
def pmProcess(): Unit = {
?// 1. 创建sc对象
?val conf = new SparkConf().setMaster("local[6]").setAppName("stage_practice")
?val sc = new SparkContext(conf)
?// 2. 读取文件
?val source = sc.textFile("./dataset/BeijingPM20100101_20151231_noheader.csv")
?// 3. 通过算子处理数据
?// ? 3.1 map切数据 ((年,月),pm)
?source.map(item => ((item.split(",")(1), item.split(",")(2)), item.split(",")(6)))
?// ? 3.2 filter 过滤空 和 NA 数据
? .filter(item => StringUtils.isNotEmpty(item._2) && !item._2.equalsIgnoreCase("NA")) // equalsIgnoreCase 判断两个字符串是否相等,忽略字符串的大小写,
?// ? 3.3 toInt 数据类型转换
? .map(item => (item._1, item._2.toInt))
?// ? 3.4 聚合数据
? .reduceByKey((curr, agg) => curr + agg)
?// ? 3.5 排序
? .sortBy(item => item._2, ascending = false) // 降序
?// 4.获取结果
? .take(10)
? .foreach(item => println(item))
?// 5. 关闭sc
?sc.stop()
}