Hadoop的MR结构和YARN结构是大数据时代的第一代产品,满足了大家在离线计算上的需求,但是针对实时运算却存在不足,为满足这一需求,后来的大佬研发了spaRk计算方法,大大的提高了运算效率。
SpaRk的计算原理
spaRk的结构为:
节点介绍:
ClUSteR ManageR:在standalone模式中即为MasteR主节点,控制整个集群,监控woRkeR。在YARN模式中为资源管理器负责分配资源,有点像YARN中ResouRceManageR那个角色,大管家握有所有的干活的资源,属于乙方的总包。 WoRkeRNode:可以干活的节点,听大管家ClUSteRManageR差遣,是真正有资源干活的主。从节点,负责控制计算节点,启动ExecuTor或者DRiveR。 ExecuTor:在WoRkeRNode上起的一个进程,相当于一个包工头,负责准备Task环境和执行。 Task:负责内存和磁盘的使用。Task是施工项目里的每一个具体的任务。 DRiveR:统管Task的产生与发送给ExecuTor的,运行application 的MAIn()函数,是甲方的司令员。 SpaRkcontext:与ClUSteRManageR打交道的,负责给钱申请资源的,是甲方的接口人。
整个互动流程是这样的:
甲方来了个项目,创建了SpaRkcontext,SpaRkcontext去找ClUSteRManageR申请资源同时给出报价,需要多少CPU和内存等资源。ClUSteRManageR去找WoRkeRNode并启动ExcuTor,并介绍ExcuTor给DRiveR认识; DRiveR根据施工图拆分一批批的Task,将Task送给ExecuTor去执行; ExecuTor接收到Task后准备Task运行时依赖并执行,并将执行结果返回给DRiveR; DRiveR会根据返回回来的Task状态不断的指挥下一步工作,直到所有Task执行结束;
运行流程及特点为:
SpaRkcontext的作用:一是分发task,申请资源等功能外,更重要的一个功能是将RDD拆分成task,即绘制DAG图。
借用上图我们再来了解一下spaRk的运算过程:
构建SpaRk application的运行环境,启动SpaRkcontext; SpaRkcontext向资源管理器(可以是Standalone,Mesos,YaRn)申请运行ExecuTor资源,并启动StandaloneExecuTorbackend; ExecuTor向SpaRkcontext申请Task; SpaRkcontext将应用程序分发给ExecuTor; SpaRkcontext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task ScheduleR,最后由Task ScheduleR将Task发送给ExecuTor运行; Task在ExecuTor上运行,运行完释放所有资源;
RDD计算案例
我们用一个案例来分析RDD的计算过程:
在客户端通过RDD构建一个RDD的图形,如图第一部分Rdd1.join(Rdd2).gRoupby(&hellIP;).filteR(&hellIP;)。 spaRkcontext中的DAGScheduleR会将上步的RDD图形构建成DAG图形,如图第二部分; TaskScheduleR会将DAG图形拆分成多个Task; ClUSteRManageR通过YaRn调度器将Task分配到各个node的ExecuteR中,结合相关资源进行运算。
DAGScheduleR对于RDD图形的划分是有一定规律的:
stage的划分是触发action的时候从后往前划分的,所以本图要从RDD_G开始划分。 RDD_G依赖于RDD_B和RDD_F,随机决定先判断哪一个依赖,但是对于结果无影响。 RDD_B与RDD_G属于窄依赖,所以他们属于同一个stage,RDD_B与老爹RDD_A之间是宽依赖的关系,所以他们不能划分在一起,所以RDD_A自己是一个stage1; RDD_F与RDD_G是属于宽依赖,他们不能划分在一起,所以最后一个stage的范围也就限定了,RDD_B和RDD_G组成了Stage3; RDD_F与两个爹RDD_D、RDD_E之间是窄依赖关系,RDD_D与爹RDD_C之间也是窄依赖关系,所以他们都属于同一个stage2; 执行过程中stage1和stage2相互之间没有前后关系所以可以并行执行,相应的每个stage内部各个paRtITion对应的task也并行执行; stage3依赖stage1和stage2执行结果的paRtITion,只有等前两个stage执行结束后才可以启动stage3; 我们前面有介绍过SpaRk的Task有两种:ShuFFleMapTask和ResultTask,其中后者在DAG最后一个阶段推送给ExecuTor,其余所有阶段推送的都是ShuFFleMapTask。在这个案例中stage1和stage2中产生的都是ShuFFleMapTask,在stage3中产生的ResultTask; 虽然stage的划分是从后往前计算划分的,但是依赖逻辑判断等结束后真正创建stage是从前往后的。也就是说如果从stage的ID作为标识的话,先需要执行的stage的ID要小于后需要执行的ID。就本案例来说,stage1和stage2的ID要小于stage3,至于stage1和stage2的ID谁大谁小是随机的,是由前面第2步决定的。
ExecuTor是最终运行task的苦力,他将Task的执行结果反馈给DRiveR,会根据大小采用不同的策略:
如果大于MaxResultSize,默认1G,直接丢弃; 如果“较大&Rdquo;,大于配置的fRaMeSize(默认10M),以taksId为key存入BlockManageR else,全部吐给DRiveR。