问题描述 在工作中使用spark的一个主要内容就是从多个路径下搜集数据并进行处理。常用的代码大致如下: 在readData方法中调用SparkContext的sequenceFile方法读取文件创建RDD集合。而后调用RDD的reduce和union(即“++”)方法将多个RDD集合进行合并。 这里的代码通常是不会报错的。但是执行合并后的RDD集合的Action算子(这里是collect方法)的时候偶尔会遇到StackOverflowError。异常信息如下: 从异常信息上来看,是在使用java.io.ObjectInputStream执行序列化的时候出现了递归或者死循环,因为栈空间不足导致的这个问题。 继续跟踪调试,发现主要是因为处理的文件过多导致的——一般处理的文件数量超过450(大致数值)以后就会遇到这个问题。查了一些资料,了解到根本原因是RDD Lineage过长:代码中每执行一次RDD.union操作就会增加一次RDD Lineage的步长。 解决问题 现在根据问题的特征和根源,找到的解决思路大致上有这么几个: 增加执行时的栈空间; 避免一次处理过多文件; 定期削减RDD Lineage的长度; 避免创建太长的Lineage。 接下来逐个解释下上面的思路。 增加栈空间 从异常信息中可以看到StatckOverflowError是在Executor中抛出的,所以要调整Executor栈空间,可以在job提交参数中添加如下内容: 这项配置将Executor的栈空间设置为80M。 然后我们测试一下,测试目标是一次处理720个文件。执行结果OK。 再次测试,仍然是80M栈空间,目标一次处理4320个文件。仍然能执行成功。 可知这个方案在一定程度上是可行的,至少可以用来做任务优化。 避免一次处理过多的文件 这个思路是最简单的:既然一次处理太多文件会报错,那么就分成多个批次来处理好了。 调整后的代码如下: 这种方式肯定是可行的,但是用起来多少有点儿麻烦:需要将中间结果集临时存储起来而后再一起使用。中间结果集要是比较小的话还好说,一个变量就足够了;中间结果集要是太大了就得先保存到HDFS上,而后再做二次处理。 削减RDD Lineage的长度 既然问题是因为RDD Lineage长度过长导致的,那么就需要在RDD Lineage变得太长之前,将之削减掉一部分。做法是对合并出的RDD结果集定期做checkpoint,并随意执行一个Action算子。 代码如下: 我一度依赖过这个方案。但是这个方案有一个很大的缺点:就是执行效率太低——比上一种方案效率还低,可以说是执行时间最慢的一种方式了。checkpoint操作实际上是将每个rdd都存储到了硬盘上,其效率可想而知。 避免创建太长的Lineage 前面说的第二种方式也可以说是这种思路的实现。对这个问题来说,推荐得比较多的还是使用SparkContext.union方法来替换RDD.union方法。代码大致如下: 这个方式也是我最初寄望最多的一个方案。 本来希望能通过这个方案一劳永逸地解决这个问题。可是在测试的时候遇到了些问题:这个方案也不能处理路径太多(800个以上)的问题,但是也没有立即报错,而是阻塞住了。执行两三个小时后提示任务执行失败。在日志中可以找到如下错误提示: 对于这个错误提示我目前并无头绪,只能先抛出来给大家看一下。以后如果有进展再继续补充。 就这样。 参考文档: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-td5649.html https://stackoverflow.com/questions/30522564/spark-when-union-a-lot-of-rdd-throws-stack-overflow-error https://stackoverflow.com/questions/38206166/apache-spark-stackoverflowerror-when-trying-to-indexing-string-columns #####
[阅读更多...]