前两天又接了一个Spark任务,倒不复杂,依然是检索HDFS上的日志数据这样的事情。不过瞅着组内跑着十几二十个任务内存一共只有160来G的yarn集群,有些欲哭无泪。 事情还是要做的,反正执行时间要求不太严格,只能想办法尽量压缩内存的占用了。 先说下背景:现在使用的yarn集群由8个容器组成,每个容器的内存大概20G;工作内容是检索数据,源数据大概1T左右,取出来的目标结果数据在2~8G这样子。 最开始的时候查询任务是直接使用sparkSql来完成。随着数据量的上升很快就遇到了最经典的两个问题:StackOverflowError和OutOfMemoryError。 对于栈溢出,之前设计了几个解决方案,在历史文章里面有记录《Spark StackOverflowError》。其中我使用了任务内多批次执行的方案。现在想来,这其实并不是最好的解决方案:问题在于分批越多,每批任务中的action算子就会导致任务的执行时间越长,远不如直接增加栈空间来得简单直接。不过也算是错有错着,这反倒为后来的优化打下了基础。 至于堆内存溢出,主要发生在将每个partition的数据合并压缩的阶段:.repartition(1).saveAsTextFile(pathSave, classOf[GzipCodec])。因为这个操作可能会发生在每个Executor上,所以只好通过简单的增加Executor的内存来解决问题。因为内存总量有限,单个Executor的内存调大了,就只能将task的并发度调小。这样在更严重的问题暴露之前,一直尝试解决的问题就是如何在并发度和内存占用之间取得平衡。 更严重的问题出现在这次的需求上:很简单,要导出的结果数据集变得非常大了,一般都会大于8G,此时堆内存溢出频繁出现。应对方案如下:取消压缩操作、增大Executor执行内存,将Executor的数量调整为2,每个Executor的task数目调整为1。这样Spark任务可以正常执行了,但是因为并行度太小的缘故,执行时间巨长——动辄跑上十来个小时。优化执行速度又提到了时间表上。 是一次执行错误给了优化的方向。现在任务的执行步骤为: 某次任务执行到第4步的时候报错了,考虑到耗时的问题,就重新写了一段代码来完成4和5两步的操作。此时想到这个任务在不同的阶段对资源的需求是不一样的: 在执行1~3这几个步骤的时候对内存的需求没那么强,但是如果稍稍增加些并行度就能极大地提升任务的执行效率; 第4步则是典型的吃内存的操作,此时并行度为1,但是内存需要足够大才能保证任务顺利完成。 此时方案已经很清晰了:将一个任务拆成两个,一个负责搜集数据,一个负责合并生成的中间数据,在执行的时候按不同的策略分配资源。 至此,当前的任务优化已完成。 再扯些没用的。 最后的优化方案实际上非常简单,以至于我很奇怪为什么一开始没想到。并且这种方案是在Hadoop的计算实践中是最常用的操作。唉,也许是灯下黑吧。 也许直接使用Hadoop会是一个更好地选择。因为瓶颈主要出现在内存上,Hadoop对内存资源的占用会少很多。 如果能不走yarn,直接使用java操作,那么尽量不走yarn好了,虽然复杂度会提升很多,但是执行效率是有保证的。 另外,同事曾经问过我一个问题:如果减少了executor的数目,那么每个executor要处理的数据不就变多了,这样也会造成内存压力。听到这个问题时,我的第一印象是“对啊,之前怎么没有考虑到这一点啊”。后来仔细思索了一段时间,想明白了关键:这个问题的前半句是对的,数据总量固定,并行度降低,单个executor要处理的数据量必然会增加;但是后半句是错的,内存中的数据量取决于partition的数量,在配置中则是和task数量相关。 记录一组spark任务提交参数,留着以后参考: 就这些了。
[阅读更多...]-
Spark数据导出任务内存优化记录
-
linux防火墙操作命令
最近又开了一个VPS,免不了一通配置。在这里简单记录下linux防火墙相关的操作。 操作系统:centos7。 1. firewall启动、停止、重启 2. 配置/取消 firewall开机启动 3. 新增开放端口 注意事项: 执行新增/删除操作需要重启防火墙服务。 其他节点 telnet开放的端口必须保证本地 “telnet 127.0.0.1 端口号” 能通。本地不通不一定是防火墙的问题。 4. 删除开放端口 5. 查看防火墙信息 另外一些指令,或可有用:
[阅读更多...] -
Metrics学习02 – Counter
Counter是要学习的Metrics的第二个工具,顾名思义即是计数器,通常用来执行统计之类的工作。 Counter比Gauge也复杂不了多少,直接看代码好了: 这里的代码较Gauge的那段稍稍有些不同:主要是在Counter实例的创建上 —— 这里使用了MetricRegistry实例的一个工具方法counter()。 MetricRegistry的实例为各种指标工具都提供了快速创建实例的方法,通过MetricRegistry提供的方法创建完成指标实例后可以自动完成注册。所以在上面的代码中没有再显式地将counter实例注册到MetricRegistry中。 另外值得细细品一下的是Counter的实现:Counter的计数能力主要依赖LongAdder类完成。 一般执行计数统计,最先想到的是AtomicLong/AtomicInt类。AtmoicXXX使用硬件级别的指令 CAS 来更新计数器的值,这样可以避免加锁,机器直接支持的指令,效率也很高。但是AtomicXXX中的 CAS 操作在出现线程竞争时,失败的线程会白白地循环一次,在并发很大的情况下,因为每次CAS都只有一个线程能成功,竞争失败的线程会非常多。失败次数越多,循环次数就越多,很多线程的CAS操作越来越接近 自旋锁(spin lock)。计数操作本来是一个很简单的操作,实际需要耗费的cpu时间应该是越少越好,AtomicXXX在高并发计数时,大量的cpu时间都浪费会在 自旋 上了,这很浪费,也降低了实际的计数效率。 使用LongAdder计数器可以避免这个问题。LongAdder采用了锁分段的思想,每个LongAdder实例都维护了一组计数单元Cell[],并发计数时,不同的线程可以在不同的计数单元cell[threadId]上进行计数,这样减少了线程竞争,提高了并发效率。本质上是用空间换时间的思想。 不过LongAdder一开始并不会直接使用计数单元Cell[],而是先使用一个long类型的base存储,当casBase()出现失败时,则会创建计数单元Cell[]。此时,如果在单个计数单元面出现了更新冲突,那么会尝试创建新的计数单元Cell,或者将Cell[]扩容为2倍。代码如下: 在高并发的情况下,LongAdder较之AtomicXXX有着数倍的性能优势。因此,通常建议使用LongAdder替换AtomicXXX。 再看下Counter的测试结果:
[阅读更多...] -
Metrics学习01 – Gauge
最近在写一个小应用,有些计量方式觉得可以参考一下Metrics,所以打算花两天的时间学习一下这个工具。 Overview Metrics是一个java监控计量工具包。在Spark、Hadoop、Spring等软件中都可以看到它的影子。Metrics提供了多种指标工具,如Gauge、Counter、Metrer、Timer、Histogram以及HealthCheck等。 这次先看一下Gauge,其他的看时间再逐个学习。 Gauge可以说是Metrics的最简单的一个指标:它的作用就是引用一个值。 来看个例子: 代码中通过匿名类的形式创建了一个Gauge接口的实例,作用是获取当前的时间。实现得非常简单,不需要多做解释。 因为Gauge接口只有一个方法getValue,是一个函数接口,所以可以考虑用lambda表达式创建Gauge接口实例: 既然Gauge这么简单,为什么不直接使用Gauge的值,还偏要用Gauge接口封装一下?是为了能在Metrics框架中记录并表示这个值。 Metrics框架中有几个基础概念:MetricRegistry、Reporter以及Metric。Metric前面提过两句,也演示了Metric之一的Gauge的用法。接下来简单介绍下MetricRegistry和Reporter。 MetricRegistry MetricRegistry的作用从类名就可以看出来:是Metric的注册中心(或者说是Metric容器),负责管理用户创建的所有Metric实例。 MetricRegistry主要提供了几种工具方法: 指标名称创建 创建Metric实例并自动注册 增删Metric实例 对注册的Metric实例应用监听器和过滤器 Reporter接口 从接口名称看起来,Reporter的作用应该是汇总指标实例的数据并生成报表。 Reporter接口的主要子类是ScheduledReporter,其核心是ScheduledExecutorService和ScheduledFuture,用来管理报表的定时输出。ScheduledReporter的子类包括ConsoleReporter、CsvReporter和Slf4jReporter,可以以不同的形式展示报表数据。 在4.x版本以前,Reporter接口还有实现一个类JmxReporter,可以通过JMX的形式输出报表数据。 扫了几个Reporter的实现,看出Reporter确实主要用来生成报表。不过也许是Metrics框架想要提供更多的自由,Reporter接口里并没有定义任何需要实现的方法: 如果需要以自定义的形式输出报表数据,可以继承ScheduledReporter类或实现Report接口来实现自己的需求,比如将报表数据以HTTP发送给统计应用。 Other 最后,看一下示例代码的执行结果:
[阅读更多...] -
SpringBoot探索01 – @Import注解
Overview Spring中@Import注解最初主要是在配置类中使用,目的是引入其他的配置类(@Configuration)并实现自动注入。 目前Import并不只是支持引入@Configuration注解的类,也支持引入ImportSelector和ImportBeanDefinitionRegistrar接口的实现类,甚至可以引入普通的Java Bean并完成注入。 写了一个简单的应用来进行测试:spring-boot-import。 做些说明。在应用中定义了一个Worker类,应用做的事情就是结合@Import注解用不同的方式注入Worker类的多个Bean实例。 每个Worker Bean的实例通过name进行区分。 代码的一个核心是MyConfig类,代码如下: 这个类中包含@Configuration注解,说明是一个配置类,Spring会自动注入这个类的实例。此外这个类还通过@Bean注解注入了一个Worker Bean实例“tom”,又通过@Import接口引用三个其他类,目的是尝试注入其他的Worker Bean实例。最后在WorkerService中尝试获取并逐行打印注入的Worker实例: 接下来详细介绍下这个过程中是如何使用@Import接口的。 引入普通的Java Bean MyConfig类中使用@Import注解注入的MyAnotherConfig类没有继承任何超类或实现任何接口: 可以看到,如果不是内部的一个方法使用了@Bean注解,它就是一个普通的Java Bean了。也是通过这个@Bean注解,实现了另一个Worker Bean的注入。 引入ImportSelector实现类 根据Spring的文档,ImportSelector的作用是根据一些注解的属性来决定使用哪些@Configuration类。也就是配置类的选择器。通常在spring的引用包中会看到ImportSelector的实现。 因此这里定义了另一个配置类MySelectConfig,不过为了避免当前应用下Spring的自动注入,没有在这个类中添加@Configuration注解。 看起来和前面的MyAnotherConfig是一样的。不过和前例不一样的是:MySelectConfig类的注入是通过MyImportSelector来实现的。 MyImportSelector的实现如下: 这里没有基于AnnotationMetadata进行判定就直接返回了配置类的名称,在实际工作中不是一个好的实践。不过我们这里只是做一个演示,不需纠结太多。 引入ImportBeanDefinitionRegistrar实现类 ImportBeanDefinitionRegistrar与ImportSelector的作用是有着根本上的不同的:ImportSelector的作用是提供配置类;而ImportBeanDefinitionRegistrar的作用则是根据类定义完成相应Bean实例的创建。 通常ImportBeanDefinitionRegistrar多与ClassPathMapperScanner配合使用。ClassPathMapperScanner可以用来扫描指定的package,获取目标类并完成相应实例的创建。具体应用如MyBatis的@Mapper注解的解释。 看下在我们示例中的使用: 在这里通过Worker类的定义创建了一个名为“jerry”的实例。需要注意:这里虽然完成了Worker实例的创建,但是并没有配置任何属性。等在输出注入的Worker Bean的时候我们会看到这个实例的属性都是默认值。 引入Spring Component 使用@Import注解不仅可以引入普通的Java Bean,也可以引入Spring组件类,即需要使用@Component或者@Service等注解标记的类。组建类中通过@Autowired注解引用的其他组件也会被递归引用并注入。 示例应用中的WorkerService类并没有使用任何注解标记,而是在使用的时候通过@Import注解进行的引入。 这样虽然也可以使用,但并不建议这么做。 引入@Configuration注解的类 这个留到最后是因为一开始比较困惑:既然已经有@Configuration注解了,Spring就一定会自动引入这个类的,应该就没必要再使用@Import注解进行引用并注入了。 后来意识到我的想法是有漏洞的:比如一些第三方spring组件包中的配置类,既没有配置packageScan,也没有配置starter,直接使用肯定是不行的。此时使用@Import注解来导入相关的配置类及组件是一个很好地解决方案。 测试 执行WorkerControllerTest类的测试方法all(),观察测试结果,期间会输出我们创建的几个Worker Bean实例: 可以看到一个Worker实例的属性都是默认值,这个实例即是通过ImportBeanDefinitionRegistrar创建的Worker Bean “jerry”。 其他 关于@Import注解的实现原理可以参考 AbstractApplicationContext.refresh -> BeanFactoryPostProcessor -> ConfigurationClassPostProcessor -> ConfigurationClassParser.processImports()。具体就不展开了。 此外,还有另外一个注解@ImportResource主要用来引入xml或groovy配置文件。
[阅读更多...]