在大环境不好的情况下,本司也开始了“降本增效”,本文探讨一下,在这种背景下 Spark怎么做的降本增效。
Yarn 基于 EMR CPU 是xlarge,也就是内存和核的比例在7:1左右的 ,磁盘是基于 NVMe SSD
Spark 3.5.0(也是刚由3.1 升级而来 )
JDK 8
这里为什么强调 NVMe ,因为相比于 HDD 来说,他的磁盘IO有更高的读写速度。 导致我们在 Spark上 做的一些常规优化是不起效果的
注意:如没特别说明 P99 P95 avg等时间单位是秒
因为我们内部存在于类似 Apache kyuubi这种 long running的服务,而且内存都是20GB起步,所以第一步就想到调整 CMS 策略为 G1 或者 Parallel.
测试结果:
GC类型 | p100 | p99 | p95 | p90 | p50 | avg | task_count |
---|---|---|---|---|---|---|---|
CMS | 9569 | 2085.3800000000047 | 688 | 208 | 3 | 43.85805120688368 | 87363 |
G1 | 9380 | 2164.9100000000035 | 853 | 378 | 6 | 52.22754511406197 | 88110 |
Parallel | 10919 | 2192.970000000001 | 888 | 393 | 6 | 52.06006478898587 | 88904 |
从结果上来看,CMS在 吞吐上比 G1和Parallel 都要好的,附上 JVM的一些关键参数如下
Spark driver端的配置都是一样:
"spark.driver.extraJavaOptions": "-verbose:gc -XX:+UseParNewGC -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump",
CMS的 Executor 配置如下:
"spark.executor.extraJavaOptions": " -verbose:gc -XX:+UseParNewGC -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"
G1的 Executor 配置如下:
"spark.executor.extraJavaOptions": "-verbose:gc -XX:+UseG1GC -XX:ParallelGCThreads=10 -XX:ConcGCThreads=5 -XX:G1ReservePercent=15 -XX:MaxTenuringThreshold=15 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"
Parellel 的 Executor 配置如下:
"spark.executor.extraJavaOptions": "-Dcarbon.properties.filepath=carbon.properties -Dlog4j.configuration=xql-log4j.properties -verbose:gc -XX:++UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=10 -XX:+UseAdaptiveSizePolicy -XX:+ParallelRefProcEnabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"
在对任务的运行时长进行统计的时候我们发现,绝大部分任务都是运行时间在5分钟左右,而与此同时在任务高峰期的时候,整体任务的等待数量一直在1800左右,这种情况下,增大任务的并发度能够很好的增加整体的吞吐,实践中,起码后20%的提速,具体的运行时间如下:
p99 | p95 | p90 | p50 | avg |
---|---|---|---|---|
2391 | 946 | 562 | 38 | 193.19207605335595 |
对于 ESS 可以参考:push-based shuffle to improve shuffle efficiency,但是这里请注意一点,该问题的提出点是基于 HDD 类型的磁盘的,因为我们现在是基于 NVMe SSD 的,所以几乎没有什么提升。
对于 RSS 可以参考:Apache celeborn,结果也是差不多。
对于向量化加速这块建议参考: Gluten,这也是笔者一直在关注的项目,根据 TPC-H 测试结果显示起码有2倍的性能提升,但是实际效果还是得看SQL的pattern。但是由于目前我们的Spark 是基于 3.5.0的,是比较新的版本,而社区这块的融合还在继续,所以这块今年应该可以行动起来,可以参考Add spark35 scala213
众所周知,ZSTD在压缩率和压缩解压缩上的性能都是很突出,但是在 NVMe SSD 光环下,改成了ZSTD的压缩格式以后,效果却不如人意,而且我们还合并了Parallel Compression Support for ZSTD到我们内部的Spark版本中,具体的效果如下:
具体的配置项为:“spark.io.compression.codec”:“zstd”
压缩格式 | p99 | p95 | p90 | p50 | avg | task_count |
---|---|---|---|---|---|---|
lz4 | 2806.5100000000034 | 121 | 60 | 5 | 85.62898089171975 | 628 |
zstd | 1814.3700000000076 | 118.3499999999998 | 78.30000000000001 | 11 | 117.2063492063492 | 378 |
注意: 我们批集群的CPU利用率在60%以上,引入zstd以后会增加CPU的使用率,而且在这种 long running的服务下,得增加driver的core数,要不然会导致在进行广播的时候卡住,具体的可以加如下配置:
"spark.driver.cores":"8",
"spark.io.compression.zstd.workers":"2",
至于效果为什么不理想,主要原因还是因为 shuffle数据量不大,可能对于单个shuffle数据量大的应用来说,可能效果比较明显。
因为对于这种 long running的服务,driver端的内存一般都是挺大的,所以可以适当增大spark.sql.autoBroadcastJoinThreshold的配置,笔者这里设置了 50MB
对于这种long running服务来说,每个时段的任务数肯定是不一样的,那这个时候引入动态资源分配的话,效果就是很好的,具体的配置可以参考如下:
"spark.shuffle.service.enabled":"false"
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.shuffleTracking.enabled": "true",
"spark.dynamicAllocation.minExecutors": "20",
"spark.dynamicAllocation.maxExecutors": "40",
"spark.dynamicAllocation.executorIdleTimeout":"3600"