目录
???????????????分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])
????????分析函数可以大致分成如下3类:
????????1- 第一类: 聚合函数 sum() count() avg() max() min()
????????2- 第二类: row_number() rank() dense_rank() ntile()
????????3- 第三类: first_value() last_value() lead() lag()排序 :
????????row_number: 巧记: 1234 ??特点: 唯一且连续
????????dense_rank: 巧记: 1223 ??特点: 并列且连续
????????rank ??: 巧记: 1224 ??特点: 并列不连续
? ? ? ? ????????UDF: 一对一
? ? ? ? ????????UDAF: 多对一
? ? ? ? ????????UDTF: 一对多
? ? ? ? spark sql原生python只能写udf, 借助pandas等第三方组件就可以写udf和udaf
? ? ? ? ? ? ? ? 第一步:创建python函数
? ? ? ? ? ? ? ? 第二步: 将python函数注册到Spark sql中
????????????????????????????????注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
????????????????????????????????注册方式二(只能DSL): ?udf对象 = F.udf(参数1,参数2)
????????????????????????????????注册方式三(只能DSL): ?语法糖写法 ?@F.udf(returnType=返回值类型) ?放置到对应Python的函数上面
? ? ? ? ? ? ? ? 返回基本类型
? ? ? ? ? ? ? ? 返回复杂类型
? ? ? ? ????????Arrow是一种内存中的列式数据格式,提升跨语言数据传输速度,提升大数据分析项目的运行效率 ?
????????????????基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。
????????????????Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型
?????????????????UDAF对自定义Python函数的要求: 输入数据的类型必须是Pandas中的Series对象,返回值类型必须是Python中的标量数据类型
? ? ? ? spark on hive的目的就是替换hive中的hive server 2服务
启动 hadoop服务:
? ? ? ? start-all.sh
hive开启metastore服务命令:
????????cd /export/server/hive/bin
????????nohup ./hive --service metastore &
启动Spark-sql的命令:
????????cd /export/server/spark
? ? ? ? ./spark-sql
????????bin/spark-sql --master local ?--executor-memory 512m --total-executor-cores 1?
启动hive的命令:
????????cd /export/server/hive/bin
????????./hive
启动Spark 提供的分布式的执行引擎:spark的Thrift服务项命令
如果需要连接Hive,此时需要启动一个Spark的客户端(spark-sql、代码)才可以。这个客户端底层相当于启动服务项,用于连接Hive的metastore的服务
cd /export/server/spark/sbin
./start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--hiveconf hive.server2.thrift.bind.host=node1 \
--hiveconf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse \
--master local[2]?
?
1- 接收客户端提交过来的SQL/DSL代码,首先会校验SQL/DSL的语法是否正常。如果通过校验,根据SQL/DSL的执行顺序,生成未解析的逻辑计划,也叫做AST抽象语法树
2- 对于AST抽象语法树加入元数据信息,确定一共涉及到哪些字段、字段的数据类型是什么,以及涉及到的表的其他相关元数据信息。加入元数据信息以后,就得到了(未优化的)逻辑计划
3- 对(未优化的)逻辑计划执行优化操作,整个优化通过优化器来执行。在优化器匹配相对应的优化规则,实时具体的优化。SparkSQL底层提供了一两百中优化规则,得到优化的逻辑计划。例如: 谓词下推(断言下推)、列值裁剪
?? ?3.1- 谓词下推: 也叫做断言下推。将数据过滤操作提前到数据扫描的时候执行,减少后续处理的数据量,提升效率。
?? ?3.2- 列值裁剪: 将一张表中与数据分析不相关的字段不加载进来,只加载数据分析用到的字段。减少后续处理的数据量,提升效率。
?? ?
4- 由于优化规则很多,导致会得到多个优化的逻辑计划。在转换成物理执行计划的过程中,会根据成本模型(对比每个计划运行的耗时、资源消耗等)得到最优的一个物理执行计划
5- 将物理执行计划通过code generation(代码生成器),转变成Spark RDD的代码
6- 最后就是将Spark RDD代码部署到集群上运行。后续过程与Spark内核调度中Job的调度流程完全一致。