RisingWave 1.6 发布!新增多种连接器、新 SQL 语句等多个功能特性

发布时间:2024年01月19日

RisingWave 社区的朋友们,新年快乐!新年伊始,RisingWave 1.6 版本正式发布!

1.6 版本包含了许多修复和更新,其中有一些属于重大变更如果您目前正在使用 RisingWave请尤其注意它们;同时,我们也新增许多功能,包括新连接器、新 SQL 语句等。接下来,请跟我们一起了解本次更新的主要亮点!


重大变更

RisingWave v1.6 的重大变更共两处,它们将影响当前 SQL 流处理作业的性能,请尤其注意它们:

  • 更正 SOMEALLANY 表达式结果

    此次更新,我们更正了 SOMEALLANY 表达式此前出现的部分错误结果。如果您正在使用任何包含这些表达式的 SQL 查询,请注意,您在此前版本的输出结果可能存在错误。同时,我们也建议先删除包含这些表达式的任何现有物化视图,然后重新创建,以确保结果的准确性。

  • 更改导出到 ClickHouse 的时间戳数据映射

    如果您拥有将 RisingWave 的 timestamptimestamptz 数据导出到 ClickHouse 的 sink,请注意,RisingWave 的 timestamptz 数据现在能导出为 ClickHouse 的 DateTime64 数据了。但是,RisingWave 中的 timestamp 数据还不可以直接导出,它必须先转换为 timestamptz,然后才能导出到 ClickHouse。


接下来是本次更新值得注意的一些增强功能,涉及到导出数据到表、连接器、CDC source、SSL 配置、时间过滤器和新子句等内容。

导出数据到表

您可以使用 CREATE SINK ... INTO ... 命令,将 RisingWave 中的 source 和表的数据导出到 RisingWave 的另一张表中。这与创建外部 sink 相同,并且导入的表会根据上游表保持更新。以下是其基本语法:

CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name
[FROM sink_from | AS select_query];

对于将不同 source 的数据合并导入到同一张表中,这一命令提供了新方法。使用 CREATE SINK ... INTO ...DROP SINK ... 可以更灵活地增加或删除一张表的数据源,并且当新的流数据进入上游 source 时,表还能保持更新。

例如,我们可以将来自两个不同 Kafka topic 的数据 join 到同一目标表中。在以下示例代码中,我们将 Kafka 两个 source: orders_s0orders_s1 的数据,导入到命名为 orders 的表中。

CREATE TABLE orders (
    id int primary key,
    price int,
    item_id int,
    customer_id int
);

CREATE source orders_s0 (
    id int primary key,
    price int,
    item_id int,
    customer_id int
) WITH ( 
    connector = 'kafka',
    topic = 'topic_0',
    ...
) FORMAT PLAIN ENCODE JSON;

CREATE source orders_s1 (
    id int primary key,
    price int,
    item_id int,
    customer_id int
) WITH ( 
    connector = 'kafka',
    topic = 'topic_1',
    ...
) FORMAT PLAIN ENCODE JSON;

CREATE SINK orders_sink0 FROM orders_s0 INTO orders;
CREATE SINK orders_sink1 FROM orders_s1 INTO orders;

其中, orders_sink0 sink 把 orders_s0 source 的数据导入 orders 表中;而 orders_sink1 则将 orders_s1 的数据导入到 orders中。

如果我们不想再把 orders_s0 中的数据导入到 orders 表中,则可以用以下语句删除该 sink:

DROP SINK orders_sink0;

更多细节,请查看:


连接器相关

本月引入了两个新连接器,GCS source 和 StarRocks sink,为您构建流处理生态系统提供了更多灵活性。

  • GCS source

    Google Cloud Storage(谷歌云储存,GCS) 是一种基于云的对象存储服务,它允许您存储和检索数据。与 RisingWave 中的其他 source 连接器一样,您可以在 RisingWave 和 GCS 之间建立连接,使用 CREATE SOURCECREATE TABLE 命令开始导入数据。

    CREATE TABLE gcs(
        id int,
        name varchar
    ) WITH (
        connector = 'gcs',
        match_pattern = '%Ring%*.ndjson',
        gcs.bucket_name = 'example-source',
        gcs.credential = 'xxxxx',
        gcs.service_account = 'xxxxx'
    ) FORMAT PLAIN ENCODE JSON;
    
  • StarRocks sink

    StarRocks(原名 Palo)是一款适用于实时分析和交互式查询处理的开源分析数据库。要从 RisingWave 导出数据到 StarRocks,请使用 CREATE SINK 命令。请注意,StarRocks sink 连接器无法导出 structjson 类型数据,因为 StarRocks 的 Stream?Load 不支持它们。

    CREATE SINK starrocks_sink
    FROM mv WITH (
        connector = 'starrocks',
        type = 'append-only',
        starrocks.host = 'starrocks-fe',
        starrocks.mysqlport = '9030',
        starrocks.httpport = '8030',
        starrocks.user = 'users',
        starrocks.password = '123456',
        starrocks.database = 'demo',
        starrocks.table = 'demo_bhv_table',
        force_append_only='true'
    );
    

同时,我们对此前已支持的 Elasticsearch sink 也进行了功能增强。现在,RisingWave 的 Elasticsearch sink 会把 JSONB 数据转为 JSON 字符串,然后 Elasticsearch 会将其转为 instruct。同时,根据 Elasticsearch 版本的不同,其 type 参数设置也有变化。

如果您对某个连接器感兴趣,请查看我们官方文档的 Integrations 页面,了解目前支持的连接器,并为您希望新增支持的连接器投票。更多细节,请查看:


CDC source 相关

  • 支持对 PostgreSQL 一次连接、多次建表

    上个月,RisingWave 新增了对 MySQL 一次连接、多次建表的支持,现在,这种方法也适用于 PostgreSQL,让您能轻松地从同一数据库中不同 schema 的多个表中导出数据,而无需为每个 source 表指定数据库凭据。

    您可以使用 CREATE SOURCE 命令与目标数据库建立连接,然后使用 CREATE TABLE 命令从数据库中的单个表摄取数据。只需在建立连接时指定一次数据库凭据,简化流程。

    以下 SQL 查询在 RisingWave 中创建了一个 CDC 表,从命名为 public 的 schema 下的 tt3 表导出数据。pg_mydb 是在 RisingWave 中创建的连接到 PostgreSQL 数据库的 source 的名称。

    CREATE TABLE tt3 (
        v1 integer primary key,
        v2 timestamp with time zone
    ) FROM pg_mydb TABLE 'public.tt3';
    
  • 支持多表事务

    现在,对同一 MySQL source 或 PostgreSQL source,我们支持多表事务。要启用此功能,请在创建 CDC source 时在 WITH 选项中设置 transactional = true。此功能可以自动从上游数据库导入多表事务,因此基于这些 source 表的物化视图将被原子性地更新。例如,以下 SQL 查询创建了一个启用了事务的 PostgreSQL source。

    CREATE SOURCE pg_cdc WITH (
        connector = 'postgres-cdc',
        hostname = '127.0.0.1',
        port = '8306',
        username = 'root',
        password = '123456',
        database.name = 'mydb',
        slot.name = 'mydb_slot',
    	  transactional = 'true'
    );
    

    更多细节,请查看:


SSL 相关配置

此次更新,对于在会话期间更改 SSL 设置,我们为您提供了两种新方法。

我们增加了用于 Kafka source 和 sink 的 properties.ssl.endpoint.identification.algorithm 参数,以绕过 CA 证书的验证。如果您在创建 Kafka source 或 sink 时遇到了“SSL handshake failed”错误,此参数能提供帮助。

我们还引入了 RW_SSL_CERTRW_SSL_KEY 环境变量,分别用于指定前端节点的 SSL 证书和密钥文件位置。

更多细节,请查看:


时间过滤器

时间过滤器根据时间相关标准(如时间戳或日期范围)选择或排除数据。在处理时间序列数据或任何时间相关的数据集时,这些过滤器非常重要。RisingWave 此前已经支持时间过滤器,而此次更新引入了更多新功能,支持构建更复杂的时间过滤器。

以前,我们仅支持简单比较,只支持将数据与特定日期或当前时间进行比较,并且 NOW() 不能作为比较上限。

现在,时间过滤器允许使用 NOW() 作为上限条件,并且允许使用 OR 运算符将时间过滤器与普通条件连接。但是,我们还不支持使用 OR 运算符连接多个时间过滤器。

t > NOW() - INTERVAL '1 hour' OR t IS NULL
t + INTERVAL '5' SECOND < NOW()

更多时间过滤器的增强功能正在开发中,敬请期待未来的版本更新!

更多细节,请查看:


SIMILAR TO 子句

SIMILAR TO 子句用于确定表达式是否与 SQL 正则表达式中指定的模式匹配,其返回值为 truefalse。模式中可以包含元字符。在子句末尾可以指定 ESCAPE 字符,用来以字面意义匹配元字符。同时您也可以使用 NOT 关键字,以确认指定表达式与给定模式不匹配。有关支持的元字符的完整列表,请查看下面提供的文档链接。该子句的基本语法如下:

expression [ NOT ] SIMILAR TO sql_regex_pattern [ ESCAPE <esc_text> ]

以下是 SIMILAR TO 子句的一个示例。我们检查字符串”abc”是否与给定的 SQL 正则表达式匹配。

'abc' SIMILAR TO '(a|b|c)+' -> true

SIMILAR TO 子句能用于寻找特定字符串,从而进行更复杂和细致的查询。您还可以通过检查数据是否与特定模式匹配,如电子邮件、电话号码等,来确保数据是否有效。

更多细节,请查看:


总结

以上只是 RisingWave 1.6 版本新增的部分功能,如果您想了解本次更新的完整列表,包括更多 SQL 功能和连接器,请查看更详细的发布说明

如果您想提前了解下个月的版本及其新功能,请访问 RisingWave GitHub repository

如果您想了解 RisingWave 的所有动态,请在官网订阅我们的邮件月刊,关注我们的 Twitter?和 LinkedIn。同时,也欢迎您加入我们的 Slack?社区,与我们的工程师还有全球各地的 RisingWave 爱好者交流!


RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

了解更多:

官网:?risingwave.com

教程risingwavetutorial.com

GitHubrisingwave.com/github

微信公众号:RisingWave中文开源社区

社区用户交流群:risingwave_assistant

文章来源:https://blog.csdn.net/weixin_61169526/article/details/135672632
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。