spark常见问题解决方法

对于Spark程序优化,应从如下几点进行:
  1. 通过监控CPU、内存、网络、IO、GC、应用指标等数据,切实找到系统的瓶颈点。
  2. 统筹全局,制定相应的解决方案,解决问题的思路是否清晰准确很重要,切勿『头疼医头,脚疼医脚』,应总体考虑把握。
  3. 了解一些技术的背景知识,对于每次优化尽量做得彻底些,多进行总结。
  4. 程序优化通常从Stage/Cache/Partition、资源、内存/GC的优化。

1. spark运行时报错:Shutdown hook called before final status was reported.
解决方法:程序存在错误,将日志down下来查看具体原因!down日志命令:yarn logs -applicationId app_id

2. Spark性能优化的9大问题及其解决方案http://book.51cto.com/art/201409/453045.htm
Spark程序优化所需要关注的几个关键点——最主要的是数据序列化和内存优化

*问题1:reduce task数目不合适
解决方法:需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism。通常,reduce数目设置为core数目的2到3倍。数量太大,造成很多小任务,增加启动任务的开销;数目太少,*任务运行缓慢。

*问题2:shuffle磁盘IO时间长
解决方法:设置spark.local.dir为多个磁盘,并设置磁盘为IO速度快的磁盘,通过增加IO来优化shuffle性能;

*问题3:map|reduce数量大,造成shuffle小文件数目多
解决方法:默认情况下shuffle文件数目为map tasks * reduce tasks. 通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,此时文件数为reduce tasks数目;

*问题4:序列化时间长、结果大
解决方法:Spark默认使.用JDK.自带的ObjectOutputStream,这种方式产生的结果大、CPU处理时间长,可以通过设置spark.serializer为org.apache.spark.serializer.KryoSerializer。另外如果结果已经很大,可以使用广播变量;

*问题5:单条记录消耗大
解决方法:使用mapPartition替换map,mapPartition是对每个Partition进行计算,而map是对partition中的每条记录进行计算;

*问题6: collect输出大量结果时速度慢
解决方式:collect源码中是把所有的结果以一个Array的方式放在内存中,可以直接输出到分布式?文件系统,然后查看文件系统中的内容;

*问题7: 任务执行速度倾斜
解决方式:如果是数据倾斜,一般是partition key取的不好,可以考虑其它的并行处理方式 ,并在中间加上aggregation操作;如果是Worker倾斜,例如在某些worker上的executor执行缓慢,可以通过设置spark.speculation=true 把那些持续慢的节点去掉;

*问题8: 通过多步骤的RDD操作后有很多空任务或者小任务产生
解决方式:使用coalesce或repartition去减少RDD中partition数量;

*问题9:Spark Streaming吞吐量不高
解决方式:可以设置spark.streaming.concurrentJobs

3. intellij idea直接编译spark源码及问题解决:
* http://blog.csdn.net/tanglizhe1105/article/details/50530104
* http://stackoverflow.com/questions/18920334/output-path-is-shared-between-the-same-module-error

1
2
Spark编译:clean package -Dmaven.test.skip=true
参数:-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m

4. import Spark source code into intellj, build Error: not found: type SparkFlumeProtocol and EventBatch
http://stackoverflow.com/questions/33311794/import-spark-source-code-into-intellj-build-error-not-found-type-sparkflumepr

1.1

5. 整理对Spark SQL的理解:http://www.aboutyun.com/thread-8575-1-1.html

6. Spark GBDT实现方式:spark gbdt的实现基于:J.H. Friedman. “Stochastic Gradient Boosting.” 1999.spark。
  1) gbdt使用一般的残差更新方式,利用残差(梯度方向)更新样本数据集,做为下棵树模型训练的样本

1
data = input.map(point => LabeledPoint(-loss.gradient(partialModel, point),point.features))

  2) 建树过程用variance作为split准则, xgboost对目标函数进行变换,每轮建树的过程用gradient和hessian计算gain作为split准则
  3) 算法性能(效率、支持数据量)
  4) 算法结果的一致性

7. UDAF(User- Defined Aggregation Funcation):http://p-x1984.iteye.com/blog/1156392
Hive自定义UDF和聚合函数UDAF:http://computerdragon.blog.51cto.com/6235984/1288567
Hive自定义UDF和聚合函数UDAF:http://www.tuicool.com/articles/mYZ7R3

8. java.lang.NoSuchMethodException: java.util.Set.()
http://mail-archives.apache.org/mod_mbox/hive-user/201307.mbox/%3CCE1CA41C.1176E%25rdm@baynote.com%3E

9. Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Hive中有3种UDF:
  * UDF:操作单个数据行,产生单个数据行;
  * UDAF:操作多个数据行,产生一个数据行。
  * UDTF:操作一个数据行,产生多个数据行一个表作为输出。

10. Hive查询数据时,有些聚类函数在HQL没有自带,需要用户自定义实现UDTF。
实现用户自定义聚合函数: Sum, Average…… n – 1
  10.1. 必须import org.apache.hadoop.hive.ql.exec.UDAF和org.apache.hadoop.hive.ql.exec.UDAFEvaluator。
  10.2. 函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。
  10.3. Evaluator需要实现init、iterate、terminatePartial、merge、terminate这几个函数。
  10.3.1. init函数实现接口UDAFEvaluator的init函数。
  10.3.2. iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。
  10.3.3. terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类于hadoop的Combiner。
  10.3.4. merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
  10.3.5. terminate返回最终的聚集函数结果。

11. Apache Zeppelin编译安装:http://www.iteblog.com/archives/1573
Apache Zeppelin installation grunt build error:http://stackoverflow.com/questions/33352309/apache-zeppelin-installation-grunt-build-error?rq=1
解决方案:进入web模块npm install

12. Spark源码编译遇到的问题解决:http://www.tuicool.com/articles/NBVvai
内存不够,这个错误是因为编译的时候内存不够导致的,可以在编译的时候加大内存。

1
2
3
4
5
6
7
8
9
10
[ERROR] PermGen space -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors,re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions,
please read the following articles:
[ERROR] [Help 1]http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

13. Exception in thread “main” java.lang.UnsatisfiedLinkError: no jnind4j in java.library.path
I’m using a 64-Bit Java on Windows and still get the no jnind4j in java.library.path error
It may be that you have incompatible DLLs on your PATH. In order to tell DL4J to ignore those you have to add the following as a VM parameter (Run -> Edit Configurations -> VM Options in IntelliJ): -Djava.library.path=""

14. spark2.0本地运行源码报错解决办法:
  14.1. 修改对应pom中的依赖jar包,将scope级别由provided改为compile
  14.2. 运行类之前,去掉make选项;在运行vm设置中增加-Dspark.master=local
  14.3. Win7下运行spark example代码报错:

java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:D:/SourceCode/spark-2.0.0/spark-warehouse`修改SQLConf类中WAREHOUSE_PATH变量,将file:前缀改为file:/或file:///
createWithDefault("file:/${system:user.dir}/spark-warehouse")

  14.4. local模式运行:-Dspark.master=local

15. Spark底层运行原理疑问?
  * SparkSession、SparkContext、SQLContext、HiveContext之间的关系与实现机制?
  * RDD、DataFrame、DataSet之间的区别与联系,实现原理?
  * Spark执行原理?
  * Spark SQL执行原理?
  * Spark中的常见设计模式?
  * Spark主要包括 调度与任务分配、I/O模块、通信控制模块、容错模块、Shuffle模块。
  * Spark 按照 ①应用 application ②作业 job ③ stage ④ task 四个层次进行调度,采用经典的FIFO和FAIR等调度算法。

16. 解决Task not serializable Exception错误
方法1:将RDD中的所有数据通过JDBC连接写入数据库,若使用map函数,可能要为每个元素都创建connection,这样开销很大,如果使用mapPartitions,那么只需要针对每个分区建立connection;mapPartitions处理后返回的是Iterator。
方法2:对未序列化的对象加@transisent引用,在进行网络通信时不对对象中的属性进行序列化

17. 使用LZO过程会发现它有两种压缩编码可以使用,即LzoCodec和LzopCodec,下面说说它们区别:
  17.1. LzoCodec比LzopCodec更快, LzopCodec为了兼容LZOP程序添加了如 bytes signature, header等信息
  17.2. 如果使用 LzoCodec作为Reduce输出,则输出文件扩展名为”.lzo_deflate”,它无法被lzop读取;如果使用LzopCodec作为Reduce输出,则扩展名为”.lzo”,它可以被lzop读取
  17.3. 生成lzo index job的”DistributedLzoIndexer“无法为 LzoCodec,即 “.lzo_deflate”扩展名的文件创建index
  17.4. ”.lzo_deflate“文件无法作为MapReduce输入,”.LZO”文件则可以。
  17.5. 综上所述得出最佳实践:map输出的中间数据使用 LzoCodec,reduce输出使用 LzopCodec

18. JVM线程池发展趋势:http://www.importnew.com/15082.html
对于传统线程池机制,一个强大的替代方案就是基于事件模型。这种基于事件的线程轮询/线程池/线程调度机制在函数式编程中很常见。关于这个概念的一个非常流行的实现是基于actor的系统(译者注:Scala的并发系统),Akka已成为其实际上的标准。(译者注:Akka,一种善于处理进程间通信的框架)

19. 这个函数在func(“11”)调用时候正常,但是在执行func(11)或func(1.1)时候就会报error: type mismatch的错误.
* 针对特定的参数类型, 重载多个func函数,这个不难, 传统JAVA中的思路, 但是需要定义多个函数
* 使用超类型, 比如使用AnyVal,Any;这样的话比较麻烦,需要在函数中针对特定的逻辑做类型转化,从而进一步处理上面两个方法使用的是传统JAVA思路,虽然都可以解决该问题,但是缺点是不够简洁;在充满了语法糖的Scala中,针对类型转换提供了特有的implicit隐式转化的功能;

20. org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
解决方案:这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,直到application失败。一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。

21. Spark ML PipeLine GBT/RF预测时报错,java.util.NoSuchElementException: key not found: 8.0
错误原因:由于GBT/RF模型输入setFeaturesCol,setLabelCol参数列名不一致导致。
解决方案:只保存训练算法模型,不保存PipeLineModel

22. linux删除乱码文件,find . -inum 54263996 -exec rm {} -rf \;

23. org.apache.spark.SparkException: Exception thrown in awaitResult
set "spark.sql.broadcastTimeout" to increase the timeout

24. Caused by: java.lang.RuntimeException: Failed to commit task
Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit
如果你比较了解spark中的stage是如何划分的,这个问题就比较简单了。一个Stage中包含的task过大,一般由于你的transform过程太长,因此driver给executor分发的task就会变的很大。所以解决这个问题我们可以通过拆分stage解决。也就是在执行过程中调用cache.count缓存一些中间数据从而切断过长的stage。

25. Spark Streaming性能调优:https://www.iteblog.com/archives/1333
优化运行时间
| 增加并行度 确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源;
| 减少数据序列化 反序列化的负担 Spark Streaming默认将接受到的数据序列化后存储,以减少内存的使用。但是序列化和反序列话需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的系列化接口可以更高效地使用CPU;
| 设置合理的batch duration(批处理时间间隔) 在Spark Streaming中,Spark会每隔batchDuration间隔时间提交一次job,Job之间有可能存在依赖关系,后面的Job必须确保前面的作业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内结束时必须的;
| 减少因任务提交和分发所带来的负担 通常情况下,Akka框架能够高效地确保任务及时分发,但是当批处理间隔非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone和Coarse-grained Mesos模式通常会比使用Fine-grained Mesos模式有更小的延迟。
| 缓存需要经常使用的数据 调用rdd.cache()来缓存数据,加快数据处理

优化内存使用
| 控制batch size(批处理间隔内的数据量) Spark Streaming会把批处理间隔内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存中少能容纳这个批处理时间间隔内的所有数据,否则必须增加新的资源以提高集群的处理能力;
| 及时清理不再使用的数据 前面讲到Spark Streaming会将接受的数据全部存储到内部可用内存区域中,因此对于处理过的不再需要的数据应及时清理,以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl【Deprecated】时长来及时清理超时的无用数据,这个参数需要小心设置以免后续操作中所需要的数据被超时错误处理,还可以配置选项spark.streaming.unpersist=true来更智能的持久化(unpersist)RDD,该配置使系统找出那些不需要经常保存的RDD,然后去持久化它们,这样可以减少spark rdd的内存使用,还可以改善垃圾回收行为;
| 观察及适当调整GC策略 GC会影响Job的正常运行,可能延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采用不同的GC策略以进一步减小内存回收对Job运行的影响。
| 设备合理的CPU资源数