来源:B站尚硅谷
Table API和SQL是最上层的API,在Flink中这两种API被集成在一起,SQL执行的对象也是Flink中的表(Table),所以我们一般会认为它们是一体的。Flink是批流统一的处理框架,无论是批处理(DataSet API)还是流处理(DataStream API),在上层应用中都可以直接使用Table API或者SQL来实现;这两种API对于一张表执行相同的查询操作,得到的结果是完全一样的。
SQL API 是基于 SQL 标准的 Apache Calcite 框架实现的,可通过纯 SQL 来开发和运行一个Flink 任务。具体的API调用可以随时关注官网的更新变化。
1)启动Flink
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
2)启动Flink的sql-client
/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session
1)结果显示模式
#默认table,还可以设置为tableau、changelog
SET sql-client.execution.result-mode=tableau;
3)执行环境
SET execution.runtime-mode=streaming; #默认streaming,也可以设置batch
4)默认并行度
SET parallelism.default=1;
5)设置状态TTL
SET table.exec.state.ttl=1000;
6)通过sql文件初始化
(1)创建sql文件
vim conf/sql-client-init.sql
SET sql-client.execution.result-mode=tableau;
CREATE DATABASE mydatabase;
(2)启动时,指定sql文件
/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
我们可以将关系型表/SQL与流处理做一个对比,如表所示。
关系型表/SQL | 流处理 | |
---|---|---|
处理的数据对象 | 字段元组的有界集合 | 字段元组的无限序列 |
查询(Query)对数据的访问 | 可以访问到完整的数据输入 | 无法访问到所有数据,必须“持续”等待流式输入 |
查询终止条件 | 生成固定大小的结果集后终止 | 永不停止,根据持续收到的数据不断更新查询结果 |
可以看到,其实关系型表和SQL,主要就是针对批处理设计的,这和流处理有着天生的隔阂。
流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库中的表完全不同;而基于表执行的查询操作,也就有了新的含义。
如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加;所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog)流,来构建一个表。
例如,当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作,每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。
1)更新(Update)查询
我们在代码中定义了一个SQL查询。
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");
当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用toChangelogStream()方法。
2)追加(Append)查询
上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入(Insert)操作了。
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");
这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志(changelog)流中只有INSERT操作。
由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入INSERT操作,而没有更新UPDATE操作。所以这里的持续查询,依然是一个追加(Append)查询。结果表result如果转换成DataStream,可以直接调用toDataStream()方法。
与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete)操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在Flink中,Table API和SQL支持三种编码方式:
在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。
所以所谓的时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的DDL里直接定义为一个字段,也可以在DataStream转换成表时定义。一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。
时间属性的数据类型必须为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。
按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。
事件时间属性可以在创建表DDL中定义,增加一个字段,通过WATERMARK语句来定义事件时间属性。具体定义方式如下:
CREATE TABLE EventTable(
user STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
...
);
这里我们把ts字段定义为事件时间属性,而且基于ts设置了5秒的水位线延迟。
时间戳类型必须是 TIMESTAMP 或者TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒(BIGINT 类型),这种情况可以通过如下方式转换
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。
在创建表的DDL(CREATE TABLE语句)中,可以增加一个额外的字段,通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性。
CREATE TABLE EventTable(
user STRING,
url STRING,
ts AS PROCTIME()
) WITH (
...
);
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
(2)案例
CREATE DATABASE db_flink;
SHOW DATABASES
(2)查询当前数据库
SHOW CURRENT DATABASE
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
RESTRICT:删除非空数据库会触发异常。默认启用
CASCADE:删除非空数据库也会删除所有相关的表和函数。
DROP DATABASE db_flink2;
USE database_name;
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] | AS select_query ]
① physical_column_definition
物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
② metadata_column_definition
元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。例如,我们可以使用元数据列从Kafka记录中读取和写入时间戳,用于基于时间的操作(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记)。connector和format文档列出了每个组件可用的元数据字段。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样, FROM xxx 子句可省略
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);
如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致,程序运行时会自动 cast强转,但是这要求两种数据类型是可以强转的。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);
默认情况下,Flink SQL planner 认为 metadata 列可以读取和写入。然而,在许多情况下,外部系统提供的只读元数据字段比可写字段多。因此,可以使用VIRTUAL关键字排除元数据列的持久化(表示只读)。
CREATE TABLE MyTable (
`timestamp` BIGINT METADATA,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
③ computed_column_definition
计算列是使用语法column_name AS computed_column_expression生成的虚拟列。
计算列就是拿已有的一些列经过一些自定义的运算生成的新列,在物理上并不存储在表中,只能读不能写。列的数据类型从给定的表达式自动派生,无需手动声明。
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quanitity
) WITH (
'connector' = 'kafka'
...
);
④ 定义Watermark
Flink SQL 提供了几种 WATERMARK 生产策略:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced
) WITH (
'connector' = 'kafka'
...
);
⑥ PARTITIONED BY
创建分区表
⑦ with语句
用于创建表的表属性,用于指定外部存储系统的元数据信息。配置属性时,表达式key1=val1的键和值都应该是字符串字面值。如下是Kafka的映射表:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`name` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
一般 with 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的with 配置项都是不同的。
⑧ LIKE
用于基于现有表的定义创建表。此外,用户可以扩展原始表或排除表的某些部分。
可以使用该子句重用(可能还会覆盖)某些连接器属性,或者向外部定义的表添加水印。
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
⑨ AS select_statement(CTAS)
在一个create-table-as-select (CTAS)语句中,还可以通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。
CREATE TABLE my_ctas_table
WITH (
'connector' = 'kafka',
...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
注意:CTAS有以下限制:
CREATE TABLE test(
id INT,
ts BIGINT,
vc INT
) WITH (
'connector' = 'print'
);
CREATE TABLE test1 (
`value` STRING
)
LIKE test;
SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]
如果没有指定数据库,则从当前数据库返回表。
LIKE子句中sql pattern的语法与MySQL方言的语法相同:
{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
(2)修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
1)创建数据生成器源表
CREATE TABLE source (
id INT,
ts BIGINT,
vc INT
) WITH (
'connector' = 'datagen',
'rows-per-second'='1',
'fields.id.kind'='random',
'fields.id.min'='1',
'fields.id.max'='10',
'fields.ts.kind'='sequence',
'fields.ts.start'='1',
'fields.ts.end'='1000000',
'fields.vc.kind'='random',
'fields.vc.min'='1',
'fields.vc.max'='100'
);
CREATE TABLE sink (
id INT,
ts BIGINT,
vc INT
) WITH (
'connector' = 'print'
);
2)查询源表
select * from source
3)插入sink表并查询
INSERT INTO sink select * from source;
select * from sink;
WITH提供了一种编写辅助语句的方法,以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE),可以认为它们定义了仅为一个查询而存在的临时视图。
1)语法
WITH <with_item_definition> [ , ... ]
SELECT ... FROM ...;
<with_item_defintion>:
with_item_name (column_name[, ...n]) AS ( <select_query> )
2)案例
WITH source_with_total AS (
SELECT id, vc+10 AS total
FROM source
)
SELECT id, SUM(total)
FROM source_with_total
GROUP BY id;
1)语法
SELECT select_list FROM table_expression [ WHERE boolean_expression ]
2)案例
SELECT * FROM source
SELECT id, vc + 10 FROM source
-- 自定义 Source 的数据
SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
SELECT vc + 10 FROM source WHERE id >10
用作根据 key 进行数据去重
SELECT DISTINCT vc FROM source
对于流查询,计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置,以防止状态过大。但是,这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。
SQL中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:
select COUNT(*) from source;
而更多的情况下,我们可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。
SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;
这种聚合方式,就叫作“分组聚合”(group aggregation)。想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。
分组聚合既是SQL原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。
1)group聚合案例
CREATE TABLE source1 (
dim STRING,
user_id BIGINT,
price BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
CREATE TABLE sink1 (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
insert into sink1
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source1
group by
dim,
-- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)
2)多维分析
Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets:
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (
VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4)
)
-- 供应商id、产品id、评级
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS(
(supplier_id, product_id, rating),
(supplier_id, product_id),
(supplier_id, rating),
(supplier_id),
(product_id, rating),
(product_id),
(rating),
()
);
从1.13版本开始,分组窗口聚合已经标记为过时,鼓励使用更强大、更有效的窗口TVF聚合,在这里简单做个介绍。
直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”。SQL查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。
SQL中只支持基于时间的窗口,不支持基于元素个数的窗口。
分组窗口函数 | 描述 |
---|---|
TUMBLE(time_attr, interval) | 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
HOP(time_attr, interval, interval) | 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
SESSION(time_attr, interval) | 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 |
1)准备数据
CREATE TABLE ws (
id INT,
vc INT,
pt AS PROCTIME(), --处理时间
et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
WATERMARK FOR et AS et - INTERVAL '5' SECOND --watermark
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.id.min' = '1',
'fields.id.max' = '3',
'fields.vc.min' = '1',
'fields.vc.max' = '100'
);
2)滚动窗口示例(时间属性字段,窗口长度)
select
id,
TUMBLE_START(et, INTERVAL '5' SECOND) wstart,
TUMBLE_END(et, INTERVAL '5' SECOND) wend,
sum(vc) sumVc
from ws
group by id, TUMBLE(et, INTERVAL '5' SECOND);
3)滑动窗口(时间属性字段,滑动步长,窗口长度)
select
id,
HOP_START(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wstart,
HOP_END(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wend,
sum(vc) sumVc
from ws
group by id, HOP(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND);
4)会话窗口(时间属性字段,会话间隔)
select
id,
SESSION_START(et, INTERVAL '5' SECOND) wstart,
SESSION_END(et, INTERVAL '5' SECOND) wend,
sum(vc) sumVc
from ws
group by id, SESSION(et, INTERVAL '5' SECOND);
对比GroupWindow,TVF窗口更有效和强大。包括:
FROM TABLE(
窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL时间…)
)
GROUP BY [window_start,][window_end,] --可选
1)滚动窗口
SELECT
window_start,
window_end,
id , SUM(vc)
sumVC
FROM TABLE(
TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, id;
2)滑动窗口
要求: 窗口长度=滑动步长的整数倍(底层会优化成多个小滚动窗口)
SELECT window_start, window_end, id , SUM(vc) sumVC
FROM TABLE(
HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, id;
3)累积窗口
累积窗口会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。
其实就是固定窗口间隔内提前触发的的滚动窗口 ,其实就是 Tumble Window + early-fire 的一个事件时间的版本。例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。
累积窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。
注意: 窗口最大长度 = 累积步长的整数倍
SELECT
window_start,
window_end,
id ,
SUM(vc) sumVC
FROM TABLE(
CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS , INTERVAL '6' SECONDS))
GROUP BY window_start, window_end, id;
4)grouping sets多维分析
SELECT
window_start,
window_end,
id ,
SUM(vc) sumVC
FROM TABLE(
TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end,
rollup( (id) )
-- cube( (id) )
-- grouping sets( (id),() )
;
OVER聚合为一系列有序行的每个输入行计算一个聚合值。与GROUP BY聚合相比,OVER聚合不会将每个组的结果行数减少为一行。相反,OVER聚合为每个输入行生成一个聚合值。
可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
1)语法
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
SELECT
id,
et,
vc,
count(vc) OVER (
PARTITION BY id
ORDER BY et
RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
) AS cnt
FROM ws
也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口,可以多次使用:
SELECT
id,
et,
vc,
count(vc) OVER w AS cnt,
sum(vc) OVER w AS sumVC
FROM ws
WINDOW w AS (
PARTITION BY id
ORDER BY et
RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
)
(2)按照行数聚合
统计每个传感器前5条到现在数据的平均水位
SELECT
id,
et,
vc,
avg(vc) OVER (
PARTITION BY id
ORDER BY et
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS avgVC
FROM ws
也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口:
SELECT
id,
et,
vc,
avg(vc) OVER w AS avgVC,
count(vc) OVER w AS cnt
FROM ws
WINDOW w AS (
PARTITION BY id
ORDER BY et
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
)
目前在Flink SQL中没有能够直接调用的TOP-N函数,而是提供了稍微复杂些的变通实现方法,是固定写法,特殊支持的over用法。
1)语法
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
select
id,
et,
vc,
rownum
from
(
select
id,
et,
vc,
row_number() over(
partition by id
order by vc desc
) as rownum
from ws
)
where rownum<=3;
去重,也即上文介绍到的TopN 中 row_number = 1 的场景,但是这里有一点不一样在于其排序字段一定是时间属性列,可以降序,不能是其他非时间属性的普通列。
在 row_number = 1 时,如果排序字段是普通列 planner 会翻译成 TopN 算子,如果是时间属性列 planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。可以从webui看出是翻译成哪种算子。
如果是按照时间属性字段降序,表示取最新一条,会造成不断的更新保存最新的一条。如果是升序,表示取最早的一条,不用去更新,性能更好。
1)语法
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
2)案例
对每个传感器的水位值去重
select
id,
et,
vc,
rownum
from
(
select
id,
et,
vc,
row_number() over(
partition by id,vc
order by et
) as rownum
from ws
)
where rownum=1;
在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。
在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。
常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。
与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。
Regular Join 包含以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):
CREATE TABLE ws1 (
id INT,
vc INT,
pt AS PROCTIME(), --处理时间
et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
WATERMARK FOR et AS et - INTERVAL '0.001' SECOND --watermark
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.id.min' = '3',
'fields.id.max' = '5',
'fields.vc.min' = '1',
'fields.vc.max' = '100'
);
1)等值内联结(INNER Equi-JOIN)
内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。
SELECT *
FROM ws
INNER JOIN ws1
ON ws.id = ws1.id
2)等值外联结(OUTER Equi-JOIN)
与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。
具体用法如下:
SELECT *
FROM ws
LEFT JOIN ws1
ON ws.id = ws1.id
SELECT *
FROM ws
RIGHT JOIN ws1
ON ws.id = ws1.id
SELECT *
FROM ws
FULL OUTER JOIN ws1
ON ws.id = ws.id
这部分知识与标准SQL中是完全一样的。
我们曾经学习过DataStream API中的双流Join,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。
间隔联结(Interval Join)返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:
SELECT *
FROM ws,ws1
WHERE ws.id = ws1. id
AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND
Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找。
上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段。
表A
JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名
ON xx.字段=别名.字段
比如维表在mysql,维表join的写法如下:
CREATE TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop102:3306/customerdb',
'table-name' = 'customers'
);
-- order表每来一条数据,都会去mysql的customers表查找维度数据
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
1)order by
支持 Batch\Streaming,但在实时任务中一般用的非常少。
实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。
SELECT *
FROM ws
ORDER BY et, id desc
2)limit
SELECT *
FROM ws
LIMIT 3
在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。
select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;
1)UNION 和 UNION ALL
UNION:将集合合并并且去重
UNION ALL:将集合合并,不做去重。
(SELECT id FROM ws) UNION (SELECT id FROM ws1);
(SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);
2)Intersect 和 Intersect All
Intersect:交集并且去重
Intersect ALL:交集不做去重
(SELECT id FROM ws) INTERSECT (SELECT id FROM ws1);
(SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);
3)Except 和 Except All
Except:差集并且去重
Except ALL:差集不做去重
(SELECT id FROM ws) EXCEPT (SELECT id FROM ws1);
(SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);
上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流
4)In 子查询
In 子查询的结果集只能有一列
SELECT id, vc
FROM ws
WHERE id IN (
SELECT id FROM ws1
)
上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,要注意设置 State 的 TTL。
查看系统函数的官方文档
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/systemfunctions/
系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,几乎支持所有的标准SQL中的操作,这为我们使用SQL编写流处理程序提供了极大的方便。
Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。
1)标量函数(Scalar Functions)
标量函数指的就是只对输入数据做转换操作、返回一个值的函数。
标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。
Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写自己的 Module。
目前 Flink 包含了以下三种 Module:
-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸载
UNLOAD MODULE module_name
-- 查看
SHOW MODULES;
SHOW FULL MODULES;
在 Flink 中,Module 可以被 加载、启用 、禁用 、卸载 Module,当加载Module 之后,默认就是开启的。同时支持多个 Module 的,并且根据加载 Module 的顺序去按顺序查找和解析 UDF,先查到的先解析使用。
此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF,可以使用下面语法更改顺序:
USE MODULE hive,core;
USE是启用module,没有被use的为禁用(禁用不是卸载),除此之外还可以实现调整顺序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。
2)案例
加载官方已经提供的的 Hive Module,将 Hive 已有的内置函数作为 Flink 的内置函数。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。
(1)上传jar包到flink的lib中
上传hive connector
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/
注意:拷贝hadoop的包,解决依赖冲突问题
cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/
(2)重启flink集群和sql-client
(3)加载hive module
-- hive-connector内置了hive module,提供了hive自带的系统函数
load module hive with ('hive-version'='3.1.3');
show modules;
show functions;
-- 可以调用hive的split函数
select split('a,b', ',');
12.5.0中的DataGen和Print都是一种connector,其他connector参考官网:链接
CREATE TABLE t1(
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
--列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
id int,
ts bigint ,
vc int )
WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'hadoop103:9092',
'properties.group.id' = 'jjm',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
'scan.startup.mode' = 'earliest-offset',
-- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed',
'topic' = 'ws1',
'format' = 'json'
)
(2)插入Kafka表
insert into t1(id,ts,vc) select * from source
(3)查询Kafka表
select * from t1
CREATE TABLE t2(
id int ,
sumVC int ,
primary key (id) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'hadoop102:9092',
'topic' = 'ws2',
'key.format' = 'json',
'value.format' = 'json'
)
(2)插入upsert-kafka表
insert into t2 select id,sum(vc) sumVC from source group by id
(3)查询upsert-kafka表
upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。
select * from t2
Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、UDF。 元数据也可以是持久化的,例如 Hive MetaStore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。
Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink 的相应元数据。例如,Flink 可以直接使用 Hive MetaStore 中的表的元数据,不必在Flink中手动重写ddl,也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤,并极大地提升了用户体验。
目前 Flink 包含了以下四种 Catalog:
具体用法也可以参考官网 link
cp flink-connector-jdbc-3.1.0-1.17.jar /opt/module/flink-1.17.0/lib/
cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.0/lib/
SHOW CATALOGS;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
USE CATALOG my_jdbc_catalog;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
官网参考 link
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib
cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.0/lib/
mv /opt/module/flink-1.17.0/opt/flink-table-planner_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/flink-table-planner_2.12-1.17.0.jar
mv /opt/module/flink-1.17.0/lib/flink-table-planner-loader-1.17.0.jar /opt/module/flink-1.17.0/opt/flink-table-planner-loader-1.17.0.jar
hive.metastore.uris
hive --service metastore &
配置项 | 必需 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
type | Yes | (none) | String | Catalog类型,创建HiveCatalog时必须设置为’hive’。 |
name | Yes | (none) | String | Catalog的唯一名称 |
hive-conf-dir | No | (none) | String | 包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml。 |
default-database | No | default | String | Hive Catalog使用的默认数据库 |
hive-version | No | (none) | String | HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败。 |
hadoop-conf-dir | No | (none) | String | Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog。 |
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/module/hive/conf'
);
SHOW CATALOGS;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
USE CATALOG myhive;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
建表,退出sql-client重进,查看catalog和表还在。
SHOW DATABASES; -- 可以看到hive的数据库
USE test; -- 可以切换到hive的数据库
SHOW TABLES; -- 可以看到hive的表
SELECT * from ws; --可以读取hive表
INSERT INTO ws VALUES(1,1,1); -- 可以写入hive表
CREATE TABLE t3( id int, ts bigint , vc int )
WITH (
'connector' = 'filesystem',
'path' = 'hdfs://hadoop102:8020/data/t3',
'format' = 'csv'
)
insert into t3 select * from source
select * from t3 where id = '1'
mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.bak
Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。
在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink这样可以保证幂等性。为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。
CREATE TABLE `ws2` (
`id` int(11) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE t4
(
id INT,
ts BIGINT,
vc INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8',
'username' = 'root',
'password' = '000000',
'connection.max-retry-timeout' = '60s',
'table-name' = 'ws2',
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '5s',
'sink.max-retries' = '3',
'sink.parallelism' = '1'
);
select * from t4
insert into t4 select * from source
INSERT INTO sink select * from source;
SHOW JOBS;
SET state.checkpoints.dir='hdfs://hadoop102:8020/chk';
SET state.savepoints.dir='hdfs://hadoop102:8020/sp';
STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;
-- 设置从savepoint恢复的路径
SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-37f5e6-0013a2874f0a';
-- 之后直接提交sql,就会从savepoint恢复
--允许跳过无法还原的保存点状态
set 'execution.savepoint.ignore-unclaimed-state' = 'true';
RESET execution.savepoint.path;
如果出现reset没生效的问题,可能是个bug,我们可以退出sql-client,再重新进,不需要重启flink的集群。