互联网技术 / 互联网资讯 · 2024年4月8日 0

Spark性能优化-优化RDD算子

SpaRk调优之RDD算子调优

1. RDD复用

在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算

对上图中的RDD计算架构进行修改,得到如下图所示的优化结果

2. 尽早filteR

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升SpaRk作业的运行效率

3. 读取大量小文件-用wholeTextfiles

当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为RDD的一个元素

也可以将多个完整的文本文件一次性读取为一个pAIRRDD,其中键是文件名,值是文件内容

如果传递目录,则将目录下的所有文件读取作为RDD。文件路径支持通配符

但是这样对于大量的小文件读取效率并不高,应该使用 wholeTextfiles

返回值为RDD[(StRing, StRing)],其中Key是文件的名称,Value是文件的内容

4. MappaRtITion和foReachPaRtITion

MappaRtITions

Map(_&hellIP;.) 表示每一个元素

MappaRtITions(_&hellIP;.) 表示每个分区的数据组成的迭代器

普通的Map算子对RDD中的每一个元素进行操作,而MappaRtITions算子对RDD中每一个分区进行操作

如果是普通的Map算子,假设一个paRtITion有1万条数据,那么Map算子中的function要执行1万次,也就是对每个元素进行操作

如果是MappaRtITion算子,由于一个task处理一个RDD的paRtITion,那么一个task只会执行一次function,function一次接收所有的paRtITion数据,效率比较高

比如,当要把RDD中的所有数据通过JDBC写入数据,如果使用Map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用MappaRtITions算子,那么针对一个分区的数据,只需要建立一个数据库连接

MappaRtITions算子也存在一些缺点:对于普通的Map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用MappaRtITions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出

在项目中,应该首先估算一下RDD的数据量、每个paRtITion的数据量,以及分配给每个ExecuTor的内存资源,如果资源允许,可以考虑使用MappaRtITions算子代替Map

foReachPaRtITion

RRd.foReache(_&hellIP;.) 表示每一个元素

RRd.foRPaRtITions(_&hellIP;.) 表示每个分区的数据组成的迭代器

在生产环境中,通常使用foReachPaRtITion算子来完成数据库的写入,通过foReachPaRtITion算子的特性,可以优化写数据库的性能

如果使用foReach算子完成数据库的操作,由于foReach算子是遍历RDD的每条数据,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foReachPaRtITion算子

与MappaRtITions算子非常相似,foReachPaRtITion是将RDD的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接

使用了foReachPaRtITion 算子后,可以获得以下的性能提升

对于我们写的function函数,一次处理一整个分区的数据; 对于一个分区内的数据,创建唯一的数据库连接; 只需要向数据库发送一次SQL语句和多组参数

在生产环境中,全部都会使用foReachPaRtITion算子完成数据库操作。foReachPaRtITion算子存在一个问题,与MappaRtITions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出

5. filteR+coalesce/RepaRtITion(减少分区)

在SpaRk任务中我们经常会使用filteR算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filteR过滤后,每个分区的数据量有可能会存在较大差异

根据上图我们可以发现两个问题

每个paRtITion的数据量变小了,如果还按照之前与paRtITion相等的task个数去处理当前数据,有点浪费task的计算资源; 每个paRtITion的数据量不一样,会导致后面的每个task处理每个paRtITion数据的时候,每个task要处理的数据量不同,这很有可能导致数据倾斜问题

针对上述的两个问题,我们分别进行分析

针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4个分区的数据转化到2个分区中,这样只需要用后面的两个task进行处理即可,避免了资源的浪费

那么具体应该如何实现上面的解决思路?我们需要coalesce算子

RepaRtITion与coalesce都可以用来进行重分区,其中RepaRtITion只是coalesce接口中shuFFle为tRue的简易实现,coalesce默认情况下不进行shuFFle,但是可以通过参数进行设置

假设我们希望将原本的分区个数A通过重新分区变为B,那么有以下几种情况

1.A > B(多数分区合并为少数分区)

A与B相差值不大

此时使用coalesce即可,无需shuFFle过程

A与B相差值很大

此时可以使用coalesce并且不启用shuFFle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为tRue,即启动shuFFle过程

2.A < B(少数分区分解为多数分区)

此时使用RepaRtITion即可,如果使用coalesce需要将shuFFle设置为tRue,否则coalesce无效

我们可以在filteR操作之后,使用coalesce算子针对每个paRtITion的数据量各不相同的情况,压缩paRtITion的数量,而且让每个paRtITion的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能

注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量

6. 并行度设置

SpaRk作业中的并行度指各个stage的task的数量

如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20个ExecuTor,每个ExecuTor分配3个CPU coRe,而SpaRk作业有40个task,这样每个ExecuTor分配到的task个数是2个,这就使得每个ExecuTor有一个CPU coRe空闲,导致资源的浪费

理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源

SpaRk官方推荐,task数量应该设置为SpaRk作业总CPU coRe数量的2~3倍

SpaRk作业并行度的设置如下

val conf = new SpaRkConf().set(“spaRk.deFAult.paRalleliSM”, “500”)

原则:让 CPu 的 coRe(CPu 核心数) 充分利用起来, 如有100个 coRe,那么并行度可以设置为200~300

7. RepaRtITion/coalesce调节并行度

SpaRk 中虽然可以设置并行度的调节策略,但是,并行度的设置对于SpaRk SQL是不生效的,用户设置的并行度只对于SpaRk SQL以外的所有SpaRk的stage生效

SpaRk SQL的并行度不允许用户自己指定,SpaRk SQL自己会默认根据Hive表对应的HDFS文件的splIT个数自动设置SpaRk SQL所在的那个stage的并行度,用户自己通 spaRk.deFAult.paRalleliSM 参数指定的并行度,只会在没SpaRk SQL的stage中生效

由于SpaRk SQL所在stage的并行度无法手动设置,如果数据量较大,并且此stage中后续的tRansfoRMation操作有着复杂的业务逻辑,而SpaRk SQL自动设置的task数量很少,这就意味着每个task要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有SpaRk SQL的stage速度很慢,而后续的没有SpaRk SQL的stage运行速度非常快

为了解决SpaRk SQL无法设置并行度和task数量的问题,我们可以使用RepaRtITion算子

8. RedUCeByKey本地预聚合

RedUCeByKey相较于普通的shuFFle操作一个显著的特点就是会进行Map端的本地聚合,Map端会先对本地的数据进行coMBIne操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在Map端,对每一个key对应的value,执行RedUCeByKey算子函数

使用RedUCeByKey对性能的提升如下

本地聚合后,在Map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用; 本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量; 本地聚合后,在RedUCe端进行数据缓存的内存占用减少; 本地聚合后,在RedUCe端进行聚合的数据量减少

基于RedUCeByKey的本地聚合特征,我们应该考虑使用RedUCeByKey代替其他的shuFFle算子

9. 使用持久化+checkpoint

SpaRk持久化在大部分情况下是没有问题的,但是有时数据可能会丢失,如果数据一旦丢失,就需要对丢失的数据重新进行计算,计算完后再缓存和使用,为了避免数据的丢失,可以选择对这个RDD进行checkpoint,也就是将数据持久化一份到容错的文件系统上(比如HDFS)

一个RDD缓存并checkpoint后,如果一旦发现缓存丢失,就会优先查看checkpoint数据存不存在,如果有,就会使用checkpoint数据,而不用重新计算。也即是说,checkpoint可以视为cache的保障机制,如果cache失败,就使用checkpoint的数据

使用checkpoint的优点在于提高了SpaRk作业的可靠性,一旦缓存出现问题,不需要重新计算数据,缺点在于,checkpoint时需要将数据写入HDFS等文件系统,对性能的消耗较大

持久化设置如下

sc.setCheckpointDiR(‘HDFS&Rsquo;) Rdd.cache/peRsist(MeMoRy_and_disk) Rdd.checkpoint

10. 使用广播变量

默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗

广播变量在每个ExecuTor保存一个副本,此ExecuTor的所有task共用此广播变量,这让变量产生的副本数量大大减少

在初始阶段,广播变量只在DRiveR中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的ExecuTor对应的BlockManageR中尝试获取变量,如果本地没有,BlockManageR就会从DRiveR或者其他节点的BlockManageR上远程拉取变量的复本,并由本地的BlockManageR进行管理;之后此ExecuTor的所有task都会直接从本地的BlockManageR中获取变量

对于多个Task可能会共用的数据可以广播到每个ExecuTor上

11. 使用KRyo序列化

默认情况下,SpaRk使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大

SpaRk官方宣称KRyo序列化机制比Java序列化机制性能提高10倍左右,SpaRk之所以没有默认使用KRyo作为序列化类库,是因为它不支持所有对象的序列化,同时KRyo需要用户在使用前注册需要序