统计店铺按月份的销售额和累计到该月的总销售额(SQL, DSL,RDD)
分组topN的实现(row_number(), rank(), dense_rank()方法
(1)需求1:统计有过连续3天以上销售的店铺有哪些,并且计算出连续三天以上的销售额
第一步:将每天的金额求和(同一天可能会有多个订单)
SELECT
sid,dt,SUM(money) day_money
FROM
v_orders
GROUP BY sid,dt
第二步:给每个商家中每日的订单按时间排序并打上编号
SELECT
sid,dt,day_money,
ROW_NUMBER() OVER(PARTITION BY sid ORDER BY dt) rn
FROM
(
SELECT
sid,dt,SUM(money) day_money
FROM
v_orders
GROUP BY sid,dt
) t1
第三步:获取date与rn的差值的字段
SELECT
sid ,dt,day_money,date_sub(dt,rn) diff
FROM
(
SELECT
sid,dt,day_money,
ROW_NUMBER() OVER(PARTITION BY sid ORDER BY dt) rn
FROM
(
SELECT
sid,dt,SUM(money) day_money
FROM
v_orders
GROUP BY sid,dt
) t1
) t2
SELECT
sid,MIN(dt),MAX(dt),SUM(day_money) cmoney,COUNT(*) cc
FROM
(
SELECT
sid ,dt,day_money,date_sub(dt,rn) diff
FROM
(
SELECT
sid,dt,day_money,
ROW_NUMBER() OVER(PARTITION BY sid ORDER BY dt) rn
FROM
(
SELECT
sid,dt,SUM(money) day_money
FROM
v_orders
GROUP BY sid,dt
) t1
) t2
)
GROUP BY sid,diff
HAVING cc >=3
(2)需求2:统计店铺按月份的销售额和累计到该月的总销售额
SQL风格(只写sq语句,省略代码部分)
ELECT
sid,month,month_sales,
SUM(month_sales) OVER(PARTITION BY sid ORDER BY month) total_sales // 默认是其实位置到当前位置的累加
--PARTITION BY sid ORDER BY mth ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 完整的写法
FROM
(
SELECT
sid,
DATE_FORMAT(dt,'yyyy-MM') month,
--substr(dt,1,7) month, 用此函数来取月份也行
SUM(money) month_sales
FROM
v_orders
GROUP BY sid, month
)
row_number(), rank(), dense_rank()方法的区别
row_number() over() 打行号,行号从1开始
rank() over() 排序,有并列,如果有两个第1,就没有第2了,然后直接第3,跳号
dense_rank() over() 排序,有并列,不跳号
SQL
注意点:此处的文件格式是text的,所以需要用SparkContext的textFile方法来读取数据,然后处理此数据,得到需要的字段(subject,teacher),再利用toDF(“subject”, “teacher”)方法获取对应的DataFrame,从而创建相应的视图
object FavoriteTeacherSQL {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
import spark.implicits._
val lines: RDD[String] = spark.sparkContext.textFile("E:\\javafile\\spark\\teacher100.txt")
// 处理数据,获取DataFrame,用于创建视图
val df: DataFrame = lines.map(line => {
val fields = line.split("/")
val subject = fields(2).split("\\.")(0)
val teacher = fields(3)
(subject, teacher)
}).toDF("subject", "teacher")
// 创建视图
df.createTempView("v_teacher")
var topN: Int = 2
// SQL实现分组topN
spark.sql(
s"""
|SELECT
| subject,teacher,counts
| rk
|FROM
|(
| SELECT
| subject,teacher,counts,
| RANK() OVER(PARTITION BY subject ORDER BY counts DESC) rk
| FROM
| (
| SELECT
| subject,teacher,
| count(*) counts
| FROM
| v_teacher
| GROUP BY subject, teacher
| ) t1
|) t2 WHERE rk <= $topN
|""".stripMargin).show()
}
}