presto/trino 入门介绍实战

发布时间:2024年01月15日

引言

Presto是一款分布式SQL查询引擎,它能够在大规模数据集上实现快速、交互式的查询。本文将介绍Presto的基本概念并结合一些实际的代码示例,能够让的大家快速入门并在实际项目中应用。

官网:Launch Presto: Local download, JDBC, Docker or on AWS Cloud

1. Presto 简介

facebook开源的prestodb是一个分布式的sql引擎,支持多种数据源接入,采用统一的sql语句进行查询。内部实现也类似spark,将这个查询分为分析、优化、阶段划分、执行这些步骤。

  • Presto是由Facebook开发的分布式sql查询引擎,用来进行高速、实时的数据查询
  • Presto的产生是为了解决Hive的MapReduce模型太慢且不能通过BI等工具展现HDFS的问题
  • Presto是一个计算引擎,它不存储数据,通过丰富的connector获取第三方服务的数据,并支持扩展。可以通过连接Hive,来实现快速query hive table
  • 可以跨数据源进行联合查询

2019年,prestodb分化为prestodb和prestosql,prestosql有原始团队维护,现改名为trino。分化近两年后,从生态上看,trino势头明显强过prestodb。例如,下面几点只有在trino中才有:

  • 聚合下推支持
  • join下推支持
  • elasticsearch索引支持通配*

后面的研究都基于trino进行。

查询例子

# 联合查询hive的表和mysql的表
select * from hive.testdb.tableA a join mysql.testdb.tableB b
where a.id = b.id

show catalogs
show schemas

2. Presto 数据模型

Presto 是一款分布式 SQL 查询引擎,其数据模型基于表(Table)和架构(Schema)。Presto 不存储数据,而是通过连接各种数据源进行实时查询。以下是 Presto 的核心数据模型元素:

  • Schema(架构):

    • Schema 是 Presto 中的顶层命名空间,用于组织和隔离表。每个表都属于一个特定的 Schema。
    • 在 Presto 中,Schema 可以看作是一个数据库,不同的是,Presto 的 Schema 通常指向不同的数据源。
  • Table(表):

    • Table 是 Presto 中的数据存储单元。每个表都属于一个特定的 Schema。
    • Presto 支持从各种数据源(如 Hive、MySQL、PostgreSQL 等)中的表执行查询。
  • Column(列):

    • 表中的每一列代表了数据的一个属性。列定义了数据的类型,如整数、字符串、日期等。
    • 查询时,可以引用表中的特定列以检索相应的数据。
  • Row(行):

    • 表中的每一行代表了一条记录。每行中的数据按列排列,形成一个记录的完整集合。
  • Connector(连接器):

    • 连接器是 Presto 中用于连接到不同数据源的插件。每个连接器负责实现 Presto 与特定数据源的交互。
    • Presto 可以同时连接到多个数据源,能够跨越多种类型的数据存储执行查询。
  • Catalog(目录):

    • 目录是 Presto 中用于组织连接器的逻辑单元。每个连接器都注册到一个或多个目录中。
    • 通过目录,Presto 可以了解到底有哪些数据源可以查询。
  • Session(会话):

    • 会话是 Presto 中的查询执行环境。每个查询都在一个独立的会话中执行,会话保持了查询的上下文信息。
    • 在会话中可以设置各种配置选项,如查询超时时间、内存限制等。

Presto的数据模型相当灵活,用户可以通过 SQL 查询语言访问和操作各种数据源中的数据。通过连接器的引入,Presto 可以与不同类型的存储系统协同工作,提供统一的查询接口,使得数据分析变得更加方便和高效。

3. 聚合下推

聚合下推是我们最关心的特性。我们知道sql引擎本质上是在引擎侧对数据进行计算处理的,在大数据条件下,如果所有的数据都在引擎侧计算处理,性能比较差,稳定性也有问题,主要体现在:

  • 大量数据的拉取,对源数据库造成的IO压力和网络开销
  • 大量数据留存在引擎侧进行计算,引擎本身有OOM的风险

一般而已,sql引擎都支持一种要pushdown的优化策略。例如如果用户查询中包含对数据源数据的过滤语义,那么过滤操作可以下放给数据源进行,这个优化称为“过滤下推”。绝大多数sql引擎都支持过滤下推。此外还有projection下推(投影下推)。但是却极少有引擎支持聚合下推。

用户对数据的查询需求,其实往往是聚合分析场景。而一般的sql引擎只能将源数据拉取到引擎中进行聚合计算,区别可能仅仅是单机聚合或者分布式聚合。presto或spark,作为分布式sql引擎,利用MR思想,支持对大量数据进行分布式聚合。

然而,随着数据量的变大,即使是分布式聚合,依然要面临大量数据从数据源拉取的尴尬。我们知道绝大多数的数据库都是支持聚合操作的,而且许多列式数据库、时序数据库聚合查询的性能是极其强悍的。那么作为sql引擎是否能支持将聚合查询也下沉给数据库完成呢?

trino于2020/06发布的版本中声称在数据源接口层支持applyAggregation函数,这意味着数据库如果有能力完成聚合查询,可以实现该函数,提高查询性能,减少数据传输。Release 335 (14 Jun 2020) — Trino 436 Documentation

通过详细调研,trino目前仅有jdbc相关的数据源实现了applyAggregation。为了,验证和深入理解applyAggregation,尝试在elasticsearch数据源上实现聚合pushdown。

最终,实现了term aggregation和min/max/sum/avg/count(x)/count(*),下面是测试的基本功能,可以看到对于40000条记录的index,下推聚合的性能明显高于普通聚合:

The following simple test is based on an index of more than 40000 records.
The difference in query efficiency between the two methods can be figured out.

trino:default> select hostname, avg("values") from elasticsearch.default.slmday60 group by hostname;
hostname | _col1
---------------+-------------------
192.168.21.58 | 4992.663530635401
192.168.21.59 | 4989.727731732876
(2 rows)
Query 20210225_091409_00005_rb8ni, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0.53 [2 rows, 0B] [3 rows/s, 0B/s]
trino:default> set session elasticsearch.aggregation_pushdown_enabled=false;
SET SESSION
trino:default> select hostname, avg("values") from elasticsearch.default.slmday60 group by hostname;
hostname | _col1
---------------+-------------------
192.168.21.58 | 4992.663530635401
192.168.21.59 | 4989.727731732876
(2 rows)
Query 20210225_091431_00007_rb8ni, FINISHED, 1 node
Splits: 50 total, 50 done (100.00%)
2.80 [42.1K rows, 1.68MB] [15.1K rows/s, 617KB/s]] ]></ac:plain-text-body></ac:structured-macro><p>对比聚合下推和非聚合下推情况下的执行计划:</p><p>非聚合下推</p><ac:structured-macro ac:name="code" ac:schema-version="1" ac:macro-id="fd0d9b25-0b25-49a0-861b-ec070471aea2"><ac:plain-text-body><![CDATA[Fragment 0 [SINGLE]                                                                                                 
     Output layout: [hostname, avg]                                                                                  
     Output partitioning: SINGLE []                                                                                  
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                   
     Output[hostname, _col1]                                                                                         
     │   Layout: [hostname:varchar, avg:double]                                                                      
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                     
     │   _col1 := avg                                                                                                
     └─ RemoteSource[1]                                                                                              
            Layout: [hostname:varchar, avg:double]                                                                   
                                                                                                                     
 Fragment 1 [HASH]                                                                                                   
     Output layout: [hostname, avg]                                                                                  
     Output partitioning: SINGLE []                                                                                  
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                   
     Project[]                                                                                                       
     │   Layout: [hostname:varchar, avg:double]                                                                      
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                     
     └─ Aggregate(FINAL)[hostname][$hashvalue]                                                                       
        │   Layout: [hostname:varchar, $hashvalue:bigint, avg:double]                                                
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                  
        │   avg := avg("avg_0")                                                                                      
        └─ LocalExchange[HASH][$hashvalue] ("hostname")                                                              
           │   Layout: [hostname:varchar, avg_0:row(double, bigint), $hashvalue:bigint]                              
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                               
           └─ RemoteSource[2]                                                                                        
                  Layout: [hostname:varchar, avg_0:row(double, bigint), $hashvalue_1:bigint]                         
                                                                                                                     
 Fragment 2 [SOURCE]                                                                                                 
     Output layout: [hostname, avg_0, $hashvalue_2]                                                                  
     Output partitioning: HASH [hostname][$hashvalue_2]                                                              
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                   
     Aggregate(PARTIAL)[hostname][$hashvalue_2]                                                                      
     │   Layout: [hostname:varchar, $hashvalue_2:bigint, avg_0:row(double, bigint)]                                  
     │   avg_0 := avg("values")                                                                                      
     └─ ScanProject[table = elasticsearch:SCAN:slmday60, grouped = false]                                            
            Layout: [hostname:varchar, values:bigint, $hashvalue_2:bigint]          

聚合下推?

 Fragment 0 [SINGLE]                                               
     Output layout: [hostname, _efgnrtd]                           
     Output partitioning: SINGLE []                                
     Stage Execution Strategy: UNGROUPED_EXECUTION                 
     Output[hostname, _col1]                                       
     │   Layout: [hostname:varchar, _efgnrtd:double]               
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}  
     │   _col1 := _efgnrtd                                         
     └─ RemoteSource[1]                                            
            Layout: [hostname:varchar, _efgnrtd:double]            
                                                                   
 Fragment 1 [SOURCE]                                               
     Output layout: [hostname, _efgnrtd]                           
     Output partitioning: SINGLE []                                
     Stage Execution Strategy: UNGROUPED_EXECUTION                 
     TableScan[elasticsearch:AGG:slmday60, grouped = false]        
         Layout: [hostname:varchar, _efgnrtd:double]               
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B} 
         hostname := hostname::varchar                             
         _efgnrtd := _efgnrtd_0::double          

多个group by,多个聚合函数都没有问题,也可以支持没有groupby情况下的聚合,例如:

select count(*) 
from elasticsearch.default.slmday60 
where "@timestamp" > TIMESTAMP '2020-04-13' and "@timestamp" < TIMESTAMP '2020-04-13 00:01:00'

count(x):使用value_count(field)聚合

count(*): 使用value_count("_id")聚合

4. 安装与配置

下面是简单安装的步骤,具体安装方式可能有所不同,请参考Presto官方文档Deploying Presto — Presto 0.284 Documentation

# 下载Presto压缩包
wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.267/presto-server-0.267.tar.gz

# 解压
tar -xvf presto-server-0.267.tar.gz

# 进入Presto目录
cd presto-server-0.267

# 配置Presto节点
cp etc/node.properties{.example,}

# 配置连接器(例如Hive)
cp etc/catalog/hive.properties{.example,}

5. 启动 Presto 节点

# 启动Presto服务
bin/launcher start

6. Presto 实战示例

6.1 连接到 Presto

使用Presto CLI连接到Presto服务器:

# 连接到Presto
presto --server localhost:8080 --catalog hive --schema default

6.2 执行 SQL 查询

在Presto CLI中执行简单的SQL查询:

-- 查询Hive中的数据
SELECT * FROM test_db LIMIT 10;

6.3 连接其他数据源

Presto支持多种数据源,如MySQL、PostgreSQL等。首先,需要在etc/catalog目录下配置相应的属性文件。以下是连接MySQL的示例:

# 复制MySQL配置文件
cp etc/catalog/mysql.properties{.example,}

编辑mysql.properties,配置MySQL连接信息:

connector.name=mysql
connection-url=jdbc:mysql://192.168.101.32:3306/test
connection-user=root
connection-password=root123

然后,重新启动Presto节点:

bin/launcher restart

之后,就可以写sql查询不同数据源之间的数据了

# 联合查询hive的表和mysql的表
select * from hive.testdb.tableA a join mysql.testdb.tableB b
where a.id = b.id

show catalogs
show schemas

7. 结语

Presto的强大之处在于它能够无缝连接各种数据源,提供快速、交互式的数据分析能力。在实际项目中,结合Presto的灵活性,可以更方便地处理大规模数据集,加速数据分析和决策过程。希望这篇文章对大家了解和使用Presto有所帮助。

文章来源:https://blog.csdn.net/EverythingAtOnce/article/details/135593695
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。