• Spark数据导出任务内存优化记录

    前两天又接了一个Spark任务,倒不复杂,依然是检索HDFS上的日志数据这样的事情。不过瞅着组内跑着十几二十个任务内存一共只有160来G的yarn集群,有些欲哭无泪。 事情还是要做的,反正执行时间要求不太严格,只能想办法尽量压缩内存的占用了。 先说下背景:现在使用的yarn集群由8个容器组成,每个容器的内存大概20G;工作内容是检索数据,源数据大概1T左右,取出来的目标结果数据在2~8G这样子。 最开始的时候查询任务是直接使用sparkSql来完成。随着数据量的上升很快就遇到了最经典的两个问题:StackOverflowError和OutOfMemoryError。 对于栈溢出,之前设计了几个解决方案,在历史文章里面有记录《Spark StackOverflowError》。其中我使用了任务内多批次执行的方案。现在想来,这其实并不是最好的解决方案:问题在于分批越多,每批任务中的action算子就会导致任务的执行时间越长,远不如直接增加栈空间来得简单直接。不过也算是错有错着,这反倒为后来的优化打下了基础。 至于堆内存溢出,主要发生在将每个partition的数据合并压缩的阶段:.repartition(1).saveAsTextFile(pathSave, classOf[GzipCodec])。因为这个操作可能会发生在每个Executor上,所以只好通过简单的增加Executor的内存来解决问题。因为内存总量有限,单个Executor的内存调大了,就只能将task的并发度调小。这样在更严重的问题暴露之前,一直尝试解决的问题就是如何在并发度和内存占用之间取得平衡。 更严重的问题出现在这次的需求上:很简单,要导出的结果数据集变得非常大了,一般都会大于8G,此时堆内存溢出频繁出现。应对方案如下:取消压缩操作、增大Executor执行内存,将Executor的数量调整为2,每个Executor的task数目调整为1。这样Spark任务可以正常执行了,但是因为并行度太小的缘故,执行时间巨长——动辄跑上十来个小时。优化执行速度又提到了时间表上。 是一次执行错误给了优化的方向。现在任务的执行步骤为: 某次任务执行到第4步的时候报错了,考虑到耗时的问题,就重新写了一段代码来完成4和5两步的操作。此时想到这个任务在不同的阶段对资源的需求是不一样的: 在执行1~3这几个步骤的时候对内存的需求没那么强,但是如果稍稍增加些并行度就能极大地提升任务的执行效率; 第4步则是典型的吃内存的操作,此时并行度为1,但是内存需要足够大才能保证任务顺利完成。 此时方案已经很清晰了:将一个任务拆成两个,一个负责搜集数据,一个负责合并生成的中间数据,在执行的时候按不同的策略分配资源。 至此,当前的任务优化已完成。 再扯些没用的。 最后的优化方案实际上非常简单,以至于我很奇怪为什么一开始没想到。并且这种方案是在Hadoop的计算实践中是最常用的操作。唉,也许是灯下黑吧。 也许直接使用Hadoop会是一个更好地选择。因为瓶颈主要出现在内存上,Hadoop对内存资源的占用会少很多。 如果能不走yarn,直接使用java操作,那么尽量不走yarn好了,虽然复杂度会提升很多,但是执行效率是有保证的。 另外,同事曾经问过我一个问题:如果减少了executor的数目,那么每个executor要处理的数据不就变多了,这样也会造成内存压力。听到这个问题时,我的第一印象是“对啊,之前怎么没有考虑到这一点啊”。后来仔细思索了一段时间,想明白了关键:这个问题的前半句是对的,数据总量固定,并行度降低,单个executor要处理的数据量必然会增加;但是后半句是错的,内存中的数据量取决于partition的数量,在配置中则是和task数量相关。 记录一组spark任务提交参数,留着以后参考: 就这些了。

    [阅读更多...]
  • Spark Job调优-Part 2

    这一节将主要介绍资源调优,或者说是如何充分利用集群资源。然后再说一下如何对并发度进行优化,这是job性能参数中最难也是最重要的部分。最后我们将了解一些数据自身的表现形式:Spark读取时数据在磁盘上的保存形式(如Apache Avro和 Apache Parquet),以及缓存时或系统内移动时数据在内存中的形式。 优化资源分配 在Spark的用户反馈中经常提到一类问题如“我有一个500个节点的集群,但是在我运行一个应用时,我看到只有两个Task在执行。求帮助!”。考虑到Spark资源控制参数的数量,这种问题不是不可能存在的。在这一节里,我们将学习如何压榨出集群的最后一点资源。根据资群资源管理器的不同(Yarn、Mesos和Spark Standalone),相应的建议和配置也有所不同。不过我们将主要集中在Yarn上——这也是Cloudera推荐使用的资源管理器。 要了解一些Spark on YARN的背景知识,可以先看一下这篇文章。 目前Spark(和Yarn)主要关注的资源只有两个:CPU和内存。当然了,硬盘和网络IO对于Spark的性能也很关键,但是不管Spark还是Yarn,目前都还不能有效地对其进行管理。 在一个Spark应用中,每个Executor都有固定数目的core以及固定大小的堆内存。core的数量可以通过spark-submit提交任务时的参数“–executor-cores”来进行指定,也可以在spark-default.conf配置文件中或SparkConf对象中对“spark.executor.cores”属性进行配置。类似的,堆内存的大小可以通过“—executor-memory”或“spark.executor.memory”来进行设置。cores属性决定了一个executor可以并发执行的task的数量。如“–executor-cores 5”则意味着一个executor在同一时间内最多可以同时执行5个task。内存大小决定了Spark可以缓存的数据的总量以及在group、aggregate或join时可以shuffle地最大数据总量。 “–num-executors”命令参数或“spark.executor.instances”属性则决定了job可以申请的executor总数。从CDH 5.4/Spark 1.3开始,就可以不必设置这个值了。更好的方法是通过“spark.dynamicAllocation.enabled”开启动态分配。通过动态分配,spark应用可以在大量待执行的task积压时申请更多的executor资源,并在空闲时释放executor资源。 同时了解Spark请求的资源是如何与yarn可用资源相配合的也是很重要的。相关的yarn参数包括: yarn.nodemanager.resource.memory-mb:控制了集群每个节点上所有的container可以使用的最大内存总量。 yarn.nodemanager.resource.cpu-vcores:控制了集群每个节点上所有的container可以使用的最大core总量 在提交任务时请求5个executor core就等于向yarn集群请求5个虚拟core。但是向yarn请求内存则有点复杂,情况大致如下: 1. “–executor-memory/spark.executor.memory”控制着executor需要使用的堆内存大小,但是spark也会用到一些堆外内存,比如interned String或者direct byte buffer。“spark.yarn.executor.memoryOverhead”控制着这部分内存,它和其它属性一起决定了每个executor向yarn请求的内存总量。这个属性的默认值是“max(384, 0.07 * spark.executor.memory)” 2. Yarn提供的内存可能会比请求的多一点。Yarn的配置参数“yarn.scheduler.minimum-allocation-mb”和“yarn.scheduler.increment-allocation-mb”控制着分配内存的最小值和增加量。 下图展示了spark和yarn的内存相关属性及配置的层级结构: 对于如何分配executor,如觉得以上内容还不够,那么还可以关注下如下的几个点: 1. application master,是一个非executor container,有向yarn请求container的特殊能力,它本身所占用的资源也需要被考虑进来。在“yarn-client”模式下,它默认会占用1024M内存以及一个vcore。在“yarn-cluster”模式下,application master会运行driver,所以通过“–driver-memory”和“–driver-cores”来配置资源通常是有效的。 2. 给executor分配太多的内存会导致过长的GC延迟。64GB是一个稍显粗略的内存上限建议。 3. 我们已经注意到HDFS客户端在并发线程过多时会存在一些问题,大致每个executor分配5个task就可能耗尽写入带宽,所以给executor分配的core的数目最好低于5。 4. 运行“小型”executor(例如只有一个core且内存仅够运行一个task的executor)实际上是放弃了在一个JVM中并发执行多个Task的优势。比如broadcast变量需要在每个executor上复制一份copy,太多小型executor的话就会产生过多的这种变量copy。 为了能够更具体地描述以上的这些内容,这里以一个Spark应用示例,通过调整这个应用的配置,让其可以充分利用集群的资源:假设一个集群有6个节点在运行NodeManager,每个节点有16个core以及64GB内存。那么每个NodeManager的配置“yarn.nodemanager.resource.memory-mb”和“yarn.nodemanager.resource.cpu-vcores”可以分别设置为63 * 1024 = 64512(MB)和15。这里没有将全部的资源都分配给Yarn container,因为需要留一些资源给每个节点的操作系统以及hadoop后台进程。这里留了1G内存和1个core给这些系统进程。Cloudera Manager会自动计算这些内容并完成相关的Yarn配置。 根据上面说的集群配置,我们极有可能在提交任务时这样配置executor:“–num-executors 6 –executor-cores 15 –executor-memory 63G”。但是,这是一个错误的做法,因为: 6GB+的executor塞不进有63GB内存空间的NodeManager; application master会占用某个节点上的一个core,这意味着那个节点上将不会有足够的core可以分配给一个15-core的executor; 每个executor有15个core将有可能导致HDFS IO吞吐异常。 比较好的做法是这样进行配置:“–num-executors 17 –executor-cores 5 –executor-memory 19G”。为什么呢? 这样每个节点上将会有3个executor——有application master的节点除外,它上面只有两个executor; “–executor-memory”的值是这样算出来的:(63/3) = 21,3是每个节点的executor数量;21 * 0.07 = 1.47,前面提过0.07是堆外内存的比例;21-1.47≈19. 优化并发度 我们都知道Spark是一套并发处理引擎。不过也许我们并不清楚:Spark并不是一个特别“神奇”的并发处理引擎,能够无限制的并行处理业务;它的能力也有极限,我们需要找出最优的并发度。 Spark的每一个stage都有一定数量的task,每个task会串行地处理数据。在优化Spark Job并行度的时候,task数量也许是决定应用性能的唯一的也是最重要的参数。 那么这个数量是如何决定的呢?在之前的文章里有讲述过Spark是如何将RDD划分为不同的stage的。简单提示一下:一些transformation操作如repartition或reduceByKey造成了stage的边界。一个stage中task的个数与stage中最后一个RDD的partition数目相同。一个RDD中partition的数目与其所依赖的RDD的partition数目相同。不过有一些例外情况:coalesce transformation算子创建的RDD的partition总数可以少于其父RDD的partition总数;union transformation算子会创建一个RDD,其partition总数为所有父RDD的全部partitio数目之和;cartesian创建的RDD的partiton总数为所有父RDD的partiton数量的乘积。 如果一个RDD没有父RDD呢?依赖textFile或hadoopFile创建的RDD的partition数量取决于其所使用的MapReduce InputFormat实现。通常情况下一个HDFS block对应着一个partition。对于依赖parallelize方法创建的RDD,其partiton数量取决于用户在调用parallelize方法时传递的参数,如果用户没有传递参数那么会默认采用“spark.default.parallelism”的值。 要想知道一个RDD中partition的个数,可以通过调用rdd.partitions().size()获得。 这里需要关心的是task数量过小的情况。如果task的数量小于可用的slot总数,那么对应的stage将无法充分利用所有可用的CPU。 task数量过少也意味着在执行聚集操作时每个task都会有更大的内存压力。任何“join”、“group”或“*ByKey”操作会将对象保存在hashmap或内存buffer中,以便进行分组或排序。“join”、“group”或“groupByKey”在触发shuffle时会在fetch端使用这些数据结构。“aggregateByKey”、“reduceByKey”在触发shuffle时会在两端使用这些数据结构。 当聚集操作要获取的数据在内存中不容易放下的时候,就会暴露一些问题。第一,在内存中保留太多数据会造成GC压力,容易导致处理中断。第二,如果在内存中放不下这些数据,Spark将会尝试将之溢写到硬盘上,这会产生硬盘IO或排序。过大的shuffle导致的内存开销是我在所有的Cloudera用户中看到的造成Job执行过慢的首要原因。 那么该怎样增加partition的数量呢?如果当前stage是在从hadoop中读取数据,那么可用的做法有: 调用repartition方法,这会产生一次shuffle; 调整InputFormat实现,创建更多的partition; 向HDFS写数据的时候,使用更小的block size。 如果当前stage正在从其他stage接收数据。那么导致stage分界的transformation操作通常会接受一个numPartitions参数,比如: 那么X设置为多少才比较合适呢?最直接的做法就是通过多次实验来决定:看一下父RDD的partition数量,然后每次按这个数量的1.5倍来设置X的值,多次重复,直到性能不再有明显提升。 也有一些计算这个X值的规则,但是不太好验证,因为有些参数不容易计算出来。这里也会介绍下这些规则,但是不建议在工作中使用,这里的介绍只为了能让大家对相关的内容有些深入的了解。使用这些规则的主要目标是在可以充分利用可用内存的前提下,创建足够多的task。 每个task的可用内存等于如下公式:(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores。memory fraction和safty fraction的默认值分别为0.2和0.8. 内存中的shuffle数据的量不太容易确定。最可行的方案是找到stage运行时Shuffle Spill(Memory)和Shuffle Spill(Disk)的比例。然后用这个比例乘以shuffle写数据的总量。不过在这个stage执行reduce操作时情况会有点儿复杂: 此时可以把partition的数量稍稍调大一点儿——一般来说,partition的数量通常是多点儿比少点儿好。 实际上,在设置partition的数量如果有些犹豫不决的话,可以尝试将partiton的数量设置地大一些。这和在编写MapReduce程序时有些不同,MR对task的总数通常比较保守。这是因为MapReduce启动task的开销通常比较大,而Spark则比较小。 压缩数据结构 Spark上的数据流由record的形式构成。record通常有两种表现形式:一种表现形式是反序列化后的java对象;一种是序列化后的二进制数组。通常在内存中Spark会使用反序列化后的record,而在硬盘或网络传输时会使用序列化后的record。也有一些工作在尝试将shuffle数据以序列化后的形式保存在内存中。 “spark.serializer”这个属性决定了使用哪种序列化方案使record在这两种表现形式间转换。Kryo serializer,即org.apache.spark.serializer.KryoSerializer,是推荐的选项。但可惜的是它并不是默认的选项。因为在Spark的早期版本中KryoSerializer有些不稳定,而现在Spark又不想打破版本间的兼容性。但是不管如何,Kryo都应该是优先使用的serializer。 record在这两种表现形式间的转换对于Spark的性能有着巨大的影响。因此很有必要检查一下应用运行时在各处使用的数据类型,并尝试挤出一些水分。 数据序列化后的体量过大可能会导致更加频繁的硬盘和网络IO,同时也会减少Spark可以缓存(这里是指在MEMORY_SER storage level下)的record的总量。这里主要的解决方案是将所有的用户自定义类都通过SparkConf#registerKryoClasses API来注册和传递。

    [阅读更多...]
  • Spark Job调优–Part 1

    在开始写spark代码或者翻阅spark文档的时候,会遇到一些诸如“transformation”,“action”和“RDD”这样的术语。了解这些术语对于编写spark代码是至关重要的。 类似的,当写的spark程序执行失败的时候或者尝试通过Spark WebUI来研究提交的应用为什么执行时间过长的时候,可能会遇到另外一些术语,如“job”、“stage”和“task”,了解这些术语对于写出“好”的Spark程序至关重要。对于这里的“好”,我的意思是执行得足够快。要写出高效的Spark程序,了解一些Spark的底层执行模式是非常非常重要的。 在这篇文里,会介绍部分诸如“Spark程序是如何在集群上执行的”这样的基础知识。通过了解Spark的执行模式,读者也会学到一些如何写出高效Spark程序的实用建议。 Spark是如何执行你的程序的 一个Spark应用是由一个Driver进程和一系列分散在集群不同节点上的Executor进程共同组成的。 Driver进程负责所有要执行的工作的上层流程控制。 Executor进程负责执行具体的工作(以Task的形式)以及存储用户选择的要缓存的数据。 在应用运行期间,Driver和Executpr通常都是固定不变的,不过Spark动态资源管理会改变后者。每一个Executor都有多个slot来运行Task,并在其生命周期内并发执行。 将Driver进程和Executor进程部署到集群是由集群管理器(Yarn、Mesos或者单节点Spark)来完成的,但是Driver和Executor是保存在每个Spark应用中的。 在Spark的执行模式中,位于最上层的概念是Job。在一个Spark应用中调用Action算子就会触发开始一个Spark Job来完成这个Action的运算。Spark会检查这个Action算子所依赖的RDD关系图来获取Job的全貌,并形成一个执行计划。该执行计划从最后端的RDD(即不依赖已缓存的数据或其它RDD的RDD)开始,最终生成计算Action结果所需的RDD。 执行计划的一个主要内容是将Job的transformation算子组合为一个或多个stage。一个stage对应着一批执行相同代码的Task集合;每个Task处理不同的数据子集。每个stage中的transformation算子都是不需要shuffle全部数据就可以完成的。 是什么决定了数据是否要被shuffle呢? 回忆一下,每个RDD是由固定数量的partition组成的;每个partition又是由一定数量的record组成的。对于那些由被称为 “窄” transformation(如map或filter)返回的RDD,计算单个partition中的record所需的record都保存在父RDD的单个partition中。每个对象都只会依赖父RDD的单个对象。coalesce之类的操作可以在一个task中处理多个partition中的数据,但是这种transformation依然被视为 “窄” transformation,因为用来计算每个输出record的输入record仍然在有限的partition子集中。 不过,spark也支持诸如groupByKey和reduceByKey这样的宽依赖transformation。在这些宽依赖操作中,用来计算单个partition的record所需的数据可能来自父RDD的多个partiton中。“byKey”操作通过同样的task将拥有相同key的元组最终放到了同一个partition中。为了完成这种操作,spark就需要执行一次shuffle,这样在一个新的stage中,收集分散在集群上的数据,最终创建新的partition集合。 举个例子,可以考虑下如下的代码: 这段代码中执行了一个action算子。这个action算子又依赖于一系列transformation算子。代码中的action算子和transformation算子执行于由一个文本文件生成的RDD上。这段代码会在一个stage中执行完成,因为涉及到的三个transformation操作没有一个需要的数据是来自输入RDD的多个分区中的。 再看一个相反的示例,下面的代码统计了一个文本文件中每个出现次数超过1000次的单词的具体出现次数。 代码中的整个处理过程被分成了三个stage。其中的两个reduceByKey操作可以视为stage的分界点,因为计算reduceByKey的输出需要对数据按key进行重新分区。 下图是一个更复杂的transformation执行图。其中的join操作有着多个依赖。 下图用粉色方框框起来的表示执行过程中的不同stage。 在每个stage的边界,数据会被父stage的task写到硬盘上,而后又会被子stage的task通过网络获取到。因为导致了沉重的硬盘和网络I/O开销,stage边界操作代价是昂贵的,应当尽量避免。父stage的partition数目和子stage的partition数目有可能是不一致的。通常,会触发stage边界的transformation算子都会有一个类似“numPartitions”这样的参数来决定将子stage的数据分为多少个partition。 选择正确的操作 当尝试用spark处理问题时,我们有许多的方式来将action和transformation算子进行组合计算来得到同一个正确的结果。但是并不是所有的这些组合在性能上都是一致的:避免一些常见的陷阱、采用正确的算子组合通常对应用的性能有着显著地影响。接下来介绍的一些规则和经验,可以在必要的时候帮你做出选择。 要从多个可行的算子组合中选出最优方案来,最基本的一个标准是这种算子组合能够减少shuffle的次数和shuffle的数据总量。这是因为shuffle是一个相当耗费资源的操作:所有要shuffle的数据会先写到硬盘上而后再通过网络进行传输。repartition、join、cogroup以及类似“*By”或“*ByKey”这样的transformation算子都会导致shuffle。并非所有的操作都是等价的。Spark开发新手遇到的一些性能陷阱通常都是因为选择了错误的操作导致的。 下面介绍一些应该避免的操作。 在执行关联的规约操作时避免使用groupByKey操作 举个例子,如这段代码: 这段代码执行效果等同于直接使用reduceByKey,如下: 两段代码执行效果虽然相同,但是使用groupByKey的代码会在集群内传递所有的数据集。而后者会先在每个partition内按key进行局部计算,而后再执行shuffle将局部结果集合并起来形成最终结果。 当输入输出值类型不同时避免使用reduceByKey操作 举例如:写一个transformation计算出相同key的字符串集合。一个写法就是使用map将原始的key/value对中的value转为单元素的Set集合,然后再使用reduceByKey将Set集合组合起来: 这段代码会创建太多的不必要的对象,因为需要为每个原始key/value对创建一个Set对象。更好地做法是使用aggregateBykey,这个方法可以更高效地执行map阶段聚合: 避免使用“flatMap-join-groupBy”模式 当两个数据集已经按key做了group,此时想在这两个数据集保持group的状态下进行join操作,可以使用cogroup方法,这样可以避免拆开group又重新执行group造成的开销。 什么时候不会发生shuffle呢 知道在什么样的场景下,上面的transformation操作不会产生shuffle也是很重要的。在之前已经有一个transformation操作使用相同的partitioner完成分区的情况下,spark知道如何避免重复shuffle。看一下下面的代码: 因为没有将partitioner参数传递给reduceByKey方法,所以会使用默认的partitioner,也就是说rdd1和rdd2都会使用HashPartitioner按hash进行分区。这两个reduceByKey操作将会产生两次shuffle。如果这两个RDD有相同数目的partition,那么在执行join操作的时候将不会需要额外的shuffle。因为这两个RDD是按相同规则进行分区的,rdd1的任何一个数据集都只会出现在rdd2的单个partiton中。因此,rdd3的任何一个输出partiton的数据都只会依赖rdd1和rdd2的单个partition的数据,不需要再执行第三次shuffle了。 举个例子,如果前面代码中的“someRdd”有四个partition,“someOtherRdd”有两个partition,这两者执行的reduceByKey操作都会用到三个partition,那么这个任务的执行示意图就大致如下: 如果rdd1和rdd2使用不同的partitioner,又或者它们都仍然使用默认的HashPartitioner却设置了不同的分区数,那么又是怎么执行的呢?在这种情况下,只有一个RDD(partition数目较少的那个RDD)会需要在执行join操作的时候重新shuffle。 一样的transformation操作、一样的输入数据,设置了不同的partition数目时的执行示意图: 对两个数据集执行join操作时,要想避免shuffle,可以考虑利用broadcast变量。当其中一个数据集小到可以放进Executor的执行内存中时,可以先在driver中将这个数据集放进一个hash表中然后在将之广播给每个Executor。之后的map操作就可以参考这个hash表来进行查询了。 什么时候shuffle越多越好 通常我们都需要尽量减少shuffle的数量,但是有一些特殊的场景我们需要反其道而行之。增加shuffle可以增加任务的并行度,这有助于提升性能。假设原始数据来自于几个比较大的不可分割的文件中,那么InputFormat使用的分区方式会将大量的数据放到每个partition中,而不是产生足够多的partition来充分利用可用的core。这种场景下,加载完数据后,执行一次repartition来增加partition的数量(此时会产生shuffle)将有助于之后的操作充分利用集群的CPU。 另外一个场景是在使用reduce或aggregate操作将数据聚合到driver上时。当partition太多时,在Driver上使用单线程将所有partition的数据聚合到一起时,很容易遇到瓶颈。要降低Driver的负载可以考虑先使用reduceByKey或aggregateByKey展开一轮分布式地聚集运算,将原始数据集的partition总数缩小。在将最后的结果发送到driver之前,每个partition上的数据都会互相按key进行合并。具体的,可以看一下treeReduce和treeAggregate这两个方法是如何实现的。 在原始数据已经按key做了分组之后,这个技巧将会特别地有效。假设我们要写一个应用统计一个文本文件内每个单词出现的次数,然后将结果汇总到driver上。一个方法是使用action算子aggregate在本地对每个partition进行运算,然后将将结果汇总到Driver。另一种方式使用aggregateByKey算子,以一种完全分布式的形式进行运算,最后再调用collectAsMap将结果汇总到driver上。 二次排序 另一个需要注意的重要功能是repartitionAndSortWithinPartitions这个transformation算子。这个算子看起来有点儿神秘,但是好像会在各种有趣的场景下出现。它将排序推到了执行shuffle的机器上。在那里,大量的数据被高效地溢写,并且排序可以和其他操作组合在一起使用。 举个例子,Apache Hive在Spark上的join方法实现就使用了这个transformation算子。此外,在二次排序模式中它也是一个重要的组成模块——在这种模式下数据按key进行分组,同时在按key遍历值的时候,key对应的值需要保持一定的顺序。这种模式的使用场景通常为将事件日志按用户进行分组,并将每个用户的事件按发生时间排序后再进行分析。直接使用repartitionAndSortWithinPartitions执行二次排序对用户有点儿难度,不过SPARK-3655大大简化了这个问题。 结论 现在应该能够对涉及到创建一个性能高效的Spark程序的基本因素有足够的了解了。在下一节我们将继续介绍资源请求、并行度和数据结构相关的调优内容。   本文译自:How-to: Tune Your Apache Spark Jobs (Part 1)

    [阅读更多...]