RisingWave 社区的朋友们,新年快乐!新年伊始,RisingWave 1.6 版本正式发布!
1.6 版本包含了许多修复和更新,其中有一些属于重大变更,如果您目前正在使用 RisingWave,请尤其注意它们;同时,我们也新增许多功能,包括新连接器、新 SQL 语句等。接下来,请跟我们一起了解本次更新的主要亮点!
RisingWave v1.6 的重大变更共两处,它们将影响当前 SQL 流处理作业的性能,请尤其注意它们:
更正 SOME
、ALL
、ANY
表达式结果
此次更新,我们更正了 SOME
、ALL
、ANY
表达式此前出现的部分错误结果。如果您正在使用任何包含这些表达式的 SQL 查询,请注意,您在此前版本的输出结果可能存在错误。同时,我们也建议先删除包含这些表达式的任何现有物化视图,然后重新创建,以确保结果的准确性。
更改导出到 ClickHouse 的时间戳数据映射
如果您拥有将 RisingWave 的 timestamp
或 timestamptz
数据导出到 ClickHouse 的 sink,请注意,RisingWave 的 timestamptz
数据现在能导出为 ClickHouse 的 DateTime64
数据了。但是,RisingWave 中的 timestamp
数据还不可以直接导出,它必须先转换为 timestamptz
,然后才能导出到 ClickHouse。
接下来是本次更新值得注意的一些增强功能,涉及到导出数据到表、连接器、CDC source、SSL 配置、时间过滤器和新子句等内容。
您可以使用 CREATE SINK ... INTO ...
命令,将 RisingWave 中的 source 和表的数据导出到 RisingWave 的另一张表中。这与创建外部 sink 相同,并且导入的表会根据上游表保持更新。以下是其基本语法:
CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name
[FROM sink_from | AS select_query];
对于将不同 source 的数据合并导入到同一张表中,这一命令提供了新方法。使用 CREATE SINK ... INTO ...
和 DROP SINK ...
可以更灵活地增加或删除一张表的数据源,并且当新的流数据进入上游 source 时,表还能保持更新。
例如,我们可以将来自两个不同 Kafka topic 的数据 join 到同一目标表中。在以下示例代码中,我们将 Kafka 两个 source: orders_s0
和 orders_s1
的数据,导入到命名为 orders
的表中。
CREATE TABLE orders (
id int primary key,
price int,
item_id int,
customer_id int
);
CREATE source orders_s0 (
id int primary key,
price int,
item_id int,
customer_id int
) WITH (
connector = 'kafka',
topic = 'topic_0',
...
) FORMAT PLAIN ENCODE JSON;
CREATE source orders_s1 (
id int primary key,
price int,
item_id int,
customer_id int
) WITH (
connector = 'kafka',
topic = 'topic_1',
...
) FORMAT PLAIN ENCODE JSON;
CREATE SINK orders_sink0 FROM orders_s0 INTO orders;
CREATE SINK orders_sink1 FROM orders_s1 INTO orders;
其中, orders_sink0
sink 把 orders_s0
source 的数据导入 orders
表中;而 orders_sink1
则将 orders_s1
的数据导入到 orders
中。
如果我们不想再把 orders_s0
中的数据导入到 orders
表中,则可以用以下语句删除该 sink:
DROP SINK orders_sink0;
更多细节,请查看:
本月引入了两个新连接器,GCS source 和 StarRocks sink,为您构建流处理生态系统提供了更多灵活性。
GCS source
Google Cloud Storage(谷歌云储存,GCS) 是一种基于云的对象存储服务,它允许您存储和检索数据。与 RisingWave 中的其他 source 连接器一样,您可以在 RisingWave 和 GCS 之间建立连接,使用 CREATE SOURCE
或 CREATE TABLE
命令开始导入数据。
CREATE TABLE gcs(
id int,
name varchar
) WITH (
connector = 'gcs',
match_pattern = '%Ring%*.ndjson',
gcs.bucket_name = 'example-source',
gcs.credential = 'xxxxx',
gcs.service_account = 'xxxxx'
) FORMAT PLAIN ENCODE JSON;
StarRocks sink
StarRocks(原名 Palo)是一款适用于实时分析和交互式查询处理的开源分析数据库。要从 RisingWave 导出数据到 StarRocks,请使用 CREATE SINK
命令。请注意,StarRocks sink 连接器无法导出 struct
和 json
类型数据,因为 StarRocks 的 Stream?Load 不支持它们。
CREATE SINK starrocks_sink
FROM mv WITH (
connector = 'starrocks',
type = 'append-only',
starrocks.host = 'starrocks-fe',
starrocks.mysqlport = '9030',
starrocks.httpport = '8030',
starrocks.user = 'users',
starrocks.password = '123456',
starrocks.database = 'demo',
starrocks.table = 'demo_bhv_table',
force_append_only='true'
);
同时,我们对此前已支持的 Elasticsearch sink 也进行了功能增强。现在,RisingWave 的 Elasticsearch sink 会把 JSONB 数据转为 JSON 字符串,然后 Elasticsearch 会将其转为 instruct。同时,根据 Elasticsearch 版本的不同,其 type
参数设置也有变化。
如果您对某个连接器感兴趣,请查看我们官方文档的 Integrations 页面,了解目前支持的连接器,并为您希望新增支持的连接器投票。更多细节,请查看:
支持对 PostgreSQL 一次连接、多次建表
上个月,RisingWave 新增了对 MySQL 一次连接、多次建表的支持,现在,这种方法也适用于 PostgreSQL,让您能轻松地从同一数据库中不同 schema 的多个表中导出数据,而无需为每个 source 表指定数据库凭据。
您可以使用 CREATE SOURCE
命令与目标数据库建立连接,然后使用 CREATE TABLE
命令从数据库中的单个表摄取数据。只需在建立连接时指定一次数据库凭据,简化流程。
以下 SQL 查询在 RisingWave 中创建了一个 CDC 表,从命名为 public
的 schema 下的 tt3
表导出数据。pg_mydb
是在 RisingWave 中创建的连接到 PostgreSQL 数据库的 source 的名称。
CREATE TABLE tt3 (
v1 integer primary key,
v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';
支持多表事务
现在,对同一 MySQL source 或 PostgreSQL source,我们支持多表事务。要启用此功能,请在创建 CDC source 时在 WITH
选项中设置 transactional = true
。此功能可以自动从上游数据库导入多表事务,因此基于这些 source 表的物化视图将被原子性地更新。例如,以下 SQL 查询创建了一个启用了事务的 PostgreSQL source。
CREATE SOURCE pg_cdc WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
slot.name = 'mydb_slot',
transactional = 'true'
);
更多细节,请查看:
此次更新,对于在会话期间更改 SSL 设置,我们为您提供了两种新方法。
我们增加了用于 Kafka source 和 sink 的 properties.ssl.endpoint.identification.algorithm
参数,以绕过 CA 证书的验证。如果您在创建 Kafka source 或 sink 时遇到了“SSL handshake failed”错误,此参数能提供帮助。
我们还引入了 RW_SSL_CERT
和 RW_SSL_KEY
环境变量,分别用于指定前端节点的 SSL 证书和密钥文件位置。
更多细节,请查看:
时间过滤器根据时间相关标准(如时间戳或日期范围)选择或排除数据。在处理时间序列数据或任何时间相关的数据集时,这些过滤器非常重要。RisingWave 此前已经支持时间过滤器,而此次更新引入了更多新功能,支持构建更复杂的时间过滤器。
以前,我们仅支持简单比较,只支持将数据与特定日期或当前时间进行比较,并且 NOW()
不能作为比较上限。
现在,时间过滤器允许使用 NOW()
作为上限条件,并且允许使用 OR
运算符将时间过滤器与普通条件连接。但是,我们还不支持使用 OR
运算符连接多个时间过滤器。
t > NOW() - INTERVAL '1 hour' OR t IS NULL
t + INTERVAL '5' SECOND < NOW()
更多时间过滤器的增强功能正在开发中,敬请期待未来的版本更新!
更多细节,请查看:
SIMILAR TO
子句SIMILAR TO
子句用于确定表达式是否与 SQL 正则表达式中指定的模式匹配,其返回值为 true
或 false
。模式中可以包含元字符。在子句末尾可以指定 ESCAPE
字符,用来以字面意义匹配元字符。同时您也可以使用 NOT
关键字,以确认指定表达式与给定模式不匹配。有关支持的元字符的完整列表,请查看下面提供的文档链接。该子句的基本语法如下:
expression [ NOT ] SIMILAR TO sql_regex_pattern [ ESCAPE <esc_text> ]
以下是 SIMILAR TO
子句的一个示例。我们检查字符串”abc”是否与给定的 SQL 正则表达式匹配。
'abc' SIMILAR TO '(a|b|c)+' -> true
SIMILAR TO
子句能用于寻找特定字符串,从而进行更复杂和细致的查询。您还可以通过检查数据是否与特定模式匹配,如电子邮件、电话号码等,来确保数据是否有效。
更多细节,请查看:
[SIMILAR TO]
pattern matching expressions「SIMILAR TO
模式匹配表达式」以上只是 RisingWave 1.6 版本新增的部分功能,如果您想了解本次更新的完整列表,包括更多 SQL 功能和连接器,请查看更详细的发布说明。
如果您想提前了解下个月的版本及其新功能,请访问 RisingWave GitHub repository。
如果您想了解 RisingWave 的所有动态,请在官网订阅我们的邮件月刊,关注我们的 Twitter?和 LinkedIn。同时,也欢迎您加入我们的 Slack?社区,与我们的工程师还有全球各地的 RisingWave 爱好者交流!
RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。
了解更多:
官网:?risingwave.com
GitHub:risingwave.com/github
微信公众号:RisingWave中文开源社区
社区用户交流群:risingwave_assistant