Requirement: a table test has the columns A, B, C and amount, where A, B and C are dimension columns. Compute sum(amount) for every combination of the three dimensions.
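In Flink SQL, this requirement is expressed by GROUP BY CUBE (A, B, C). GROUPING SETS and ROLLUP are related forms. The query produces one sum(amount) per subset of the three dimensions, which gives 2^3 = 8 grouping sets, and rolled-up columns come back as NULL. The JDK-only sketch below is not Flink code. It spells out those semantics on a few in-memory rows. The CubeSketch and Row names, the sample data and the "*" marker for a rolled-up column are illustrative assumptions.

```java
import java.util.*;

// Plain-Java illustration of GROUP BY CUBE (A, B, C) semantics; not Flink API.
class CubeSketch {
    // Hypothetical in-memory row of the test table: three dimensions plus amount.
    static final class Row {
        final String a, b, c;
        final long amount;
        Row(String a, String b, String c, long amount) {
            this.a = a; this.b = b; this.c = c; this.amount = amount;
        }
    }

    // For every subset of {A, B, C} (bit set = dimension kept), sum amount per group.
    // Rolled-up dimensions are shown as "*" (Flink SQL would return NULL).
    static Map<String, Long> cube(List<Row> rows) {
        Map<String, Long> result = new TreeMap<>();
        for (int mask = 0; mask < 8; mask++) {
            for (Row r : rows) {
                String key = "A=" + ((mask & 1) != 0 ? r.a : "*")
                        + ",B=" + ((mask & 2) != 0 ? r.b : "*")
                        + ",C=" + ((mask & 4) != 0 ? r.c : "*");
                result.merge(key, r.amount, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("a1", "b1", "c1", 10),
                new Row("a1", "b2", "c1", 20),
                new Row("a2", "b1", "c2", 5));
        cube(rows).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

The three sample rows yield 18 groups. For example, A=a1,B=*,C=* gives 30 and the grand total A=*,B=*,C=* gives 35.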
Group windows (group window)
Window table-valued functions (window TVF, which support Top-N)
Over windows (OVER aggregation)
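Table API usage (Tumble/Slide from org.apache.flink.table.api, $ and rowInterval statically imported from Expressions; pt is assumed to be a processing-time attribute):
Tumble.over(rowInterval(5L)).on($("pt")).as("w")                         // tumbling: closes every 5 rows
Slide.over(rowInterval(5L)).every(rowInterval(3L)).on($("pt")).as("w")  // sliding: 5 rows long, every 3 rows
// group by the window alias plus the key; non-key columns must be aggregated
table.window(Tumble.over(rowInterval(5L)).on($("pt")).as("w"))
    .groupBy($("w"), $("id"))
    .select($("id"), $("vc").sum().as("sumVc"))
    .execute().print();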
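Here is how the row-count windows behave. When grouped by id, Tumble.over(rowInterval(5L)) should emit one result for a key each time that key has collected 5 rows. As we understand Flink's count windows, a partially filled window emits nothing. The JDK-only sketch below imitates that on a finite list of hypothetical (id, vc) events. It illustrates the semantics and is not the Flink runtime.

```java
import java.util.*;

// Plain-Java imitation of a per-key row-count tumbling window; not Flink code.
class CountTumbleSketch {
    // Each event is {id, vc}; events are assumed to arrive in list order.
    static List<String> sumEveryNRows(List<long[]> events, int size) {
        Map<Long, Long> partialSum = new HashMap<>();   // running sum of the open window per id
        Map<Long, Integer> count = new HashMap<>();     // rows seen in the open window per id
        List<String> fired = new ArrayList<>();
        for (long[] e : events) {
            long id = e[0], vc = e[1];
            long sum = partialSum.merge(id, vc, Long::sum);
            int n = count.merge(id, 1, Integer::sum);
            if (n == size) {                            // window full: emit, then start a new one
                fired.add("id=" + id + " sumVc=" + sum);
                partialSum.remove(id);
                count.remove(id);
            }
        }
        return fired;                                   // incomplete windows emit nothing
    }

    public static void main(String[] args) {
        List<long[]> events = new ArrayList<>();
        for (int i = 1; i <= 7; i++) events.add(new long[]{1, i});  // id 1: vc = 1..7
        events.add(new long[]{2, 100});                             // id 2: a single row
        sumEveryNRows(events, 5).forEach(System.out::println);
    }
}
```

Only id 1 fills a window (vc 1 to 5, sum 15). Its last two rows and the single row of id 2 stay pending.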
SQL usage (group window functions):
-- Tumbling time window (use et instead of pt for event time)
select
  id,
  tumble_start(pt, interval '5' second) as wStart,
  tumble_end(pt, interval '5' second) as wEnd,
  sum(vc) as sumvc
from t1
group by
  tumble(pt, interval '5' second), id;
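TUMBLE puts every row into exactly one window. With no offset, the window start is the timestamp rounded down to a multiple of the window size. The sketch computes this with Math.floorMod, so negative timestamps also round down. Timestamps are plain seconds here, whereas Flink uses epoch milliseconds. The class name and events are made up. The sketch illustrates the grouping and is not Flink code.

```java
import java.util.*;

// Plain-Java illustration of TUMBLE(pt, INTERVAL '5' SECOND) grouped by id; not Flink code.
class TumbleSketch {
    // Start of the tumbling window containing ts (zero offset); floorMod also handles negative ts.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // events: {id, vc, ts}; returns "id@[start,end)" -> sum(vc), ordered by key.
    static Map<String, Long> sumPerWindow(List<long[]> events, long size) {
        Map<String, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            long start = windowStart(e[2], size);
            String key = "id=" + e[0] + "@[" + start + "," + (start + size) + ")";
            sums.merge(key, e[1], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<long[]> events = List.of(
                new long[]{1, 10, 1}, new long[]{1, 20, 4},   // both in [0,5)
                new long[]{1, 30, 5}, new long[]{2, 7, 9});   // both in [5,10)
        sumPerWindow(events, 5).forEach((k, v) -> System.out.println(k + " sumvc=" + v));
    }
}
```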
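-- Sliding time window: HOP(time_attr, slide, size), here slide 3 s and size 5 s
select
  id,
  hop_start(pt, interval '3' second, interval '5' second) as wStart,
  hop_end(pt, interval '3' second, interval '5' second) as wEnd,
  sum(vc) as svc
from t1
group by
  hop(pt, interval '3' second, interval '5' second), id;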
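With HOP, one row can land in several windows. It belongs to every window [start, start + size) whose start is a multiple of the slide and which still covers the row's timestamp. The sketch walks backwards from the latest such start, in the spirit of how sliding window assigners enumerate windows. Using seconds instead of milliseconds and the HopSketch name are assumptions for illustration.

```java
import java.util.*;

// Plain-Java illustration of which HOP windows (slide 3 s, size 5 s) contain a timestamp; not Flink code.
class HopSketch {
    // Every window [start, start + size) with start a multiple of slide that contains ts,
    // latest window first.
    static List<long[]> windowsFor(long ts, long slide, long size) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);   // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : windowsFor(7, 3, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");   // [6, 11) then [3, 8)
        }
    }
}
```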
-- Window TVF: tumbling window (use descriptor(et) for event time)
select
  id,
  window_start,
  window_end,
  sum(vc) as sumvc
from table(
  tumble(table t1, descriptor(pt), interval '5' second)
)
group by window_start, window_end, id;
-- Window TVF: sliding (hop) window; the window size must be an integral multiple of the slide
select
  id,
  window_start,
  window_end,
  sum(vc) as sumvc
from table(
  hop(table t1, descriptor(pt), interval '3' second, interval '6' second)  -- slide, then size
)
group by window_start, window_end, id;
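Here is a hedged reading of the integer-multiple rule. To our understanding, Flink's window TVF aggregation is slice-based. It pre-aggregates non-overlapping slices of length slide and then combines size / slide consecutive slices for each window. This only tiles the window exactly when size % slide == 0. The JDK-only sketch below illustrates the idea for slide 3 s and size 6 s. The slice layout, names and data are assumptions, not Flink's internal classes.

```java
import java.util.*;

// Plain-Java sketch of slice-based HOP aggregation; not Flink internals.
class HopSliceSketch {
    // events: {vc, ts}. Returns window end -> SUM(vc) for every window that received data.
    static Map<Long, Long> hopSums(List<long[]> events, long slide, long size) {
        if (size % slide != 0) {
            throw new IllegalArgumentException("size must be an integral multiple of slide");
        }
        // 1) Pre-aggregate each event into exactly one non-overlapping slice [s, s + slide).
        Map<Long, Long> sliceSums = new TreeMap<>();
        for (long[] e : events) {
            long sliceStart = e[1] - Math.floorMod(e[1], slide);
            sliceSums.merge(sliceStart, e[0], Long::sum);
        }
        // 2) A slice ending at sliceEnd belongs to the size / slide windows ending at
        //    sliceEnd, sliceEnd + slide, ...; add its sum to each of them.
        long slicesPerWindow = size / slide;
        Map<Long, Long> windowSums = new TreeMap<>();
        for (Map.Entry<Long, Long> s : sliceSums.entrySet()) {
            long sliceEnd = s.getKey() + slide;
            for (long k = 0; k < slicesPerWindow; k++) {
                windowSums.merge(sliceEnd + k * slide, s.getValue(), Long::sum);
            }
        }
        return windowSums;
    }

    public static void main(String[] args) {
        long slide = 3, size = 6;
        List<long[]> events = List.of(new long[]{1, 1}, new long[]{2, 4}, new long[]{4, 7});
        hopSums(events, slide, size).forEach((end, sum) ->
                System.out.println("[" + (end - size) + ", " + end + ") -> " + sum));
    }
}
```

For the events (vc=1, ts=1), (vc=2, ts=4) and (vc=4, ts=7), the sketch reports [-3, 3) -> 1, [0, 6) -> 3, [3, 9) -> 6 and [6, 12) -> 4. These match the sums obtained by adding up each window directly.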
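-- Window TVF: cumulate window, producing windows such as [0, 1), [0, 2), [0, 3), ...
-- (the max size must be an integral multiple of the step)
select
  id,
  window_start,
  window_end,
  sum(vc) as sumvc
from table(
  cumulate(table t1, descriptor(pt),
    interval '2' second,   -- step: how much each window grows, typically one hour
    interval '10' second)  -- max size of one cycle, typically one day
)
group by window_start, window_end, id;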
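CUMULATE divides time into cycles of the max size. All windows in one cycle share the cycle start, and each window's end is one step later than the previous one's. A row is therefore counted in every window of its cycle that ends after its timestamp. The sketch lists those windows for a single timestamp, using seconds and the hypothetical CumulateSketch name. It mirrors the semantics described above, not Flink's implementation.

```java
import java.util.*;

// Plain-Java illustration of CUMULATE(step = 2 s, max size = 10 s) window membership; not Flink code.
class CumulateSketch {
    static List<long[]> windowsFor(long ts, long step, long maxSize) {
        if (maxSize % step != 0) {
            throw new IllegalArgumentException("max size must be an integral multiple of step");
        }
        long cycleStart = ts - Math.floorMod(ts, maxSize);  // all windows of a cycle share this start
        List<long[]> windows = new ArrayList<>();
        for (long end = cycleStart + step; end <= cycleStart + maxSize; end += step) {
            if (end > ts) {                                  // the window [cycleStart, end) contains ts
                windows.add(new long[]{cycleStart, end});
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : windowsFor(3, 2, 10)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");  // [0, 4) [0, 6) [0, 8) [0, 10)
        }
    }
}
```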
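Over windows, Table API (static imports from Expressions):
// From unbounded preceding to the current row
Over.partitionBy($("id")).orderBy($("pt")).preceding(UNBOUNDED_ROW).following(CURRENT_ROW).as("w");
// From 2 rows before to the current row
Over.partitionBy($("id")).orderBy($("pt")).preceding(rowInterval(2L)).following(CURRENT_ROW).as("w");
// From unbounded preceding to the current time
Over.partitionBy($("id")).orderBy($("pt")).preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE).as("w");
// From 2 seconds before to the current time
Over.partitionBy($("id")).orderBy($("pt")).preceding(lit(2).seconds()).following(CURRENT_RANGE).as("w");
$("vc").sum().over($("w"))   // used inside select(...) after table.window(<over window>)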
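The two ROWS definitions differ only in how far back the frame reaches. UNBOUNDED_ROW keeps every earlier row of the same id. rowInterval(2L) keeps at most the two previous rows plus the current one. The JDK-only sketch computes sum(vc) for both frames over one partition, which is assumed to be already sorted by pt. The names and data are illustrative.

```java
import java.util.*;

// Plain-Java illustration of ROWS frames over one partition (rows already ordered by pt); not Flink code.
class RowsOverSketch {
    // SUM(vc) OVER (... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    static long[] unboundedRows(long[] vc) {
        long[] out = new long[vc.length];
        long running = 0;
        for (int i = 0; i < vc.length; i++) {
            running += vc[i];
            out[i] = running;
        }
        return out;
    }

    // SUM(vc) OVER (... ROWS BETWEEN n PRECEDING AND CURRENT ROW)
    static long[] boundedRows(long[] vc, int n) {
        long[] out = new long[vc.length];
        for (int i = 0; i < vc.length; i++) {
            long sum = 0;
            for (int j = Math.max(0, i - n); j <= i; j++) {
                sum += vc[j];
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] vc = {1, 2, 3, 4, 5};
        System.out.println(Arrays.toString(unboundedRows(vc)));  // [1, 3, 6, 10, 15]
        System.out.println(Arrays.toString(boundedRows(vc, 2))); // [1, 3, 6, 9, 12]
    }
}
```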
-- From unbounded preceding to the current row
select
  id,
  vc,
  sum(vc) over (partition by id
                order by pt
                rows between unbounded preceding and current row) as sumvc
from t1;
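-- From 2 rows before to the current row
sum(vc) over (partition by id order by pt rows between 2 preceding and current row)
-- From unbounded preceding to the current time
sum(vc) over (partition by id order by pt range between unbounded preceding and current row)
-- From 2 seconds before to the current time
sum(vc) over (partition by id order by pt range between interval '2' second preceding and current row)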
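RANGE frames, such as UNBOUNDED_RANGE or lit(2).seconds(), are measured on the ordering value, not on row positions. Rows with the same pt are peers and get the same result. A 2-second frame keeps every row whose timestamp is at most 2 seconds before the current one. The sketch follows standard SQL RANGE semantics, with an inclusive lower bound, on one partition sorted by time. The following are all assumptions of the sketch: timestamps in seconds, the convention that a negative bound means unbounded, and the data.

```java
import java.util.*;

// Plain-Java illustration of RANGE frames over one partition sorted by ts; not Flink code.
class RangeOverSketch {
    // SUM(vc) OVER (... RANGE BETWEEN `preceding` PRECEDING AND CURRENT ROW).
    // Sketch convention: a negative `preceding` stands for UNBOUNDED PRECEDING.
    static long[] rangeSum(long[] ts, long[] vc, long preceding) {
        long[] out = new long[ts.length];
        for (int i = 0; i < ts.length; i++) {
            long sum = 0;
            for (int j = 0; j < ts.length; j++) {
                boolean notAfterCurrent = ts[j] <= ts[i];           // peers with equal ts included
                boolean inRange = preceding < 0 || ts[j] >= ts[i] - preceding;
                if (notAfterCurrent && inRange) {
                    sum += vc[j];
                }
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1, 2, 2, 4, 7};
        long[] vc = {1, 1, 1, 1, 1};
        System.out.println(Arrays.toString(rangeSum(ts, vc, -1))); // [1, 3, 3, 4, 5]
        System.out.println(Arrays.toString(rangeSum(ts, vc, 2)));  // [1, 3, 3, 3, 1]
    }
}
```

Both rows at ts = 2 get 3 in each frame. This is the main visible difference from a ROWS frame.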
Top-N: window TVF + OVER window
-- Step 1: count each user's clicks per 10-second tumbling window (result used as t2)
select
  `user`,
  count(*) as cnt,
  window_start,
  window_end
from table(
  tumble(table t1, descriptor(et), interval '10' second)
)
group by window_start, window_end, `user`;
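-- Step 2: rank users by cnt inside each window
(select
  `user`,
  cnt,
  window_start,
  window_end,
  row_number() over (partition by window_start, window_end
                     order by cnt desc) as rk
from t2) as t3
Flink recognizes a Top-N query by this pattern: ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY ...) followed by a filter on the rank (rk <= N) in the outer query.
-- Step 3: keep the top 3 users of each window
select
  `user`,
  cnt,
  rk
from t3
where rk <= 3;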
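In fact, all of these steps can be merged into a single query:
select `user`, cnt, window_start, window_end, rk
from (
  select *,
    row_number() over (partition by window_start, window_end order by cnt desc) as rk
  from (
    select `user`, count(*) as cnt, window_start, window_end
    from table(tumble(table t1, descriptor(et), interval '10' second))
    group by window_start, window_end, `user`
  )
)
where rk <= 3;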
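The Top-N steps above boil down to four operations. Group the per-window counts by window, sort each group by cnt in descending order, number the rows, and keep rk <= N. The JDK-only sketch below does this on hypothetical per-window counts. ROW_NUMBER gives tied rows an arbitrary order. The sketch breaks ties by user name only to keep its output deterministic, and Flink does not guarantee this order.

```java
import java.util.*;

// Plain-Java imitation of window Top-N:
// ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY cnt DESC), keep rk <= n.
class WindowTopNSketch {
    static final class Count {
        final long windowStart;
        final String user;
        final long cnt;
        Count(long windowStart, String user, long cnt) {
            this.windowStart = windowStart; this.user = user; this.cnt = cnt;
        }
    }

    // Returns lines "windowStart user cnt rk" for the top n users of each window.
    static List<String> topN(List<Count> counts, int n) {
        Map<Long, List<Count>> byWindow = new TreeMap<>();
        for (Count c : counts) {
            byWindow.computeIfAbsent(c.windowStart, k -> new ArrayList<>()).add(c);
        }
        List<String> result = new ArrayList<>();
        for (List<Count> group : byWindow.values()) {
            group.sort(Comparator.comparingLong((Count c) -> c.cnt).reversed()
                    .thenComparing((Count c) -> c.user));    // tie-break: sketch assumption
            for (int rk = 1; rk <= Math.min(n, group.size()); rk++) {
                Count c = group.get(rk - 1);
                result.add(c.windowStart + " " + c.user + " " + c.cnt + " " + rk);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Count> counts = List.of(
                new Count(0, "alice", 5), new Count(0, "bob", 7),
                new Count(0, "carol", 2), new Count(0, "dave", 5),
                new Count(10, "alice", 1));
        topN(counts, 3).forEach(System.out::println);
    }
}
```

For N = 3, the first window yields bob (7), alice (5) and dave (5), and carol drops out. The second window keeps its only user.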
Deduplication is a special form of Top-N. Partition by the key, order by time, and keep only the rows where row_num = 1. This removes duplicate records.
Requirement: in every window, find the record of each url that arrived last.
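-- Step 1: assign each row to a window (source table and window size are not given here)
(select
  url,
  ts,
  window_start,
  window_end
from table(tumble(table <source_table>, descriptor(ts), <window_size>))) as t1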
-- Step 2: partition by window start, window end and url, order by time, and rank the rows
(select
  url,
  ts,
  window_start,
  window_end,
  row_number() over (partition by window_start, window_end, url order by ts desc) as rk
from t1) as t2
-- Step 3: keep rk = 1 (the latest row)
select
url,
ts,
window_end
from t2
where rk = 1;
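Keeping rk = 1 under order by ts desc amounts to keeping the row with the largest ts per (window, url). The JDK-only sketch below does this with a map. It assumes tumbling windows of a fixed size, because the original does not say which window function feeds t1. The Click class, window size and data are hypothetical.

```java
import java.util.*;

// Plain-Java imitation of window deduplication (keep the last row per window and url); not Flink code.
class WindowDedupSketch {
    static final class Click {
        final String url;
        final long ts;
        Click(String url, long ts) { this.url = url; this.ts = ts; }
    }

    // Tumbling windows of `size` (zero offset); returns "windowEnd url" -> latest ts.
    static Map<String, Long> lastPerWindowAndUrl(List<Click> clicks, long size) {
        Map<String, Long> latest = new TreeMap<>();
        for (Click c : clicks) {
            long windowEnd = c.ts - Math.floorMod(c.ts, size) + size;
            // keeping the max ts plays the role of ORDER BY ts DESC ... WHERE rk = 1
            latest.merge(windowEnd + " " + c.url, c.ts, Long::max);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(
                new Click("/home", 1), new Click("/home", 4), new Click("/cart", 2),
                new Click("/home", 6));
        lastPerWindowAndUrl(clicks, 5).forEach((k, ts) -> System.out.println(k + " ts=" + ts));
    }
}
```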