• Hadoop HAR文件的读取操作

    概述 Hadoop Archive是Hadoop官方提供的解决HDFS上小文件过多的一种方案。可以通过如下命令来执行生成har文件: 执行archive命令会提交一个MapReduce任务来生成har文件。在了解har文件结构后也可以考虑本地生成har文件再上传。 关于“hadoop archive”指令的更多细节请参考官方文档 。 HAR文件结构 HAR文件实际上是一个以”.har”结尾命名的目录,其中至少包含三个文件: _index _masterindex part-00000 … 其中“_index”文件中存储包内目录、文件的元数据信息,并按路径java字符串hashCode()运算的哈希值排序。 “_masterindex”记录了“_index”文件中每1000条文件元数据信息的起止哈希值、以及其在“_index”文件中的起止位置。 “part-X”文件中直接拼接了原始文件内容,无压缩处理,每个map操作的节点生成一个“part-X”文件。 HAR文件读取 在平时工作中读取hdfs文件有三种形式:即在java代码中通过hadoop-client形式读取,执行spark任务读取,通过webhdfs的rest api读取。下面一一介绍下相关的的解决方案。 通过java-client方式读取 假设我们有一个har归档文件,其存储目录是:/har/hp2.har。 在这个归档文件下存在一个目录hp,存储了7个txt文件: 下面通过hadoop-client编写程序列出“har:///har/hp2.har/hp/”下的全部文件: 上面是用scala代码做的实现。 比照普通的HDFS文件的访问方式,访问Har文件的主要特点在于其FileSystem操作对象是一个HarFileSystem的实例。 对HarFileSystem实例执行initialize()操作的时候需要传入要访问的har文件的根路径,其后所有的操作都是对har子项的相对路径进行操作。 注意:执行initialize()操作时只能传入har文件的根路径,不能像执行上面的“hfs -ls har:///har/hp2.har/hp/”指令一样传入一个完整的har子项的路径。 通过spark任务读取 spark读取数据文件大致有三种形式: 通过SparkSession.read.textFile读取文件创建DataSet 通过SparkSession.read.text读取文件创建DataFrame 通过SparkContext.textFile读取文件创建RDD 针对这三种形式,我分别写了一段代码进行验证: 在执行日志中输出的验证信息如下: 根据执行结果可知,通过SparkSession读取的两种方案均不可行,只有通过SparkContext进行读取才能达到预期效果。 通过webhdfs方式读取 关于直接通过webhdfs方式读取har中内容的方式,我查了些资料,包括Hadoop官方文档,StackOverflow的相关话题以及Google检索出的条目,其中勉强可行的是如下两篇文章建议的方案: Downloading a file inside a hadoop archive using Apache Knox Hadoop WebHDFS usage in combination with HAR (hadoop archive) from PHP 其实现思路大致如下: 通过webhdfs获取har的index文件 在index文件中找到在HDFS中存储目标文件的数据文件,以及目标文件在数据文件中的起始offset及长度 通过webhdfs获取目标文件:http://<hadoop-server>:50070/webhdfs/v1/data-file-path?op=OPEN&offset=$offset&length=$len 扫描har index文件并截取内容不太像是一种优雅的做法,至少我并不喜欢。 简单介绍一种替代方案,即通过hadoop-client实现数据流拷贝,下面是代码实现: 在api接口中调用copy方法即可实现下载功能。代码稍稍有些多,但窃以为还是要比扫描index好一些。

    [阅读更多...]
  • Spark数据导出任务内存优化记录

    前两天又接了一个Spark任务,倒不复杂,依然是检索HDFS上的日志数据这样的事情。不过瞅着组内跑着十几二十个任务内存一共只有160来G的yarn集群,有些欲哭无泪。 事情还是要做的,反正执行时间要求不太严格,只能想办法尽量压缩内存的占用了。 先说下背景:现在使用的yarn集群由8个容器组成,每个容器的内存大概20G;工作内容是检索数据,源数据大概1T左右,取出来的目标结果数据在2~8G这样子。 最开始的时候查询任务是直接使用sparkSql来完成。随着数据量的上升很快就遇到了最经典的两个问题:StackOverflowError和OutOfMemoryError。 对于栈溢出,之前设计了几个解决方案,在历史文章里面有记录《Spark StackOverflowError》。其中我使用了任务内多批次执行的方案。现在想来,这其实并不是最好的解决方案:问题在于分批越多,每批任务中的action算子就会导致任务的执行时间越长,远不如直接增加栈空间来得简单直接。不过也算是错有错着,这反倒为后来的优化打下了基础。 至于堆内存溢出,主要发生在将每个partition的数据合并压缩的阶段:.repartition(1).saveAsTextFile(pathSave, classOf[GzipCodec])。因为这个操作可能会发生在每个Executor上,所以只好通过简单的增加Executor的内存来解决问题。因为内存总量有限,单个Executor的内存调大了,就只能将task的并发度调小。这样在更严重的问题暴露之前,一直尝试解决的问题就是如何在并发度和内存占用之间取得平衡。 更严重的问题出现在这次的需求上:很简单,要导出的结果数据集变得非常大了,一般都会大于8G,此时堆内存溢出频繁出现。应对方案如下:取消压缩操作、增大Executor执行内存,将Executor的数量调整为2,每个Executor的task数目调整为1。这样Spark任务可以正常执行了,但是因为并行度太小的缘故,执行时间巨长——动辄跑上十来个小时。优化执行速度又提到了时间表上。 是一次执行错误给了优化的方向。现在任务的执行步骤为: 某次任务执行到第4步的时候报错了,考虑到耗时的问题,就重新写了一段代码来完成4和5两步的操作。此时想到这个任务在不同的阶段对资源的需求是不一样的: 在执行1~3这几个步骤的时候对内存的需求没那么强,但是如果稍稍增加些并行度就能极大地提升任务的执行效率; 第4步则是典型的吃内存的操作,此时并行度为1,但是内存需要足够大才能保证任务顺利完成。 此时方案已经很清晰了:将一个任务拆成两个,一个负责搜集数据,一个负责合并生成的中间数据,在执行的时候按不同的策略分配资源。 至此,当前的任务优化已完成。 再扯些没用的。 最后的优化方案实际上非常简单,以至于我很奇怪为什么一开始没想到。并且这种方案是在Hadoop的计算实践中是最常用的操作。唉,也许是灯下黑吧。 也许直接使用Hadoop会是一个更好地选择。因为瓶颈主要出现在内存上,Hadoop对内存资源的占用会少很多。 如果能不走yarn,直接使用java操作,那么尽量不走yarn好了,虽然复杂度会提升很多,但是执行效率是有保证的。 另外,同事曾经问过我一个问题:如果减少了executor的数目,那么每个executor要处理的数据不就变多了,这样也会造成内存压力。听到这个问题时,我的第一印象是“对啊,之前怎么没有考虑到这一点啊”。后来仔细思索了一段时间,想明白了关键:这个问题的前半句是对的,数据总量固定,并行度降低,单个executor要处理的数据量必然会增加;但是后半句是错的,内存中的数据量取决于partition的数量,在配置中则是和task数量相关。 记录一组spark任务提交参数,留着以后参考: 就这些了。

    [阅读更多...]
  • Spark生成CSV后BOM变EF BF BD

    工作中需要通过Spark以csv格式导出日志文件。实现功能不太复杂,但是在中文识别上遇到了些问题。 解决csv文件中的乱码最直接的思路就是添加BOM,这样Excel在打开Excel的时候就知道使用什么样的编码来解析这篇文档了。 先写了段测试代码验证了下: 这段代码会生成一个包含中文字符的csv文件。添加的BOM提示了生成的文件使用的是UTF-8编码。经测试使用Excel打开该文件可以识别中文字符。 将相同的逻辑写到Spark程序中,结果却没有预期的那么美好:中文乱码依然存在,并且新添加的BOM也变成了乱码。BOM变成了这几个奇怪又熟悉的字符: 最后面那个“潔”还吞了一个英文字符e。 在十六进制下查看BOM变成了这几个字符: 也就是说BOM中的三个字符每个都被替换成了EF BF BD。 查了下资料,这三个字符的意义是这样的: 它是很多编程语言以及库中的备胎,即无效的码点值在编码的时候会默认用这个码点值进行替换,即utf-8中的超级「备胎」(REPLACEMENT CHARACTER) 也就是说Spark将我添加的BOM字符串视为了无效字符。 为什么会这样呢?一时陷入了无效的思索和尝试中。 说下最后的解决方案吧,巨简单:在使用BOM数组创建字符串的时候指明使用UTF8编码。就是像下面这样: 会出现这个问题估计是因为服务器的默认文件编码不是UTF-8导致的。

    [阅读更多...]
  • Spark堆内存溢出解决记录

    最近的工作有很大一部分是在做用户画像。 画像读取的维度bitmap动辄几百MB,甚至存在部分GB级别的。而我们的Yarn集群规模比较小,内存总计只有100多GB。开发调试时遇到最多的问题除了Task not serializable就是heap out of memeory了。 我们使用的Spark版本是1.6.3。yarn集群使用的jdk版本是1.7。 如何解决堆内存溢出是最大的难题 。 这时的分析受惠于这几个jvm虚拟机参数“-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ”很多。根据这些参数可以看到内存使用的详情。比如我这里曾遇到过新生代内存使用率达到了99%,而老年代使用率只有20%多的情况,此时需要适当调整新生代和老年代的比例。另外还尝试过使用“-XX:+HeapDumpOnOutOfMemoryError ”获取溢出时的内存快照,如果找到快照的话就可以找出是哪些对象占用了最多的内存,也有很大的几率定位到发生溢出的位置。可惜运维没有找到快照。 另一种情况出现在代码上。比如从ByteWritable获取字节数组时有两个API可用,一个是copyBytes(),一个是getBytes(),使用copyBytes可以把指定区间内的字节数组copy出来,但是这样一来占用的内存肯定是要double了。又比如该使用broadcast变量的时候使用了普通变量,该使用mapPartitions的时候使用了map。写代码的时候每调用一个方法都应该考虑一下相关的开销。 我这里遇到的内存溢出一般来说就是最简单的情况——内存是真的不够用了。某些需要加载到内存里的bitmap体量实在太大,因为整体的资源不足,同时又有其他的任务在跑,所以不能过分上调executor内存。这样能做的只有限制executor的数量,才能适当增加executor的内存。并且为了避免一个executor内可能会出现多个过大bitmap同时并行运算的问题还得限制某些stage的partition数量。更极端的情况就得限制每个executor的core的数量。 大体上就是这样。

    [阅读更多...]
  • Spark Job调优-Part 2

    这一节将主要介绍资源调优,或者说是如何充分利用集群资源。然后再说一下如何对并发度进行优化,这是job性能参数中最难也是最重要的部分。最后我们将了解一些数据自身的表现形式:Spark读取时数据在磁盘上的保存形式(如Apache Avro和 Apache Parquet),以及缓存时或系统内移动时数据在内存中的形式。 优化资源分配 在Spark的用户反馈中经常提到一类问题如“我有一个500个节点的集群,但是在我运行一个应用时,我看到只有两个Task在执行。求帮助!”。考虑到Spark资源控制参数的数量,这种问题不是不可能存在的。在这一节里,我们将学习如何压榨出集群的最后一点资源。根据资群资源管理器的不同(Yarn、Mesos和Spark Standalone),相应的建议和配置也有所不同。不过我们将主要集中在Yarn上——这也是Cloudera推荐使用的资源管理器。 要了解一些Spark on YARN的背景知识,可以先看一下这篇文章。 目前Spark(和Yarn)主要关注的资源只有两个:CPU和内存。当然了,硬盘和网络IO对于Spark的性能也很关键,但是不管Spark还是Yarn,目前都还不能有效地对其进行管理。 在一个Spark应用中,每个Executor都有固定数目的core以及固定大小的堆内存。core的数量可以通过spark-submit提交任务时的参数“–executor-cores”来进行指定,也可以在spark-default.conf配置文件中或SparkConf对象中对“spark.executor.cores”属性进行配置。类似的,堆内存的大小可以通过“—executor-memory”或“spark.executor.memory”来进行设置。cores属性决定了一个executor可以并发执行的task的数量。如“–executor-cores 5”则意味着一个executor在同一时间内最多可以同时执行5个task。内存大小决定了Spark可以缓存的数据的总量以及在group、aggregate或join时可以shuffle地最大数据总量。 “–num-executors”命令参数或“spark.executor.instances”属性则决定了job可以申请的executor总数。从CDH 5.4/Spark 1.3开始,就可以不必设置这个值了。更好的方法是通过“spark.dynamicAllocation.enabled”开启动态分配。通过动态分配,spark应用可以在大量待执行的task积压时申请更多的executor资源,并在空闲时释放executor资源。 同时了解Spark请求的资源是如何与yarn可用资源相配合的也是很重要的。相关的yarn参数包括: yarn.nodemanager.resource.memory-mb:控制了集群每个节点上所有的container可以使用的最大内存总量。 yarn.nodemanager.resource.cpu-vcores:控制了集群每个节点上所有的container可以使用的最大core总量 在提交任务时请求5个executor core就等于向yarn集群请求5个虚拟core。但是向yarn请求内存则有点复杂,情况大致如下: 1. “–executor-memory/spark.executor.memory”控制着executor需要使用的堆内存大小,但是spark也会用到一些堆外内存,比如interned String或者direct byte buffer。“spark.yarn.executor.memoryOverhead”控制着这部分内存,它和其它属性一起决定了每个executor向yarn请求的内存总量。这个属性的默认值是“max(384, 0.07 * spark.executor.memory)” 2. Yarn提供的内存可能会比请求的多一点。Yarn的配置参数“yarn.scheduler.minimum-allocation-mb”和“yarn.scheduler.increment-allocation-mb”控制着分配内存的最小值和增加量。 下图展示了spark和yarn的内存相关属性及配置的层级结构: 对于如何分配executor,如觉得以上内容还不够,那么还可以关注下如下的几个点: 1. application master,是一个非executor container,有向yarn请求container的特殊能力,它本身所占用的资源也需要被考虑进来。在“yarn-client”模式下,它默认会占用1024M内存以及一个vcore。在“yarn-cluster”模式下,application master会运行driver,所以通过“–driver-memory”和“–driver-cores”来配置资源通常是有效的。 2. 给executor分配太多的内存会导致过长的GC延迟。64GB是一个稍显粗略的内存上限建议。 3. 我们已经注意到HDFS客户端在并发线程过多时会存在一些问题,大致每个executor分配5个task就可能耗尽写入带宽,所以给executor分配的core的数目最好低于5。 4. 运行“小型”executor(例如只有一个core且内存仅够运行一个task的executor)实际上是放弃了在一个JVM中并发执行多个Task的优势。比如broadcast变量需要在每个executor上复制一份copy,太多小型executor的话就会产生过多的这种变量copy。 为了能够更具体地描述以上的这些内容,这里以一个Spark应用示例,通过调整这个应用的配置,让其可以充分利用集群的资源:假设一个集群有6个节点在运行NodeManager,每个节点有16个core以及64GB内存。那么每个NodeManager的配置“yarn.nodemanager.resource.memory-mb”和“yarn.nodemanager.resource.cpu-vcores”可以分别设置为63 * 1024 = 64512(MB)和15。这里没有将全部的资源都分配给Yarn container,因为需要留一些资源给每个节点的操作系统以及hadoop后台进程。这里留了1G内存和1个core给这些系统进程。Cloudera Manager会自动计算这些内容并完成相关的Yarn配置。 根据上面说的集群配置,我们极有可能在提交任务时这样配置executor:“–num-executors 6 –executor-cores 15 –executor-memory 63G”。但是,这是一个错误的做法,因为: 6GB+的executor塞不进有63GB内存空间的NodeManager; application master会占用某个节点上的一个core,这意味着那个节点上将不会有足够的core可以分配给一个15-core的executor; 每个executor有15个core将有可能导致HDFS IO吞吐异常。 比较好的做法是这样进行配置:“–num-executors 17 –executor-cores 5 –executor-memory 19G”。为什么呢? 这样每个节点上将会有3个executor——有application master的节点除外,它上面只有两个executor; “–executor-memory”的值是这样算出来的:(63/3) = 21,3是每个节点的executor数量;21 * 0.07 = 1.47,前面提过0.07是堆外内存的比例;21-1.47≈19. 优化并发度 我们都知道Spark是一套并发处理引擎。不过也许我们并不清楚:Spark并不是一个特别“神奇”的并发处理引擎,能够无限制的并行处理业务;它的能力也有极限,我们需要找出最优的并发度。 Spark的每一个stage都有一定数量的task,每个task会串行地处理数据。在优化Spark Job并行度的时候,task数量也许是决定应用性能的唯一的也是最重要的参数。 那么这个数量是如何决定的呢?在之前的文章里有讲述过Spark是如何将RDD划分为不同的stage的。简单提示一下:一些transformation操作如repartition或reduceByKey造成了stage的边界。一个stage中task的个数与stage中最后一个RDD的partition数目相同。一个RDD中partition的数目与其所依赖的RDD的partition数目相同。不过有一些例外情况:coalesce transformation算子创建的RDD的partition总数可以少于其父RDD的partition总数;union transformation算子会创建一个RDD,其partition总数为所有父RDD的全部partitio数目之和;cartesian创建的RDD的partiton总数为所有父RDD的partiton数量的乘积。 如果一个RDD没有父RDD呢?依赖textFile或hadoopFile创建的RDD的partition数量取决于其所使用的MapReduce InputFormat实现。通常情况下一个HDFS block对应着一个partition。对于依赖parallelize方法创建的RDD,其partiton数量取决于用户在调用parallelize方法时传递的参数,如果用户没有传递参数那么会默认采用“spark.default.parallelism”的值。 要想知道一个RDD中partition的个数,可以通过调用rdd.partitions().size()获得。 这里需要关心的是task数量过小的情况。如果task的数量小于可用的slot总数,那么对应的stage将无法充分利用所有可用的CPU。 task数量过少也意味着在执行聚集操作时每个task都会有更大的内存压力。任何“join”、“group”或“*ByKey”操作会将对象保存在hashmap或内存buffer中,以便进行分组或排序。“join”、“group”或“groupByKey”在触发shuffle时会在fetch端使用这些数据结构。“aggregateByKey”、“reduceByKey”在触发shuffle时会在两端使用这些数据结构。 当聚集操作要获取的数据在内存中不容易放下的时候,就会暴露一些问题。第一,在内存中保留太多数据会造成GC压力,容易导致处理中断。第二,如果在内存中放不下这些数据,Spark将会尝试将之溢写到硬盘上,这会产生硬盘IO或排序。过大的shuffle导致的内存开销是我在所有的Cloudera用户中看到的造成Job执行过慢的首要原因。 那么该怎样增加partition的数量呢?如果当前stage是在从hadoop中读取数据,那么可用的做法有: 调用repartition方法,这会产生一次shuffle; 调整InputFormat实现,创建更多的partition; 向HDFS写数据的时候,使用更小的block size。 如果当前stage正在从其他stage接收数据。那么导致stage分界的transformation操作通常会接受一个numPartitions参数,比如: 那么X设置为多少才比较合适呢?最直接的做法就是通过多次实验来决定:看一下父RDD的partition数量,然后每次按这个数量的1.5倍来设置X的值,多次重复,直到性能不再有明显提升。 也有一些计算这个X值的规则,但是不太好验证,因为有些参数不容易计算出来。这里也会介绍下这些规则,但是不建议在工作中使用,这里的介绍只为了能让大家对相关的内容有些深入的了解。使用这些规则的主要目标是在可以充分利用可用内存的前提下,创建足够多的task。 每个task的可用内存等于如下公式:(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores。memory fraction和safty fraction的默认值分别为0.2和0.8. 内存中的shuffle数据的量不太容易确定。最可行的方案是找到stage运行时Shuffle Spill(Memory)和Shuffle Spill(Disk)的比例。然后用这个比例乘以shuffle写数据的总量。不过在这个stage执行reduce操作时情况会有点儿复杂: 此时可以把partition的数量稍稍调大一点儿——一般来说,partition的数量通常是多点儿比少点儿好。 实际上,在设置partition的数量如果有些犹豫不决的话,可以尝试将partiton的数量设置地大一些。这和在编写MapReduce程序时有些不同,MR对task的总数通常比较保守。这是因为MapReduce启动task的开销通常比较大,而Spark则比较小。 压缩数据结构 Spark上的数据流由record的形式构成。record通常有两种表现形式:一种表现形式是反序列化后的java对象;一种是序列化后的二进制数组。通常在内存中Spark会使用反序列化后的record,而在硬盘或网络传输时会使用序列化后的record。也有一些工作在尝试将shuffle数据以序列化后的形式保存在内存中。 “spark.serializer”这个属性决定了使用哪种序列化方案使record在这两种表现形式间转换。Kryo serializer,即org.apache.spark.serializer.KryoSerializer,是推荐的选项。但可惜的是它并不是默认的选项。因为在Spark的早期版本中KryoSerializer有些不稳定,而现在Spark又不想打破版本间的兼容性。但是不管如何,Kryo都应该是优先使用的serializer。 record在这两种表现形式间的转换对于Spark的性能有着巨大的影响。因此很有必要检查一下应用运行时在各处使用的数据类型,并尝试挤出一些水分。 数据序列化后的体量过大可能会导致更加频繁的硬盘和网络IO,同时也会减少Spark可以缓存(这里是指在MEMORY_SER storage level下)的record的总量。这里主要的解决方案是将所有的用户自定义类都通过SparkConf#registerKryoClasses API来注册和传递。

    [阅读更多...]
  • Spark Job调优–Part 1

    在开始写spark代码或者翻阅spark文档的时候,会遇到一些诸如“transformation”,“action”和“RDD”这样的术语。了解这些术语对于编写spark代码是至关重要的。 类似的,当写的spark程序执行失败的时候或者尝试通过Spark WebUI来研究提交的应用为什么执行时间过长的时候,可能会遇到另外一些术语,如“job”、“stage”和“task”,了解这些术语对于写出“好”的Spark程序至关重要。对于这里的“好”,我的意思是执行得足够快。要写出高效的Spark程序,了解一些Spark的底层执行模式是非常非常重要的。 在这篇文里,会介绍部分诸如“Spark程序是如何在集群上执行的”这样的基础知识。通过了解Spark的执行模式,读者也会学到一些如何写出高效Spark程序的实用建议。 Spark是如何执行你的程序的 一个Spark应用是由一个Driver进程和一系列分散在集群不同节点上的Executor进程共同组成的。 Driver进程负责所有要执行的工作的上层流程控制。 Executor进程负责执行具体的工作(以Task的形式)以及存储用户选择的要缓存的数据。 在应用运行期间,Driver和Executpr通常都是固定不变的,不过Spark动态资源管理会改变后者。每一个Executor都有多个slot来运行Task,并在其生命周期内并发执行。 将Driver进程和Executor进程部署到集群是由集群管理器(Yarn、Mesos或者单节点Spark)来完成的,但是Driver和Executor是保存在每个Spark应用中的。 在Spark的执行模式中,位于最上层的概念是Job。在一个Spark应用中调用Action算子就会触发开始一个Spark Job来完成这个Action的运算。Spark会检查这个Action算子所依赖的RDD关系图来获取Job的全貌,并形成一个执行计划。该执行计划从最后端的RDD(即不依赖已缓存的数据或其它RDD的RDD)开始,最终生成计算Action结果所需的RDD。 执行计划的一个主要内容是将Job的transformation算子组合为一个或多个stage。一个stage对应着一批执行相同代码的Task集合;每个Task处理不同的数据子集。每个stage中的transformation算子都是不需要shuffle全部数据就可以完成的。 是什么决定了数据是否要被shuffle呢? 回忆一下,每个RDD是由固定数量的partition组成的;每个partition又是由一定数量的record组成的。对于那些由被称为 “窄” transformation(如map或filter)返回的RDD,计算单个partition中的record所需的record都保存在父RDD的单个partition中。每个对象都只会依赖父RDD的单个对象。coalesce之类的操作可以在一个task中处理多个partition中的数据,但是这种transformation依然被视为 “窄” transformation,因为用来计算每个输出record的输入record仍然在有限的partition子集中。 不过,spark也支持诸如groupByKey和reduceByKey这样的宽依赖transformation。在这些宽依赖操作中,用来计算单个partition的record所需的数据可能来自父RDD的多个partiton中。“byKey”操作通过同样的task将拥有相同key的元组最终放到了同一个partition中。为了完成这种操作,spark就需要执行一次shuffle,这样在一个新的stage中,收集分散在集群上的数据,最终创建新的partition集合。 举个例子,可以考虑下如下的代码: 这段代码中执行了一个action算子。这个action算子又依赖于一系列transformation算子。代码中的action算子和transformation算子执行于由一个文本文件生成的RDD上。这段代码会在一个stage中执行完成,因为涉及到的三个transformation操作没有一个需要的数据是来自输入RDD的多个分区中的。 再看一个相反的示例,下面的代码统计了一个文本文件中每个出现次数超过1000次的单词的具体出现次数。 代码中的整个处理过程被分成了三个stage。其中的两个reduceByKey操作可以视为stage的分界点,因为计算reduceByKey的输出需要对数据按key进行重新分区。 下图是一个更复杂的transformation执行图。其中的join操作有着多个依赖。 下图用粉色方框框起来的表示执行过程中的不同stage。 在每个stage的边界,数据会被父stage的task写到硬盘上,而后又会被子stage的task通过网络获取到。因为导致了沉重的硬盘和网络I/O开销,stage边界操作代价是昂贵的,应当尽量避免。父stage的partition数目和子stage的partition数目有可能是不一致的。通常,会触发stage边界的transformation算子都会有一个类似“numPartitions”这样的参数来决定将子stage的数据分为多少个partition。 选择正确的操作 当尝试用spark处理问题时,我们有许多的方式来将action和transformation算子进行组合计算来得到同一个正确的结果。但是并不是所有的这些组合在性能上都是一致的:避免一些常见的陷阱、采用正确的算子组合通常对应用的性能有着显著地影响。接下来介绍的一些规则和经验,可以在必要的时候帮你做出选择。 要从多个可行的算子组合中选出最优方案来,最基本的一个标准是这种算子组合能够减少shuffle的次数和shuffle的数据总量。这是因为shuffle是一个相当耗费资源的操作:所有要shuffle的数据会先写到硬盘上而后再通过网络进行传输。repartition、join、cogroup以及类似“*By”或“*ByKey”这样的transformation算子都会导致shuffle。并非所有的操作都是等价的。Spark开发新手遇到的一些性能陷阱通常都是因为选择了错误的操作导致的。 下面介绍一些应该避免的操作。 在执行关联的规约操作时避免使用groupByKey操作 举个例子,如这段代码: 这段代码执行效果等同于直接使用reduceByKey,如下: 两段代码执行效果虽然相同,但是使用groupByKey的代码会在集群内传递所有的数据集。而后者会先在每个partition内按key进行局部计算,而后再执行shuffle将局部结果集合并起来形成最终结果。 当输入输出值类型不同时避免使用reduceByKey操作 举例如:写一个transformation计算出相同key的字符串集合。一个写法就是使用map将原始的key/value对中的value转为单元素的Set集合,然后再使用reduceByKey将Set集合组合起来: 这段代码会创建太多的不必要的对象,因为需要为每个原始key/value对创建一个Set对象。更好地做法是使用aggregateBykey,这个方法可以更高效地执行map阶段聚合: 避免使用“flatMap-join-groupBy”模式 当两个数据集已经按key做了group,此时想在这两个数据集保持group的状态下进行join操作,可以使用cogroup方法,这样可以避免拆开group又重新执行group造成的开销。 什么时候不会发生shuffle呢 知道在什么样的场景下,上面的transformation操作不会产生shuffle也是很重要的。在之前已经有一个transformation操作使用相同的partitioner完成分区的情况下,spark知道如何避免重复shuffle。看一下下面的代码: 因为没有将partitioner参数传递给reduceByKey方法,所以会使用默认的partitioner,也就是说rdd1和rdd2都会使用HashPartitioner按hash进行分区。这两个reduceByKey操作将会产生两次shuffle。如果这两个RDD有相同数目的partition,那么在执行join操作的时候将不会需要额外的shuffle。因为这两个RDD是按相同规则进行分区的,rdd1的任何一个数据集都只会出现在rdd2的单个partiton中。因此,rdd3的任何一个输出partiton的数据都只会依赖rdd1和rdd2的单个partition的数据,不需要再执行第三次shuffle了。 举个例子,如果前面代码中的“someRdd”有四个partition,“someOtherRdd”有两个partition,这两者执行的reduceByKey操作都会用到三个partition,那么这个任务的执行示意图就大致如下: 如果rdd1和rdd2使用不同的partitioner,又或者它们都仍然使用默认的HashPartitioner却设置了不同的分区数,那么又是怎么执行的呢?在这种情况下,只有一个RDD(partition数目较少的那个RDD)会需要在执行join操作的时候重新shuffle。 一样的transformation操作、一样的输入数据,设置了不同的partition数目时的执行示意图: 对两个数据集执行join操作时,要想避免shuffle,可以考虑利用broadcast变量。当其中一个数据集小到可以放进Executor的执行内存中时,可以先在driver中将这个数据集放进一个hash表中然后在将之广播给每个Executor。之后的map操作就可以参考这个hash表来进行查询了。 什么时候shuffle越多越好 通常我们都需要尽量减少shuffle的数量,但是有一些特殊的场景我们需要反其道而行之。增加shuffle可以增加任务的并行度,这有助于提升性能。假设原始数据来自于几个比较大的不可分割的文件中,那么InputFormat使用的分区方式会将大量的数据放到每个partition中,而不是产生足够多的partition来充分利用可用的core。这种场景下,加载完数据后,执行一次repartition来增加partition的数量(此时会产生shuffle)将有助于之后的操作充分利用集群的CPU。 另外一个场景是在使用reduce或aggregate操作将数据聚合到driver上时。当partition太多时,在Driver上使用单线程将所有partition的数据聚合到一起时,很容易遇到瓶颈。要降低Driver的负载可以考虑先使用reduceByKey或aggregateByKey展开一轮分布式地聚集运算,将原始数据集的partition总数缩小。在将最后的结果发送到driver之前,每个partition上的数据都会互相按key进行合并。具体的,可以看一下treeReduce和treeAggregate这两个方法是如何实现的。 在原始数据已经按key做了分组之后,这个技巧将会特别地有效。假设我们要写一个应用统计一个文本文件内每个单词出现的次数,然后将结果汇总到driver上。一个方法是使用action算子aggregate在本地对每个partition进行运算,然后将将结果汇总到Driver。另一种方式使用aggregateByKey算子,以一种完全分布式的形式进行运算,最后再调用collectAsMap将结果汇总到driver上。 二次排序 另一个需要注意的重要功能是repartitionAndSortWithinPartitions这个transformation算子。这个算子看起来有点儿神秘,但是好像会在各种有趣的场景下出现。它将排序推到了执行shuffle的机器上。在那里,大量的数据被高效地溢写,并且排序可以和其他操作组合在一起使用。 举个例子,Apache Hive在Spark上的join方法实现就使用了这个transformation算子。此外,在二次排序模式中它也是一个重要的组成模块——在这种模式下数据按key进行分组,同时在按key遍历值的时候,key对应的值需要保持一定的顺序。这种模式的使用场景通常为将事件日志按用户进行分组,并将每个用户的事件按发生时间排序后再进行分析。直接使用repartitionAndSortWithinPartitions执行二次排序对用户有点儿难度,不过SPARK-3655大大简化了这个问题。 结论 现在应该能够对涉及到创建一个性能高效的Spark程序的基本因素有足够的了解了。在下一节我们将继续介绍资源请求、并行度和数据结构相关的调优内容。   本文译自:How-to: Tune Your Apache Spark Jobs (Part 1)

    [阅读更多...]
  • spark java.lang.StackOverflowError

    问题描述 在工作中使用spark的一个主要内容就是从多个路径下搜集数据并进行处理。常用的代码大致如下: 在readData方法中调用SparkContext的sequenceFile方法读取文件创建RDD集合。而后调用RDD的reduce和union(即“++”)方法将多个RDD集合进行合并。 这里的代码通常是不会报错的。但是执行合并后的RDD集合的Action算子(这里是collect方法)的时候偶尔会遇到StackOverflowError。异常信息如下: 从异常信息上来看,是在使用java.io.ObjectInputStream执行序列化的时候出现了递归或者死循环,因为栈空间不足导致的这个问题。 继续跟踪调试,发现主要是因为处理的文件过多导致的——一般处理的文件数量超过450(大致数值)以后就会遇到这个问题。查了一些资料,了解到根本原因是RDD Lineage过长:代码中每执行一次RDD.union操作就会增加一次RDD Lineage的步长。 解决问题 现在根据问题的特征和根源,找到的解决思路大致上有这么几个: 增加执行时的栈空间; 避免一次处理过多文件; 定期削减RDD Lineage的长度; 避免创建太长的Lineage。 接下来逐个解释下上面的思路。 增加栈空间 从异常信息中可以看到StatckOverflowError是在Executor中抛出的,所以要调整Executor栈空间,可以在job提交参数中添加如下内容: 这项配置将Executor的栈空间设置为80M。 然后我们测试一下,测试目标是一次处理720个文件。执行结果OK。 再次测试,仍然是80M栈空间,目标一次处理4320个文件。仍然能执行成功。 可知这个方案在一定程度上是可行的,至少可以用来做任务优化。 避免一次处理过多的文件 这个思路是最简单的:既然一次处理太多文件会报错,那么就分成多个批次来处理好了。 调整后的代码如下: 这种方式肯定是可行的,但是用起来多少有点儿麻烦:需要将中间结果集临时存储起来而后再一起使用。中间结果集要是比较小的话还好说,一个变量就足够了;中间结果集要是太大了就得先保存到HDFS上,而后再做二次处理。 削减RDD Lineage的长度 既然问题是因为RDD Lineage长度过长导致的,那么就需要在RDD Lineage变得太长之前,将之削减掉一部分。做法是对合并出的RDD结果集定期做checkpoint,并随意执行一个Action算子。 代码如下: 我一度依赖过这个方案。但是这个方案有一个很大的缺点:就是执行效率太低——比上一种方案效率还低,可以说是执行时间最慢的一种方式了。checkpoint操作实际上是将每个rdd都存储到了硬盘上,其效率可想而知。 避免创建太长的Lineage 前面说的第二种方式也可以说是这种思路的实现。对这个问题来说,推荐得比较多的还是使用SparkContext.union方法来替换RDD.union方法。代码大致如下: 这个方式也是我最初寄望最多的一个方案。 本来希望能通过这个方案一劳永逸地解决这个问题。可是在测试的时候遇到了些问题:这个方案也不能处理路径太多(800个以上)的问题,但是也没有立即报错,而是阻塞住了。执行两三个小时后提示任务执行失败。在日志中可以找到如下错误提示: 对于这个错误提示我目前并无头绪,只能先抛出来给大家看一下。以后如果有进展再继续补充。 就这样。 参考文档: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-td5649.html https://stackoverflow.com/questions/30522564/spark-when-union-a-lot-of-rdd-throws-stack-overflow-error https://stackoverflow.com/questions/38206166/apache-spark-stackoverflowerror-when-trying-to-indexing-string-columns #####

    [阅读更多...]
  • 使用Gradle构建scala多模块工程

    前段时间终于无法忍受sbt慢如龟速的编译打包速度了。稍稍调研了一下,就果断切换到了gradle。由于调研得比较匆忙,在使用过程中遇到了各种问题。好在最后都能解决了。 我这里使用scala主要是用来编写spark job。由于我自己的一些需要,这些job中有几个是多模块的。在这里简单解释一下如何使用gradle构建scala多模块项目。 这里用我最近开发的项目来做说明。项目名称是consumer-portrait-job,有两个子模块:common和compute。 首先在项目根目录下创建一个settings.gradle文件,这个文件主要用来描述项目名称及子模块信息: 然后再创建一个build.gradle文件。这个文件描述了主项目及子项目的一些通用配置。配置如下: 在这个配置文件中包含两个大的模块:allprojects和subprojects。 allprojects中的配置是所有项目共享的(包含根项目)。在这里,我定义了项目的groupId和version等信息,并应用了gradle的idea插件。 subprojects的配置是所有子项目通用的。 在subprojects中的第一行声明了使用gradle的scala插件。 接下来的配置项“sourceCompatibility”声明了编译时使用的jdk版本;“targetCompatibility”确保了编译生成的class与指定版本的jdk兼容。 在ext中声明了子项目中可以使用的一些变量。我这里是声明了scala和spark的版本。 repositories项配置了当前项目可以使用的仓库。这里使用的第一个仓库是本机的maven库,第二库是ali提供的repository服务,第三个库是maven中央库。(曾经研究过如何让gradle和maven公用同一个本地仓库,不过最后也是不了了之)。 dependencies中声明了所有子模块都需要使用的依赖项。这里用到了scala库和spark库,这两个库只会在编译期用到,所以声明使用的依赖类型是compileOnly(这种依赖类型是gradle Java插件独有的,gradle scala插件继承自java插件,所以也可以使用)。 task mkdirs是一个自定义任务。在根项目配置完settings.gradle和build.gradle后,执行“gradle mkdirs”命令完成子模块目录的创建工作。 在两个子模块common和compute下创建build.gradle文件并做配置。 common模块的build.gradle配置详情: 这里只是声明了一下commons模块独有的依赖项。 compute模块是启动模块,在该模块中有spark任务的驱动类。该模块的build.gradle配置详情: 配置中的第一行dependencies仍然是配置compute模块的依赖项。其中略需注意的是对common模块的依赖。 接下来的jar声明指明了将该模块打成的jar包的名称。脚本中需要根据包名来调用模块生成的包,默认生成的包名会带上版本信息,不太合适。 最后是一个自定义任务。该任务的目标是将一些必要的jar和其他文件打成一个zip包,以便于上传任务到执行服务器。任务中的第一个部分是将一些运行时依赖打入zip包中的lib目录,使用include关键字提示包含运行时依赖中指定名称的包,也可以使用exclude关键字排除一些包。第二部分是将生成的jar和本地doc目录中的文件打入zip包的根目录。 就这样。有空再写个示例项目留着参考。 ——————–END———————-

    [阅读更多...]
  • spark使用kafka报NoSuchMethodError

    运行spark任务消费kafka时,报了如下的异常: 使用的spark版本是1.6.1,kafka版本是0.8.2.1。 根据异常信息猜测应该是scala版本导致的问题。 查看了一下依赖的spark和kafka的配置为: 依赖项直接copy自mvnrepository网站。 使用gradle dependencies指令查看了依赖详情:kafka依赖的scala的版本是2.11.5,spark依赖的scala版本是2.11.6。而spark集群服务器部署的scala版本是2.11.8。 照说都是在2.11.*范围内,不应该报错的。 反复测试了几次之后忽然想到spark集群使用的scala版本也许不是2.11.8,也就是说spark集群的scala版本和服务器scala环境的版本也许不一致,我之前的意识是错误的。 需要测试一下,依稀有些印象可以使用2.10.4版本的scala进行编译,所以得先把依赖调整一下: kafka和spark依赖的name后面的数字,如“kafka_2.10”中的2.10指的是采用scala 2.10.*编译打包的。 打包上传后,测试通过,不再报那个异常了。 该如何确认spark集群使用的scala的版本呢。第一个思路是看spark集群lib目录下的依赖是不是会有相关的提示。看了一下比较失望: 就这么几个包,一点儿信息都没有。 不死心,用vi查看了一下。在spark-1.6.1-yarn-shuffle.jar的DEPENDENCIES文件中找到些许证明: 提示了spark-network-common_2.10是用scala 2.10编译的。 又查了一些资料,可以简单进行判断: 如果是spark1,用的scala版本通常是2.10,spark2则通常对应2.11版的scala。 就这样。 ########

    [阅读更多...]
  • 在Spark上通过自定义RDD访问HBase

    这里介绍一个在Spark上使用自定义RDD获取HBase数据的方案。 这个方案的基础是我们的HBase表的行键设计。行键设计大概是这样子的:标签ID+时间戳+随机码。平时的需求主要是导出指定标签在某个时间范围内的全部记录。根据需求和行键设计确定下实现的大方向:使用行键中的时间戳进行partition并界定startRow和stopRow来缩小查询范围,使用HBase API创建RDD获取数据,在获取的数据的基础上使用SparkSQL来执行灵活查询。 创建Partition 这里我们要自定义的RDD主要的功能就是获取源数据,所以需要自定义实现Partition类: 前面我们说过,主要是依赖行键中的时间戳来进行partition的,所以在自定义的QueryPartition类中保留了两个长整型构造参数start和stop来表示起止时间。剩下的构造参数idx作用是标记分区的索引。 这样,自定义RDD中的getPartitions()方法该如何实现也就很清楚了: 在上面代码中的第六行可以看到getPartitions()方法是按每小时一个区间进行partition的。 代码中的unit是一个查询单元,封装了一些必要的查询参数,包括存储数据的表、要查询的标签ID以及起止时间。大致是这样的: 注意,QueryUnit这个类需要实现Serializable接口。 查询HBase 因为要实现灵活查询的需求,所以需要将HBase表中符合需求的数据的所有列都取出来。我们可以考虑使用List[Map[String, String]]这样一种结构来暂时保存数据。根据这个需求实现的自定义HBaseClient的代码如下: 在代码中使用了typesafe的Config类来记录一些信息,比如zookeeper连接信息。 查询HBase这块儿没什么好说的。继续好了。 自定义RDD 前面的两节为自定义实现RDD类做了一些铺垫,包括进行partition的方式,以及一个查询hbase的工具类HBaseClient。实际上我们已经完成实现了自定义RDD的一个抽象方法getPartitions。 自定义RDD需要继承Spark的一个抽象类RDD。 在继承抽象类RDD的同时,需要为它提供两个构造参数,一个是SparkContext实例,一个是父RDD列表。我们要自定义的RDD的是用来获取源数据的,没有父RDD,所以父RDD列表可以直接设置为Nil。 抽象类RDD还有两个抽象方法需要实现,分别是getPartitions()和compute()。getPartitions()方法用来对原始的任务进行分片,可以将原始任务切割成不同的partition,以便进行分布式处理。compute()方法则是实现了对切割出的partition进行处理的逻辑。 getPartitions()方法的实现前面已经提过了,现在看看compute()方法的实现: 可以看到,compute()方法就是在getPartitions()方法创建的时间区间QueryPartition上对HBase中的表进行查询,并将查询出的结果封装成json字符串列表。 编写驱动类 至此,工作已经完成了大半,可以看看驱动类是怎么写的了: 驱动类的任务是在创建的QueryRDD上使用SparkSQL执行查询,并将查询结果保存到HDFS上。 代码中的SparkUtil只是将经常使用的初始化SparkContext以及执行Spark任务的行为封装了一下,是这样实现的: 好了,就这样!!

    [阅读更多...]