测试环境使用的Kafka出现了点儿问题,服务器被停止,所有的topic日志不知怎的被清空。一番大清洗(清理zookeeper节点、kafka topic log)、重启Kafka、创建分区后发现报了如下的错误: 找了些资料知道了报警的原因。 日志中提到的high watermark指的是一个partition中上一次提交的消息的offset。报警是因为topic刚创建、消费已开始、但是还没有生产者推送消息过来、对应topic没有任何offset的记录导致的。 所以该警告可以忽略,待有消息推送过来即可。 相关资料 Intra-cluster Replication in Apache Kafka #####
[阅读更多...]-
Kafka警告:No checkpointed highwatermark is found for partition
-
Kafka报错:connection reset by peer
在kafka的server.log中发现持续爆出如下错误信息: 查了下原因,是因为kafka的客户端异常关闭,而Kafka仍在向其推送数据导致的。 没有找到太好的解决办法。最后使用了一个笨方法:先停掉所有的生产者和消费者服务,再停掉Kafka;而后重启Kafka、重启停掉的服务。这样异常就消失了。 后来又发现这个异常断续出现了几次,因为是测试环境,懒得去管它,神奇的是居然最后也自动恢复了。 ######
[阅读更多...] -
解决PKIX:unable to find valid certification path to requested target
遇到了“unable to find valid certification path to requested target”这样的问题。 错误详情如下: 从异常信息中可以看到错误是因为找不到证书导致的。 解决方案有两个。 正常的方案当然是找到证书并安装证书。这种方案能找到一大把,在这里不做展开。 说个暴力的方案:取消证书认证,信任所有请求链接。 使用的工具是fluent-http-client,maven地址是: 首先创建一个自定义的HttpClient实例。这个实例将信任所有的请求链接。实例详情如下: 注意代码中的两处注释,是关键。 然后绑定新创建的实例,执行一个Get请求:就这样。 #######
[阅读更多...] -
偶尔三省
最近休假在家,但平时琐事太多,难得有敲代码的时间,不过却是一个反思的好时间。在这里记录一些想法。 不要抱怨 这是一个我不自觉会犯的毛病,以后一定要注意。 抱怨本身只能吐出一下自己的郁闷,不过却不能纾解郁闷,对问题本身也无任何裨益。相反的,每次抱怨只会加重自己的负能量,同时也会对其他人造成一些负面的影响。落到有心人耳里,甚至会造成一些严重的后果。 对于抱怨的事情,首先想的应该是如何解决问题;如果不能解决,则考虑如何将问题抛出;如果不能解决不能抛出,就按常规流程处理好了。 不要厌屋及乌 平时行事多少有些挑剔,连带的,对做事不甚合意的人也有些不是很正面的看法;有时也会因为不喜欢一些人而对他们做的事诸多挑剔。这都是不好的做法。处理事情时只看事情做的怎样,与人相处时只看为人如何,一码归一码。 不要轻易提意见 平时与人相处,对与其相关的事情,如果对方没有刻意征求看法,不要轻易提出自己的意见。 不要做好事 这一点我想了好久。 以前曾看过一个故事,好像是赵威后嫁女相关的。故事内容不大记得了,只有两句话特别有嚼头:“不要做好事”、“好事都不能做,何况是坏事呢”。大致是做好事邀名不可取,做坏事更是不行的道理。 现在想想这两句话觉得应该还有些别的意思:也许做好事的人出发点是好的,但是善意会被人恶意地利用,最后也许会产生坏的结果。 我的想法是不能无偿的做好事,可以为每一分善意设置一个门槛,这样可以过滤掉一些滥用他人善意的行为。 战战兢兢、如履薄冰,生存大致如是。
[阅读更多...] -
设计模式讨论可以休矣
多少有些标题党了。首先声明一下,就我个人来说,我对设计模式没有丝毫反感。我反感的是空泛地没有标的地讨论设计模式,或者说言必谈设计模式。 起因是和同事的一次讨论。讨论的过程不便多说,只说下我的观点:只为学习设计模式而学习设计模式是没有意义的,这样无助于写出好的代码,反而容易写出晦涩的代码。那什么是好的代码呢?在我看来,好的代码起码得有三好:好用、好看、好维护。 好用,指的是写出的代码必须得能满足需求,不然再好的代码都是无意义的代码。 好看,是说代码最少得看起来顺眼,需要遵循普遍约定的格式,类和方法需要有必要的文档(是的我是一名光荣的java程序员),复杂的逻辑处需要有注释。这一点可以参考一些大公司的编码规范,比如Google编码规范或阿里巴巴编码规范(这个真的是良心之作,有详细的pdf文档,还有eclipse和idea插件)。 好维护,这个说起来很容易就话长了。说下我的看法吧:以一个普通的合格的实习生为例,这个实习生理解并做到可以修改你的代码所用的时间,这个时间需要和代码实现的需求的复杂度成正比。注意,这里说和时间成正比的不是代码的复杂度,而是业务需求的复杂度。也许只是写一个“Hello World!”你就用到了十三种设计模式,你的代码精巧无比,但是没有毛用,这个程序本来最多只需要4行代码一分钟的时间就能完成,理解它本应该也不超过一秒钟。当然,能把简单的事情搞复杂也是人才,自然也有他们的用武之地。 我这里说的“三好”代码和设计模式没有必然的关系。有人也许会问:代码不按设计模式来写那按什么来写呢? 我的答案很简单:按需求来写啊。不过在实现需求以后还得多想想你的代码是不是好理解好维护,有冗余的地方就删掉,有复杂的地方就尝试进行简化,简化不了就写上注释,总之尽量把事情搞得简单些,越简单才越见功夫。没看见么,小李飞刀也就那么简简单单的一刀。这样写出代码也许没有遵循任何一种现成的设计模式,但是绝对是最符合需求的模式。 当然,在一开始我就提过了,我并不反感或反对设计模式。我反感的是空谈设计模式,或者将设计模式作为金科玉律。设计模式当前有用,毕竟是前辈们作出的总结,学习一下还是很有必要的。观千剑而后识器,胸中有锦绣才能做出文章。但是将之作为规范就没有必要了。 我见识过一个方法有几百上千行的代码,也见识过因为开发者野心太大而将一个简单需求搞得复杂无比的代码,也见识过为了使用一些新的特性或功能而将整体结构搞得一团糟的代码…林林总总的,大体上都是为了赶上deadline或满足开发者一时的快意而写出的糟糕的代码。对于这些,我只是希望能够在需求没有那么紧张的时候抽时间做些优化,或者在写代码的时候想想以后接收接手你的代码的人是不是会很窝心。 不写了不写了。话早就说尽了,现在已经有些唠叨了。就这样吧。
[阅读更多...] -
Spark堆内存溢出解决记录
最近的工作有很大一部分是在做用户画像。 画像读取的维度bitmap动辄几百MB,甚至存在部分GB级别的。而我们的Yarn集群规模比较小,内存总计只有100多GB。开发调试时遇到最多的问题除了Task not serializable就是heap out of memeory了。 我们使用的Spark版本是1.6.3。yarn集群使用的jdk版本是1.7。 如何解决堆内存溢出是最大的难题 。 这时的分析受惠于这几个jvm虚拟机参数“-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ”很多。根据这些参数可以看到内存使用的详情。比如我这里曾遇到过新生代内存使用率达到了99%,而老年代使用率只有20%多的情况,此时需要适当调整新生代和老年代的比例。另外还尝试过使用“-XX:+HeapDumpOnOutOfMemoryError ”获取溢出时的内存快照,如果找到快照的话就可以找出是哪些对象占用了最多的内存,也有很大的几率定位到发生溢出的位置。可惜运维没有找到快照。 另一种情况出现在代码上。比如从ByteWritable获取字节数组时有两个API可用,一个是copyBytes(),一个是getBytes(),使用copyBytes可以把指定区间内的字节数组copy出来,但是这样一来占用的内存肯定是要double了。又比如该使用broadcast变量的时候使用了普通变量,该使用mapPartitions的时候使用了map。写代码的时候每调用一个方法都应该考虑一下相关的开销。 我这里遇到的内存溢出一般来说就是最简单的情况——内存是真的不够用了。某些需要加载到内存里的bitmap体量实在太大,因为整体的资源不足,同时又有其他的任务在跑,所以不能过分上调executor内存。这样能做的只有限制executor的数量,才能适当增加executor的内存。并且为了避免一个executor内可能会出现多个过大bitmap同时并行运算的问题还得限制某些stage的partition数量。更极端的情况就得限制每个executor的core的数量。 大体上就是这样。
[阅读更多...] -
Spark Job调优-Part 2
这一节将主要介绍资源调优,或者说是如何充分利用集群资源。然后再说一下如何对并发度进行优化,这是job性能参数中最难也是最重要的部分。最后我们将了解一些数据自身的表现形式:Spark读取时数据在磁盘上的保存形式(如Apache Avro和 Apache Parquet),以及缓存时或系统内移动时数据在内存中的形式。 优化资源分配 在Spark的用户反馈中经常提到一类问题如“我有一个500个节点的集群,但是在我运行一个应用时,我看到只有两个Task在执行。求帮助!”。考虑到Spark资源控制参数的数量,这种问题不是不可能存在的。在这一节里,我们将学习如何压榨出集群的最后一点资源。根据资群资源管理器的不同(Yarn、Mesos和Spark Standalone),相应的建议和配置也有所不同。不过我们将主要集中在Yarn上——这也是Cloudera推荐使用的资源管理器。 要了解一些Spark on YARN的背景知识,可以先看一下这篇文章。 目前Spark(和Yarn)主要关注的资源只有两个:CPU和内存。当然了,硬盘和网络IO对于Spark的性能也很关键,但是不管Spark还是Yarn,目前都还不能有效地对其进行管理。 在一个Spark应用中,每个Executor都有固定数目的core以及固定大小的堆内存。core的数量可以通过spark-submit提交任务时的参数“–executor-cores”来进行指定,也可以在spark-default.conf配置文件中或SparkConf对象中对“spark.executor.cores”属性进行配置。类似的,堆内存的大小可以通过“—executor-memory”或“spark.executor.memory”来进行设置。cores属性决定了一个executor可以并发执行的task的数量。如“–executor-cores 5”则意味着一个executor在同一时间内最多可以同时执行5个task。内存大小决定了Spark可以缓存的数据的总量以及在group、aggregate或join时可以shuffle地最大数据总量。 “–num-executors”命令参数或“spark.executor.instances”属性则决定了job可以申请的executor总数。从CDH 5.4/Spark 1.3开始,就可以不必设置这个值了。更好的方法是通过“spark.dynamicAllocation.enabled”开启动态分配。通过动态分配,spark应用可以在大量待执行的task积压时申请更多的executor资源,并在空闲时释放executor资源。 同时了解Spark请求的资源是如何与yarn可用资源相配合的也是很重要的。相关的yarn参数包括: yarn.nodemanager.resource.memory-mb:控制了集群每个节点上所有的container可以使用的最大内存总量。 yarn.nodemanager.resource.cpu-vcores:控制了集群每个节点上所有的container可以使用的最大core总量 在提交任务时请求5个executor core就等于向yarn集群请求5个虚拟core。但是向yarn请求内存则有点复杂,情况大致如下: 1. “–executor-memory/spark.executor.memory”控制着executor需要使用的堆内存大小,但是spark也会用到一些堆外内存,比如interned String或者direct byte buffer。“spark.yarn.executor.memoryOverhead”控制着这部分内存,它和其它属性一起决定了每个executor向yarn请求的内存总量。这个属性的默认值是“max(384, 0.07 * spark.executor.memory)” 2. Yarn提供的内存可能会比请求的多一点。Yarn的配置参数“yarn.scheduler.minimum-allocation-mb”和“yarn.scheduler.increment-allocation-mb”控制着分配内存的最小值和增加量。 下图展示了spark和yarn的内存相关属性及配置的层级结构: 对于如何分配executor,如觉得以上内容还不够,那么还可以关注下如下的几个点: 1. application master,是一个非executor container,有向yarn请求container的特殊能力,它本身所占用的资源也需要被考虑进来。在“yarn-client”模式下,它默认会占用1024M内存以及一个vcore。在“yarn-cluster”模式下,application master会运行driver,所以通过“–driver-memory”和“–driver-cores”来配置资源通常是有效的。 2. 给executor分配太多的内存会导致过长的GC延迟。64GB是一个稍显粗略的内存上限建议。 3. 我们已经注意到HDFS客户端在并发线程过多时会存在一些问题,大致每个executor分配5个task就可能耗尽写入带宽,所以给executor分配的core的数目最好低于5。 4. 运行“小型”executor(例如只有一个core且内存仅够运行一个task的executor)实际上是放弃了在一个JVM中并发执行多个Task的优势。比如broadcast变量需要在每个executor上复制一份copy,太多小型executor的话就会产生过多的这种变量copy。 为了能够更具体地描述以上的这些内容,这里以一个Spark应用示例,通过调整这个应用的配置,让其可以充分利用集群的资源:假设一个集群有6个节点在运行NodeManager,每个节点有16个core以及64GB内存。那么每个NodeManager的配置“yarn.nodemanager.resource.memory-mb”和“yarn.nodemanager.resource.cpu-vcores”可以分别设置为63 * 1024 = 64512(MB)和15。这里没有将全部的资源都分配给Yarn container,因为需要留一些资源给每个节点的操作系统以及hadoop后台进程。这里留了1G内存和1个core给这些系统进程。Cloudera Manager会自动计算这些内容并完成相关的Yarn配置。 根据上面说的集群配置,我们极有可能在提交任务时这样配置executor:“–num-executors 6 –executor-cores 15 –executor-memory 63G”。但是,这是一个错误的做法,因为: 6GB+的executor塞不进有63GB内存空间的NodeManager; application master会占用某个节点上的一个core,这意味着那个节点上将不会有足够的core可以分配给一个15-core的executor; 每个executor有15个core将有可能导致HDFS IO吞吐异常。 比较好的做法是这样进行配置:“–num-executors 17 –executor-cores 5 –executor-memory 19G”。为什么呢? 这样每个节点上将会有3个executor——有application master的节点除外,它上面只有两个executor; “–executor-memory”的值是这样算出来的:(63/3) = 21,3是每个节点的executor数量;21 * 0.07 = 1.47,前面提过0.07是堆外内存的比例;21-1.47≈19. 优化并发度 我们都知道Spark是一套并发处理引擎。不过也许我们并不清楚:Spark并不是一个特别“神奇”的并发处理引擎,能够无限制的并行处理业务;它的能力也有极限,我们需要找出最优的并发度。 Spark的每一个stage都有一定数量的task,每个task会串行地处理数据。在优化Spark Job并行度的时候,task数量也许是决定应用性能的唯一的也是最重要的参数。 那么这个数量是如何决定的呢?在之前的文章里有讲述过Spark是如何将RDD划分为不同的stage的。简单提示一下:一些transformation操作如repartition或reduceByKey造成了stage的边界。一个stage中task的个数与stage中最后一个RDD的partition数目相同。一个RDD中partition的数目与其所依赖的RDD的partition数目相同。不过有一些例外情况:coalesce transformation算子创建的RDD的partition总数可以少于其父RDD的partition总数;union transformation算子会创建一个RDD,其partition总数为所有父RDD的全部partitio数目之和;cartesian创建的RDD的partiton总数为所有父RDD的partiton数量的乘积。 如果一个RDD没有父RDD呢?依赖textFile或hadoopFile创建的RDD的partition数量取决于其所使用的MapReduce InputFormat实现。通常情况下一个HDFS block对应着一个partition。对于依赖parallelize方法创建的RDD,其partiton数量取决于用户在调用parallelize方法时传递的参数,如果用户没有传递参数那么会默认采用“spark.default.parallelism”的值。 要想知道一个RDD中partition的个数,可以通过调用rdd.partitions().size()获得。 这里需要关心的是task数量过小的情况。如果task的数量小于可用的slot总数,那么对应的stage将无法充分利用所有可用的CPU。 task数量过少也意味着在执行聚集操作时每个task都会有更大的内存压力。任何“join”、“group”或“*ByKey”操作会将对象保存在hashmap或内存buffer中,以便进行分组或排序。“join”、“group”或“groupByKey”在触发shuffle时会在fetch端使用这些数据结构。“aggregateByKey”、“reduceByKey”在触发shuffle时会在两端使用这些数据结构。 当聚集操作要获取的数据在内存中不容易放下的时候,就会暴露一些问题。第一,在内存中保留太多数据会造成GC压力,容易导致处理中断。第二,如果在内存中放不下这些数据,Spark将会尝试将之溢写到硬盘上,这会产生硬盘IO或排序。过大的shuffle导致的内存开销是我在所有的Cloudera用户中看到的造成Job执行过慢的首要原因。 那么该怎样增加partition的数量呢?如果当前stage是在从hadoop中读取数据,那么可用的做法有: 调用repartition方法,这会产生一次shuffle; 调整InputFormat实现,创建更多的partition; 向HDFS写数据的时候,使用更小的block size。 如果当前stage正在从其他stage接收数据。那么导致stage分界的transformation操作通常会接受一个numPartitions参数,比如: 那么X设置为多少才比较合适呢?最直接的做法就是通过多次实验来决定:看一下父RDD的partition数量,然后每次按这个数量的1.5倍来设置X的值,多次重复,直到性能不再有明显提升。 也有一些计算这个X值的规则,但是不太好验证,因为有些参数不容易计算出来。这里也会介绍下这些规则,但是不建议在工作中使用,这里的介绍只为了能让大家对相关的内容有些深入的了解。使用这些规则的主要目标是在可以充分利用可用内存的前提下,创建足够多的task。 每个task的可用内存等于如下公式:(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores。memory fraction和safty fraction的默认值分别为0.2和0.8. 内存中的shuffle数据的量不太容易确定。最可行的方案是找到stage运行时Shuffle Spill(Memory)和Shuffle Spill(Disk)的比例。然后用这个比例乘以shuffle写数据的总量。不过在这个stage执行reduce操作时情况会有点儿复杂: 此时可以把partition的数量稍稍调大一点儿——一般来说,partition的数量通常是多点儿比少点儿好。 实际上,在设置partition的数量如果有些犹豫不决的话,可以尝试将partiton的数量设置地大一些。这和在编写MapReduce程序时有些不同,MR对task的总数通常比较保守。这是因为MapReduce启动task的开销通常比较大,而Spark则比较小。 压缩数据结构 Spark上的数据流由record的形式构成。record通常有两种表现形式:一种表现形式是反序列化后的java对象;一种是序列化后的二进制数组。通常在内存中Spark会使用反序列化后的record,而在硬盘或网络传输时会使用序列化后的record。也有一些工作在尝试将shuffle数据以序列化后的形式保存在内存中。 “spark.serializer”这个属性决定了使用哪种序列化方案使record在这两种表现形式间转换。Kryo serializer,即org.apache.spark.serializer.KryoSerializer,是推荐的选项。但可惜的是它并不是默认的选项。因为在Spark的早期版本中KryoSerializer有些不稳定,而现在Spark又不想打破版本间的兼容性。但是不管如何,Kryo都应该是优先使用的serializer。 record在这两种表现形式间的转换对于Spark的性能有着巨大的影响。因此很有必要检查一下应用运行时在各处使用的数据类型,并尝试挤出一些水分。 数据序列化后的体量过大可能会导致更加频繁的硬盘和网络IO,同时也会减少Spark可以缓存(这里是指在MEMORY_SER storage level下)的record的总量。这里主要的解决方案是将所有的用户自定义类都通过SparkConf#registerKryoClasses API来注册和传递。
[阅读更多...] -
Spark Job调优–Part 1
在开始写spark代码或者翻阅spark文档的时候,会遇到一些诸如“transformation”,“action”和“RDD”这样的术语。了解这些术语对于编写spark代码是至关重要的。 类似的,当写的spark程序执行失败的时候或者尝试通过Spark WebUI来研究提交的应用为什么执行时间过长的时候,可能会遇到另外一些术语,如“job”、“stage”和“task”,了解这些术语对于写出“好”的Spark程序至关重要。对于这里的“好”,我的意思是执行得足够快。要写出高效的Spark程序,了解一些Spark的底层执行模式是非常非常重要的。 在这篇文里,会介绍部分诸如“Spark程序是如何在集群上执行的”这样的基础知识。通过了解Spark的执行模式,读者也会学到一些如何写出高效Spark程序的实用建议。 Spark是如何执行你的程序的 一个Spark应用是由一个Driver进程和一系列分散在集群不同节点上的Executor进程共同组成的。 Driver进程负责所有要执行的工作的上层流程控制。 Executor进程负责执行具体的工作(以Task的形式)以及存储用户选择的要缓存的数据。 在应用运行期间,Driver和Executpr通常都是固定不变的,不过Spark动态资源管理会改变后者。每一个Executor都有多个slot来运行Task,并在其生命周期内并发执行。 将Driver进程和Executor进程部署到集群是由集群管理器(Yarn、Mesos或者单节点Spark)来完成的,但是Driver和Executor是保存在每个Spark应用中的。 在Spark的执行模式中,位于最上层的概念是Job。在一个Spark应用中调用Action算子就会触发开始一个Spark Job来完成这个Action的运算。Spark会检查这个Action算子所依赖的RDD关系图来获取Job的全貌,并形成一个执行计划。该执行计划从最后端的RDD(即不依赖已缓存的数据或其它RDD的RDD)开始,最终生成计算Action结果所需的RDD。 执行计划的一个主要内容是将Job的transformation算子组合为一个或多个stage。一个stage对应着一批执行相同代码的Task集合;每个Task处理不同的数据子集。每个stage中的transformation算子都是不需要shuffle全部数据就可以完成的。 是什么决定了数据是否要被shuffle呢? 回忆一下,每个RDD是由固定数量的partition组成的;每个partition又是由一定数量的record组成的。对于那些由被称为 “窄” transformation(如map或filter)返回的RDD,计算单个partition中的record所需的record都保存在父RDD的单个partition中。每个对象都只会依赖父RDD的单个对象。coalesce之类的操作可以在一个task中处理多个partition中的数据,但是这种transformation依然被视为 “窄” transformation,因为用来计算每个输出record的输入record仍然在有限的partition子集中。 不过,spark也支持诸如groupByKey和reduceByKey这样的宽依赖transformation。在这些宽依赖操作中,用来计算单个partition的record所需的数据可能来自父RDD的多个partiton中。“byKey”操作通过同样的task将拥有相同key的元组最终放到了同一个partition中。为了完成这种操作,spark就需要执行一次shuffle,这样在一个新的stage中,收集分散在集群上的数据,最终创建新的partition集合。 举个例子,可以考虑下如下的代码: 这段代码中执行了一个action算子。这个action算子又依赖于一系列transformation算子。代码中的action算子和transformation算子执行于由一个文本文件生成的RDD上。这段代码会在一个stage中执行完成,因为涉及到的三个transformation操作没有一个需要的数据是来自输入RDD的多个分区中的。 再看一个相反的示例,下面的代码统计了一个文本文件中每个出现次数超过1000次的单词的具体出现次数。 代码中的整个处理过程被分成了三个stage。其中的两个reduceByKey操作可以视为stage的分界点,因为计算reduceByKey的输出需要对数据按key进行重新分区。 下图是一个更复杂的transformation执行图。其中的join操作有着多个依赖。 下图用粉色方框框起来的表示执行过程中的不同stage。 在每个stage的边界,数据会被父stage的task写到硬盘上,而后又会被子stage的task通过网络获取到。因为导致了沉重的硬盘和网络I/O开销,stage边界操作代价是昂贵的,应当尽量避免。父stage的partition数目和子stage的partition数目有可能是不一致的。通常,会触发stage边界的transformation算子都会有一个类似“numPartitions”这样的参数来决定将子stage的数据分为多少个partition。 选择正确的操作 当尝试用spark处理问题时,我们有许多的方式来将action和transformation算子进行组合计算来得到同一个正确的结果。但是并不是所有的这些组合在性能上都是一致的:避免一些常见的陷阱、采用正确的算子组合通常对应用的性能有着显著地影响。接下来介绍的一些规则和经验,可以在必要的时候帮你做出选择。 要从多个可行的算子组合中选出最优方案来,最基本的一个标准是这种算子组合能够减少shuffle的次数和shuffle的数据总量。这是因为shuffle是一个相当耗费资源的操作:所有要shuffle的数据会先写到硬盘上而后再通过网络进行传输。repartition、join、cogroup以及类似“*By”或“*ByKey”这样的transformation算子都会导致shuffle。并非所有的操作都是等价的。Spark开发新手遇到的一些性能陷阱通常都是因为选择了错误的操作导致的。 下面介绍一些应该避免的操作。 在执行关联的规约操作时避免使用groupByKey操作 举个例子,如这段代码: 这段代码执行效果等同于直接使用reduceByKey,如下: 两段代码执行效果虽然相同,但是使用groupByKey的代码会在集群内传递所有的数据集。而后者会先在每个partition内按key进行局部计算,而后再执行shuffle将局部结果集合并起来形成最终结果。 当输入输出值类型不同时避免使用reduceByKey操作 举例如:写一个transformation计算出相同key的字符串集合。一个写法就是使用map将原始的key/value对中的value转为单元素的Set集合,然后再使用reduceByKey将Set集合组合起来: 这段代码会创建太多的不必要的对象,因为需要为每个原始key/value对创建一个Set对象。更好地做法是使用aggregateBykey,这个方法可以更高效地执行map阶段聚合: 避免使用“flatMap-join-groupBy”模式 当两个数据集已经按key做了group,此时想在这两个数据集保持group的状态下进行join操作,可以使用cogroup方法,这样可以避免拆开group又重新执行group造成的开销。 什么时候不会发生shuffle呢 知道在什么样的场景下,上面的transformation操作不会产生shuffle也是很重要的。在之前已经有一个transformation操作使用相同的partitioner完成分区的情况下,spark知道如何避免重复shuffle。看一下下面的代码: 因为没有将partitioner参数传递给reduceByKey方法,所以会使用默认的partitioner,也就是说rdd1和rdd2都会使用HashPartitioner按hash进行分区。这两个reduceByKey操作将会产生两次shuffle。如果这两个RDD有相同数目的partition,那么在执行join操作的时候将不会需要额外的shuffle。因为这两个RDD是按相同规则进行分区的,rdd1的任何一个数据集都只会出现在rdd2的单个partiton中。因此,rdd3的任何一个输出partiton的数据都只会依赖rdd1和rdd2的单个partition的数据,不需要再执行第三次shuffle了。 举个例子,如果前面代码中的“someRdd”有四个partition,“someOtherRdd”有两个partition,这两者执行的reduceByKey操作都会用到三个partition,那么这个任务的执行示意图就大致如下: 如果rdd1和rdd2使用不同的partitioner,又或者它们都仍然使用默认的HashPartitioner却设置了不同的分区数,那么又是怎么执行的呢?在这种情况下,只有一个RDD(partition数目较少的那个RDD)会需要在执行join操作的时候重新shuffle。 一样的transformation操作、一样的输入数据,设置了不同的partition数目时的执行示意图: 对两个数据集执行join操作时,要想避免shuffle,可以考虑利用broadcast变量。当其中一个数据集小到可以放进Executor的执行内存中时,可以先在driver中将这个数据集放进一个hash表中然后在将之广播给每个Executor。之后的map操作就可以参考这个hash表来进行查询了。 什么时候shuffle越多越好 通常我们都需要尽量减少shuffle的数量,但是有一些特殊的场景我们需要反其道而行之。增加shuffle可以增加任务的并行度,这有助于提升性能。假设原始数据来自于几个比较大的不可分割的文件中,那么InputFormat使用的分区方式会将大量的数据放到每个partition中,而不是产生足够多的partition来充分利用可用的core。这种场景下,加载完数据后,执行一次repartition来增加partition的数量(此时会产生shuffle)将有助于之后的操作充分利用集群的CPU。 另外一个场景是在使用reduce或aggregate操作将数据聚合到driver上时。当partition太多时,在Driver上使用单线程将所有partition的数据聚合到一起时,很容易遇到瓶颈。要降低Driver的负载可以考虑先使用reduceByKey或aggregateByKey展开一轮分布式地聚集运算,将原始数据集的partition总数缩小。在将最后的结果发送到driver之前,每个partition上的数据都会互相按key进行合并。具体的,可以看一下treeReduce和treeAggregate这两个方法是如何实现的。 在原始数据已经按key做了分组之后,这个技巧将会特别地有效。假设我们要写一个应用统计一个文本文件内每个单词出现的次数,然后将结果汇总到driver上。一个方法是使用action算子aggregate在本地对每个partition进行运算,然后将将结果汇总到Driver。另一种方式使用aggregateByKey算子,以一种完全分布式的形式进行运算,最后再调用collectAsMap将结果汇总到driver上。 二次排序 另一个需要注意的重要功能是repartitionAndSortWithinPartitions这个transformation算子。这个算子看起来有点儿神秘,但是好像会在各种有趣的场景下出现。它将排序推到了执行shuffle的机器上。在那里,大量的数据被高效地溢写,并且排序可以和其他操作组合在一起使用。 举个例子,Apache Hive在Spark上的join方法实现就使用了这个transformation算子。此外,在二次排序模式中它也是一个重要的组成模块——在这种模式下数据按key进行分组,同时在按key遍历值的时候,key对应的值需要保持一定的顺序。这种模式的使用场景通常为将事件日志按用户进行分组,并将每个用户的事件按发生时间排序后再进行分析。直接使用repartitionAndSortWithinPartitions执行二次排序对用户有点儿难度,不过SPARK-3655大大简化了这个问题。 结论 现在应该能够对涉及到创建一个性能高效的Spark程序的基本因素有足够的了解了。在下一节我们将继续介绍资源请求、并行度和数据结构相关的调优内容。 本文译自:How-to: Tune Your Apache Spark Jobs (Part 1)
[阅读更多...] -
Kafka 调整partiton数目和replica factor
调整partiton 调整partition可以直接执行如下命令: 注意替换topicName、$ZK_HOST_NODE和partitionNum三个参数。 调整replica factor 调整replica-factor需要先创建一个json描述文件replica.json,大致如下: 在描述文件中说明分区和副本所在broker的Id的映射。 而后在replica.json所在的位置执行如下命令: 另外,kafka-manager是个好东西,可以直接在界面上完成partiton数目的调整。可惜不能调整replica-factor。 #####
[阅读更多...] -
Druid historical/broker 节点启动失败
部署Druid服务时遇到了启动失败的异常。相关的节点是historical和broker。异常信息如下: 异常信息中比较关键的部分是下面这一句: 这一句指明了启动失败的原因:分配的直接内存不足。需要的直接内存大小是“5,368,709,120”,实际提供的大小是“4,294,967,296”。 并且指明了解决方案:在虚拟机参数中添加“-XX:MaxDirectMemorySize”这样一个指标。 以为这样就够了!?druid的开发人员做的实际上还要多一点,他(们)在这段异常信息中还解释了为什么需要这么多直接内存,看看异常信息中提供的计算公式: 为了看着方便,我手动做了下换行。 公式中使用到的配置参数如druid.processing.buffer.sizeBytes和druid.processing.numMergeBuffers等可以在historical和middleManager的runtime.properties文件中找到。方括号里的值是用户自己设置的值。 至于怎么修改这个问题,网上找到的资料多是建议在middleManager的runtime.properties的druid.indexer.runner.javaOpts配置项中进行配置,我试了一下,不中。 忘了说了,使用的druid版本是0.12.1。 成功了的做法是分别在conf/druid/historical/jvm.config和conf/druid/broker/jvm.config中添加“-XX:MaxDirectMemorySize”参数。参数值可以参考异常信息的提示酌情设置。这里是我的historical/jvm.config的配置: 设置完成后,重启,可以成功,说明配置生效。就这样。 ######
[阅读更多...]