Spark在降本增效中的一些思考

发布时间:2024年01月19日

背景

在大环境不好的情况下,本司也开始了“降本增效”,本文探讨一下,在这种背景下 Spark怎么做的降本增效。
Yarn 基于 EMR CPU 是xlarge,也就是内存和核的比例在7:1左右的 ,磁盘是基于 NVMe SSD
Spark 3.5.0(也是刚由3.1 升级而来 )
JDK 8
这里为什么强调 NVMe ,因为相比于 HDD 来说,他的磁盘IO有更高的读写速度。 导致我们在 Spark上 做的一些常规优化是不起效果的

注意:如没特别说明 P99 P95 avg等时间单位是秒

优化手段

调整JVM GC策略

因为我们内部存在于类似 Apache kyuubi这种 long running的服务,而且内存都是20GB起步,所以第一步就想到调整 CMS 策略为 G1 或者 Parallel.
测试结果:

GC类型p100p99p95p90p50avgtask_count
CMS95692085.3800000000047688208343.8580512068836887363
G193802164.9100000000035853378652.2275451140619788110
Parallel109192192.970000000001888393652.0600647889858788904

从结果上来看,CMS在 吞吐上比 G1和Parallel 都要好的,附上 JVM的一些关键参数如下
Spark driver端的配置都是一样:

"spark.driver.extraJavaOptions": "-verbose:gc -XX:+UseParNewGC -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=5  -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump",

CMS的 Executor 配置如下:

"spark.executor.extraJavaOptions": " -verbose:gc -XX:+UseParNewGC -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=5  -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"

G1的 Executor 配置如下:

"spark.executor.extraJavaOptions": "-verbose:gc -XX:+UseG1GC -XX:ParallelGCThreads=10 -XX:ConcGCThreads=5 -XX:G1ReservePercent=15 -XX:MaxTenuringThreshold=15 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"

Parellel 的 Executor 配置如下:

"spark.executor.extraJavaOptions": "-Dcarbon.properties.filepath=carbon.properties -Dlog4j.configuration=xql-log4j.properties -verbose:gc -XX:++UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=10 -XX:+UseAdaptiveSizePolicy -XX:+ParallelRefProcEnabled  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"

增发并发度

在对任务的运行时长进行统计的时候我们发现,绝大部分任务都是运行时间在5分钟左右,而与此同时在任务高峰期的时候,整体任务的等待数量一直在1800左右,这种情况下,增大任务的并发度能够很好的增加整体的吞吐,实践中,起码后20%的提速,具体的运行时间如下:

p99p95p90p50avg
239194656238193.19207605335595

引入ESS或者RSS

对于 ESS 可以参考:push-based shuffle to improve shuffle efficiency,但是这里请注意一点,该问题的提出点是基于 HDD 类型的磁盘的,因为我们现在是基于 NVMe SSD 的,所以几乎没有什么提升。
对于 RSS 可以参考:Apache celeborn,结果也是差不多。

引入向量化插件

对于向量化加速这块建议参考: Gluten,这也是笔者一直在关注的项目,根据 TPC-H 测试结果显示起码有2倍的性能提升,但是实际效果还是得看SQL的pattern。但是由于目前我们的Spark 是基于 3.5.0的,是比较新的版本,而社区这块的融合还在继续,所以这块今年应该可以行动起来,可以参考Add spark35 scala213

设置压缩测试为ZSTD

众所周知,ZSTD在压缩率和压缩解压缩上的性能都是很突出,但是在 NVMe SSD 光环下,改成了ZSTD的压缩格式以后,效果却不如人意,而且我们还合并了Parallel Compression Support for ZSTD到我们内部的Spark版本中,具体的效果如下:
具体的配置项为:“spark.io.compression.codec”:“zstd”

压缩格式p99p95p90p50avgtask_count
lz42806.510000000003412160585.62898089171975628
zstd1814.3700000000076118.349999999999878.3000000000000111117.2063492063492378

注意: 我们批集群的CPU利用率在60%以上,引入zstd以后会增加CPU的使用率,而且在这种 long running的服务下,得增加driver的core数,要不然会导致在进行广播的时候卡住,具体的可以加如下配置:

"spark.driver.cores":"8", 
"spark.io.compression.zstd.workers":"2",

至于效果为什么不理想,主要原因还是因为 shuffle数据量不大,可能对于单个shuffle数据量大的应用来说,可能效果比较明显。

增大可广播join的阈值大小

因为对于这种 long running的服务,driver端的内存一般都是挺大的,所以可以适当增大spark.sql.autoBroadcastJoinThreshold的配置,笔者这里设置了 50MB

引入动态资源分配

对于这种long running服务来说,每个时段的任务数肯定是不一样的,那这个时候引入动态资源分配的话,效果就是很好的,具体的配置可以参考如下:

"spark.shuffle.service.enabled":"false"
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.shuffleTracking.enabled": "true",
"spark.dynamicAllocation.minExecutors": "20",
"spark.dynamicAllocation.maxExecutors": "40",
"spark.dynamicAllocation.executorIdleTimeout":"3600"
文章来源:https://blog.csdn.net/monkeyboy_tech/article/details/135688646
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。