由于对spark-submit提交后执行流程比较好奇,所以研究了一下spark源码,以下算是阅读笔记吧。
spark-submit启动脚本:
- shell -z判断参数是否为空; $@:表示所有参数;$?:表示上一次程序返回值
- 使用@ 或可以获取数组中的所有元素,例如:${array_name[]},${array_name[@]}
- SparkSubmit–>yarn/Client–>ApplicationMaster
- Client:负责提交作业到Master。
- Master:接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
- Worker:负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,比如启动Driver和Executor。
- shell read可以带有-a, -d, -e, -n, -p, -r, -t, 和 -s八个选项。1234567-a :将内容读入到数值中-d :表示delimiter,即定界符,一般情况下是以IFS为参数的间隔,但是通过-d,可以定义一直读到出现执行的字符位置。-n :用于限定最多可以有多少字符可以作为有效读入。-p :用于给出提示符,在前面的例子中我们使用了echo –n “…“来给出提示符,可以使用read –p ‘… my promt?’value的方式只需一个语句来表示。-r :在参数输入中,我们可以使用’/’表示没有输入完,换行继续输入。-s :对于一些特殊的符号,例如箭头号,不将他们在terminal上打印,我们按光标,在回车之后,如果要求显示,即echo,光标向上,如果不使用-s,在输入时,输入处显示^[[A,即在terminal上打印,之后如果要求echo,光标会上移。-t :用于表示等待输入的时间,单位为秒,等待时间超过,将继续执行后面的脚本,注意不作为null输入,参数将保留原有的值
|
|
- spark-class代码(配置spark任务执行环境变量,提交执行程序):123456789101112131415161718192021222324. "${SPARK_HOME}"/bin/load-spark-env.shbuild_command() {"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"printf "%d\0" $?}//解析参数CMD=()while IFS= read -d '' -r ARG; doCMD+=("$ARG")done < <(build_command "$@")COUNT=${#CMD[@]}LAST=$((COUNT - 1))LAUNCHER_EXIT_CODE=${CMD[$LAST]}if [ $LAUNCHER_EXIT_CODE != 0 ]; thenexit $LAUNCHER_EXIT_CODEfiCMD=("${CMD[@]:0:$LAST}")exec "${CMD[@]}"exec "${CMD[@]}",即执行java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.deploy.SparkSubmit "$@",最终执行代码示例:/software/servers/jdk1.7.0_67/bin/java -cp /software/servers/druid/mart_risk/hadoop/lib/native/:/software/servers/druid/mart_risk/hadoop/share/hadoop/common/lib/hadoop-lzo-0.4.20.jar:/software/conf/druid/mart_risk/bdp_jmart_risk.bdp_jmart_risk_hkh/hive_conf/:/home/mart_risk/data_dir/sjmei/plugins/spark_2.0/conf/:/home/mart_risk/data_dir/sjmei/plugins/spark_2.0/jars/*:/software/conf/druid/mart_risk/bdp_jmart_risk.bdp_jmart_risk_hkh/hadoop_conf/ -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --conf spark.driver.memory=10g --properties-file ./conf/spark-defaults.conf --class com.jd.risk.dm.spark.ml.alphago.GBTMLlib --name spark gbt algo for devices --num-executors 10 --executor-memory 10g --executor-cores 2 --jars ./examples/jars/scopt_2.11-3.3.0.jar --queue bdp_jmart_risk.bdp_jmart_risk_hkh ./libs/jrdm-dm-2.0-SNAPSHOT.jar hdfs://ns2/user/mart_risk/dev.db/risk_jrdm_msj_devices_training_black training
deploy.SparkSubmit代码:
|
|
deploy.yarn.Client代码:
|
|
yarn.client.api.impl.YarnClientImpl代码:
|
|
deploy.yarn.ApplicationMaster代码:
|
|
YarnRMClient
|
|
YarnAllocator
|
|
总结
Yarn-Cluster模式
客户端操作:
- SparkSubmit中根据yarnConf来初始化yarnClient,并启动yarnClient
- 创建客户端Application,并获取Application的ID,进一步判断集群中的资源是否满足executor和ApplicationMaster申请的资源,如果不满足则抛出IllegalArgumentException;
- 设置资源、环境变量:其中包括了设置Application的Staging目录、准备本地资源(jar文件、log4j.properties)、设置Application其中的环境变量、创建Container启动的Context等;
- 设置Application提交的Context,包括设置应用的名字、队列、AM的申请的Container、标记该作业的类型为spark;
- 申请Memory,最终通过submitApplication方法向ResourceManager提交该Application。当作业提交到YARN上之后,客户端就没事了,会关闭此进程,因为整个作业运行在YARN集群上进行,运行的结果将会保存到HDFS或者日志中。
Yarn操作:
- 运行ApplicationMaster的run方法;
- 设置好相关的环境变量;
- 创建amClient,并启动;
- 在Spark UI启动之前设置Spark UI的AmIpFilter;
- 在startUserClass函数专门启动了一个线程(名称为Driver的线程)来启动用户提交的Application,也就是启动了Driver。在Driver中将会初始化SparkContext;
- 等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次数(默认为10),如果等待了的次数超过了配置的,程序将会退出;否则用SparkContext初始化yarnAllocator;
6.1. 怎么知道SparkContext初始化完成?
其实在5步骤中启动Application的过程中会初始化SparkContext,在初始化SparkContext的时候将会创建YarnClusterScheduler,在SparkContext初始化完成的时候,会调用YarnClusterScheduler类中的postStartHook方法,而该方法会通知ApplicationMaster已经初始化好了SparkContext
6.2. 为何要等待SparkContext初始化完成?
CoarseGrainedExecutorBackend启动后需要向CoarseGrainedSchedulerBackend注册 - 当SparkContext初始化完成的时候,通过amClient向ResourceManager注册ApplicationMaster
- 分配并启动Executeors。在启动Executeors之前,先要通过yarnAllocator获取到numExecutors个Container,然后在Container中启动Executeors。如果在启动Executeors的过程中失败的次数达到了maxNumExecutorFailures的次数,那么这个Application将失败,将Application Status标明为FAILED,并将关闭SparkContext。其实,启动Executeors是通过ExecutorRunnable实现的,而ExecutorRunnable内部是启动CoarseGrainedExecutorBackend的,CoarseGrainedExecutorBackend启动后会向SchedulerBackend注册。
(resourceManager是如何决定该分配几个container? 在shell提交时跟参数 默认启动两个executor) - 最后,Task将在CoarseGrainedExecutorBackend里面运行,然后运行状况会通过Akka通知CoarseGrainedScheduler,直到作业运行完成。
Client模式:
客户端操作:
- 通过SparkSubmit类的launch的函数直接调用作业的main函数(通过反射机制实现),如果是集群模式就会调用Client的main函数。
- 而应用程序的main函数一定都有个SparkContent,并对其进行初始化;
- 在SparkContent初始化中将会依次做如下的事情:设置相关的配置、注册MapOutputTracker、BlockManagerMaster、BlockManager,创建taskScheduler和dagScheduler;其中比较重要的是创建taskScheduler和dagScheduler。在创建taskScheduler的时候会根据我们传进来的master来选择Scheduler和SchedulerBackend。由于我们选择的是yarn-client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend,并将YarnClientSchedulerBackend的实例初始化YarnClientClusterScheduler,上面两个实例的获取都是通过反射机制实现的,YarnClientSchedulerBackend类是CoarseGrainedSchedulerBackend类的子类,YarnClientClusterScheduler是TaskSchedulerImpl的子类,仅仅重写了TaskSchedulerImpl中的getRackForHost方法。
- 初始化完taskScheduler后,将创建dagScheduler,然后通过taskScheduler.start()启动taskScheduler,而在taskScheduler启动的过程中也会调用SchedulerBackend的start方法。在SchedulerBackend启动的过程中将会初始化一些参数,封装在ClientArguments中,并将封装好的ClientArguments传进Client类中,并client.submitApplication()方法获取Application ID。
Yarn操作:
- 运行ApplicationMaster的run方法(runExecutorLauncher);
- 无需等待SparkContext初始化完成(因为YarnClientClusterScheduler已启动完成),向sparkYarnAM注册该Application
- 分配Executors,这里面的分配逻辑和yarn-cluster里面类似,就不再说了。
- 最后,Task将在CoarseGrainedExecutorBackend里面运行,然后运行状况会通过Akka通知CoarseGrainedScheduler,直到作业运行完成。
- 在作业运行的时候,YarnClientSchedulerBackend会每隔1秒通过client获取到作业的运行状况,并打印出相应的运行信息,当Application的状态是FINISHED、FAILED和KILLED中的一种,那么程序将退出等待。
- 最后有个线程会再次确认Application的状态,当Application的状态是FINISHED、FAILED和KILLED中的一种,程序就运行完成,并停止SparkContext。整个过程就结束了。
Yarn的ApplicationMaster管理
- client向RM提交程序(包含AM程序, AM启动命令,用户程序);
- RM向资源调度器去申请资源,一旦申请的AM需要的资源,AM Laucher 便与对应的NodeManager联系启动
- AM同时向AM LivenessMonitor添加进监控列表,启动对AM的监控
- AM启动后,向AM Service注册报告自己的端口号,ip,track url等,之后AM会定期向AM Service发送心跳,执行allocate,AM Service会向AM LivenessMonitor更新AM的心跳时间
- 当用户程序执行完毕,AM向AM Service报告完成,AM Service通知AM LivenessMonitor从监控列表中删除AM,释放资源。