本系列包含:
在 Flink 中,一共有四种级别的抽象,而 Flink SQL 作为最上层,是 Flink API 的一等公民。
在标准 SQL 中,SQL 语句包含四种类型
DML
,Data Manipulation Language
):用来定义数据库记录(数据)。DCL
,Data Control Language
):用来定义访问权限和安全级别。DQL
,Data Query Language
):用来查询记录(数据)。DDL
,Data Definition Language
):用来定义数据库对象(库,表,列等)。Flink SQL 包含 DML 数据操作语言、 DDL 数据定义语言, DQL 数据查询语言,不包含 DCL 语言。
🚀 可以参考我的这篇博客:【数据库】DDL、DML、DCL简介
从 1.9.0
版本开始,引入了阿里巴巴的 Blink ,对 FIink TabIe & SQL 模块做了重大的重构,保留了 Flink Planner 的同时,引入了 Blink PIanner,没引入以前,Flink 没考虑流批作业统一,针对流批作业,底层实现两套代码,引入后,基于流批一体理念,重新设计算子,以流为核心,流作业和批作业最终都会被转为 transformation
。
Flink SQL 使用 Apache Calcite
作为解析器和优化器。
Calcite 一种动态数据管理框架,它具备很多典型数据库管理系统的功能 如 SQL 解析、 SQL 校验、 SQL 查询优化、 SQL 生成 以及 数据连接查询 等,但是又省略了一些关键的功能,如 Calcite 并不存储相关的元数据和基本数据,不完全包含相关处理数据的算法等。
Calcite 主要包含以下五个部分:
Parser
)
Validato
)
下面举个例子,详细描述一下 Flink SQL 的处理流程,如下所示:
SET table.sql-dialect=default;
CREATE TABLE log_kafka (
user_id STRING,|
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'property-version' = 'universal',
'topic' = 'test',
'properties.bootstrap.servers'= 'hlink163:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.group.id' = 'flink1'
);
我们写一张 Source 表,来源为 Kafka,当执行 create table log_kafka
之后 Flink SQL 将做如下操作:
Parser.jj
文件,生成一系列的 Java 代码,生成的 Java 代码会 把 SQL 转换成 AST 抽象语法树(即 SqlNode 类型)。SqlToOperationConverter
类,来将 SqlNode 转换为 Operation,Operation 会根据 SQL 语法来执行创建表或者删除表等操作,同时 FlinkPlannerImpl.rel()
方法会将 SqlNode 转换成 RelNode 树,并返回 RelRoot。RBO
)的 HepPlanner
,二是基于代价优化(CBO
)的 VolcanoPlanner
。然后得到优化后的 RelNode, 再基于 Flink 里面的 rules
将优化后的逻辑计划转换成物理计划。transformation
,然后递归遍历各节点,将 DataStreamRelNode 转换成 DataStream,在这期间,会依次递归调用 DataStreamUnion
、DataStreamCalc
、DataStreamScan
类中重写的 translateToPlan
方法。递归调用各节点的 translateToPlan
,实际是利用 CodeGen 元编成 Flink 的各种算子,相当于直接利用 Flink 的 DataSet 或者 DataStream 开发程序。下图为执行流程图:
总结就是:
rel
方法将 SqlNode 变成逻辑计划(RelNodeTree),紧接着对逻辑计划进行优化。Logical_Opt_Rules
(逻辑计划优化)找到最优的执行 Planner,并转换为 Flink Logical RelNode。DataStream_Opt_Rules
(流式计算优化)、DataStream_Deco_Rules
(装饰流式计算优化),将优化后的逻辑计划转换为物理计划。优化规则包含如下:
Table_subquery_rules
Expand_plan_rules
Post_expand_clean_up_rules
Datastream_norm_rules
Logical_Opt_Rules
DataStream_Opt_Rules
DataStream_Deco_Rules
先介绍一下什么是 Operation:在 Flink SQL 中,涉及的 DDL,DML,DQL 操作都是 Operation,在 Flink 内部表示,Operation 可以和 SqlNode 对应起来。
Operation 执行在优化前,执行的函数为 executeQperation
,如下图所示,为执行的所有 Operation。
Flink 社区在 Flink 1.11
版本进行了重大改变,如下图所示:
如下所示为 Flink 与 Hive 进行连接时的执行图:
1.1
新引入了 Hive 方言,所以在 Flink SQL 中可以编写 Hive 语法,即 Hive Dialect。BlinkPlanner 是在 Flink 1.9
版本新引入的机制,Blink 的查询处理器则实现流批作业接口的统一,底层的 API 都是 Transformation。真正实现流 & 批的统一处理,替代原 FlinkPlanner 将流 & 批区分处理的方式。在 1.11
版本后已经默认为 Blink Planner。
重点方法如下:
HiveCatalog 主要是持久化元数据,所以一般的创建类型都包含,如 database
、Table
、View
、Function
、Partition
,还有 is_Generic
字段判断等。
Flink 1.11
版本新增的一大功能是实时数仓,可以实时的将 Kafka 中的数据插入 Hive 中,传统的实时数仓基于 Kafka + Flink Streaming
,定义全流程的流计算作业,有着秒级甚至毫秒的实时性,但实时数仓的一个问题是历史数据只有
3
?
15
3-15
3?15 天,无法在其上做 Ad-hoc
的查询。
针对这个特点,Flink 1.11
版本将 FlieSystemStreaming Sink 重新修改,增加了分区提交和滚动策略机制,让 HiveStreaming Sink 重新使用文件系统流接收器。
Flink 1.11
的 Table / SQL API 中,FileSystemConnector 是靠增强版 StreamingFileSink 组件实现,在源码中名为 StreamingFileWriter。
只有在 Checkpoint 成功时,StreamingFileSink 写入的文件才会由 Pending
状态变成 Finished
状态,从而能够安全地被下游读取。所以,我们一定要打开 Checkpointing
,并设定合理的间隔。
StreamingWrite,从 Kafka 中实时拿到数据,使用分区提交将数据从 Kafka 写入 Hive 表中,并运行批处理查询以读取该数据。
Flink 源码中在对 Hive 进行读取操作时,会经历以下几个步骤:
getDataStream
方法,通过 getDataStream
方法来确定查询匹配的分区信息,然后创建表对应的 InputFormat,然后确定并行度,根据并行度确定 slot
分发 HiveMapredSplitReader 任务。slot
中,Split 会确定读取的内容,基于 Hive 中定义的序列化工具,InputFormat 执行读取反序列化,得到 value
值。reader.next
获取 value
,将其解析成 Row。如下图所示:
首先可以看一下,在实时的将数据存储到 Hive 数仓中,FileSystemConnector 为了与 Flink-Hive 集成的大环境适配,最大的改变就是分区提交,可以看一下左下图,官方文档给出的,分区可以采取 日期 + 小时 的策略,或者 时分秒 的策略。
那如何保证已经写入分区的数据何时才能对下游可见呢? 这就和 触发机制 有关, 触发机制包含 process-time
和 partition-time
以及时延。
partition-time
指的是根据事件时间中提取的分区触发。当 'watermark' > 'partition-time' + 'delay'
,选择 partition-time
的数据才能提交成功,
process-time
指根据系统处理时间触发,当加上时延后,要想让分区进行提交,当 'currentprocessing time' > 'partition creation time' + 'delay'
选择 process-time
的数据可以提交成功。
但选择 process-time
触发机制会有缺陷,就是当数据迟到或者程序失败重启时,数据不能按照事件时间被归入正确分区。所以一般会选择 partition-time
。