Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持 ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如 过滤、聚合、连接 和 转换 等。它还提供了 窗口操作、时间处理 和 复杂事件处理 等功能,以满足流式数据处理的需求。
Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。它是 Flink 最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。
使用 Flink SQL 处理数据的基本步骤:
总而言之,我们可以通过 Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。
Flink Connector 是指 用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:
例如:
还有如 HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive 等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或 将处理结果导出到外部系统。
DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。
使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。
以下是一个使用 DataGen 函数的简单示例:
-- 创建输入表
CREATE TABLE input_table (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW <first_name STRING,last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
);
在上面的示例中,我们使用 DataGen 连接器创建了一个名为 input_table
的输入表。该表包含了 order_number
、price
、buyer
、order_time
四个字段。默认是 Random 随机生成对应类型的数据,生产速率是
10000
10000
10000 条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。
生成的数据样例:
{
"order_number": -6353089831284155505,
"price": 253422671148527900374700392448,
"buyer": {
"first_name": "6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2",
"last_name": "d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"
},
"order_time": "2023-09-21 06:22:29.618"
}
{
"order_number": 1102733628546646982,
"price": 628524591222898424803263250432,
"buyer": {
"first_name": "4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c",
"last_name": "7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"
},
"order_time": "2023-09-21 06:23:01.69"
}
字段类型 | 数据生成方式 |
---|---|
BOOLEAN | random |
CHAR | random / sequence |
VARCHAR | random / sequence |
STRING | random / sequence |
DECIMAL | random / sequence |
TINYINT | random / sequence |
SMALLINT | random / sequence |
INT | random / sequence |
BIGINT | random / sequence |
FLOAT | random / sequence |
DOUBLE | random / sequence |
DATE | random |
TIME | random |
TIMESTAMP | random |
TIMESTAMP_LTZ | random |
INTERVAL YEAR TO MONTH | random |
INTERVAL DAY TO MONTH | random |
ROW | random |
ARRAY | random |
MAP | random |
MULTISET | random |
属性 | 是否必填 | 默认值 | 类型 |
|
---|---|---|---|---|
connector | required | (none) | String | ‘datagen’ |
rows-per-second | optional | 10000 10000 10000 | Long | 数据生产速率 |
number-of-rows | optional | (none) | Long | 指定生产的数据条数,默认是不限制 |
fields.#.kind | optional | random | String | 指定字段的生产数据的方式 random 还是 sequence |
fields.#.min | optional | (Minimum value of type) | (Type of field) | random 生成器的指定字段 # 最小值,支持数字类型 |
fields.#.max | optional | (Maximum value of type) | (Type of field) | random 生成器的指定字段 # 最大值,支持数字类型 |
fields.#.length | optional | 100 100 100 | Integer | char / varchar / string / array / map / multiset 类型的长度 |
fields.#.start | optional | (none) | (Type of field) | sequence 生成器的开始值 |
fields.#.end | optional | (none) | (Type of field) | sequence 生成器的结束值 |
CREATE TABLE dataGenSourceTable (
order_number BIGINT,
price DECIMAL(10, 2),
buyer STRING,
order_time TIMESTAMP(3)
) WITH (
'connector'='datagen',
'number-of-rows'='100000000',
'rows-per-second' = '100000'
);
CREATE CATALOG myhive
WITH (
'type'='hive',
'default-database'='default'
);
USE CATALOG myhive;
USE dev;
SET table.sql-dialect=hive;
CREATE TABLE if not exists shipu3_test_0932 (
order_number BIGINT,
price DECIMAL(10, 2),
buyer STRING,
order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number, price, buyer, order_time, cast(CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;
当每秒生产 10 万条数据的时候,17 分钟左右就可以完成,当然我们可以通过增加 Flink 任务的计算节点、并行度、提高生产速率 rows-per-second
的值等来更快速的完成大数据量的生产。
CREATE TABLE dataGenSourceTable (
order_number BIGINT,
price INT,
buyer ROW <first_name STRING,last_name STRING>,
order_time TIMESTAMP(3),
col_array ARRAY <STRING>,
col_map map <STRING,STRING>
) WITH (
'connector'='datagen', --连接器类型
'rows-per-second'='100000', --生产速率
'fields.order_number.kind'='random', --字段order_number的生产方式
'fields.order_number.min'='1', --字段order_number最小值
'fields.order_number.max'='1000', --字段order_number最大值
'fields.price.kind'='sequence', --字段price的生产方式
'fields.price.start'='1', --字段price开始值
'fields.price.end'='1000', --字段price最大值
'fields.col_array.element.length'='5', --每个元素的长度
'fields.col_map.key.length'='5', --map key的长度
'fields.col_map.value.length'='5' --map value的长度
);
CREATE TABLE jdqsink1 (
order_number BIGINT,
price DECIMAL(32, 2),
buyer ROW <first_name STRING,last_name STRING>,
order_time TIMESTAMP(3),
col_ARRAY ARRAY <STRING>,
col_map map <STRING,STRING>
) WITH (
'connector'='jdq',
'topic'='jrdw-fk-area_info__1',
'jdq.client.id'='xxxxx',
'jdq.password'='xxxxxxx',
'jdq.domain'='db.test.group.com',
'format'='json'
);
INSERTINTO jdqsink1
SELECT * FROM dataGenSourceTable;
通过以上案例可以看到,通过 Datagen 结合其他连接器可以模拟各种场景的数据。
总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。