• java注解2 – 自定义注解

    这次创建一个自定义注解StopWatch,目的是用来监控Java方法的执行时间。 注解定义如下: 代码中使用到的几个元注解上一节都有说明。注解定义完成了就可以使用了,下面是一个这个注解的使用案例: 在自定义的注解中使用了两个成员变量:name和timeunit。在使用自定义注解时,需要给成员变量赋值。赋值的方式就是程序中那样: 其中因为timeunit有默认值,所以可以不用额外赋值。 值得一提的是如果只有一个成员变量,且名称为value,可以不用再使用name=value,直接在括号中填上value即可。 但是这个注解能做什么呢?什么也不能做。仅有注解是没有意义的。我们需要为自定义的注解添加一些功能,也就是注解的解析器。 因为一直在Spring平台上做开发,所以注解的解析也依托Spring来做,这里还主要使用了一个组件AspectJ。解析器定义如下: 代码中使用的Watcher是简单将计时代码做了一些封装,如有兴趣也可以看一下: 好了,就这样。 #########

    [阅读更多...]
  • java注解1 – 元注解

    简单整理了一下Annotation相关的内容。 元Annotation就是修饰Annotation的Annotation。 Java有四个元Annotation:@Retention、 @Target、@Documented和@Inherited。 @Retention @Retention只能用于修饰一个Annotation定义。用于指定被修饰的Annotation可以保留多长时间。 @Retention包含一个RetentionPolicy类型的alue成员变量,所以使用@Retention时必须为该value成员变量指定值。 value成员变量的值只能是如下3个: RetentionPolicy.CLASS:编译器将把Annotation记录在class文件中。当运行Java程序时,JVM不再保留Annotation; RetentionPolicy.RUNTIME:编译器将把Annotation记录在class文件中。当运行Java程序时,JVM也会保留Annotation,程序可以通过反射获取该Annotation信息; RetentionPolicy.SOURCE:Annotation只保留在源代码中,编译器直接丢弃这种Annotation。 @Target @Target也只能修饰一个Annotation定义,它用于指定被修饰的Annotation能用于修饰哪些程序单元。 @Target元Annotation也包含一个名为value的成员变量,该成员变量的值只能是如下几个: ElementType.TYPE:指定该策略的Annotation可以修饰类、接口(包括注释类型)或枚举定义; ElementType.FIELD:指定该策略的Annotation只能修饰成员变量; ElementType.METHOD:指定该策略的Annotation只能修饰方法定义; ElementType.PARAMETER:指定该策略的Annotation可以修饰参数; ElementType.CONSTRUCTOR:指定该策略的Annotation只能修饰构造器; ElementType.LOCAL_VARIABLE:指定该策略的Annotation只能修饰局部变量; ElementType.ANNOTATION_TYPE:指定该策略的Annotation只能修饰Annotation; ElementType.PACKAGE:指定该策略的Annotation只能修饰包定义。 @Documented @Documented用于指定被该元Annotation修饰的Annotation类将被javadoc工具提取成文档。 如果定义Annotation类时使用了@Documented修饰,则所有使用该Annotation修饰的程序元素的API文档中将会包含该Annotation说明。 @Inherited @Inherited元Annotation指定被它修饰的Annotation将具有继承性——如果某个类使用了@A Annotation(定义该Annotation使用了Inherited修饰)修饰,则其子类将自动被@A修饰。 参考文档 http://blog.zenfery.cc/archives/78.html http://www.jb51.net/article/46350.htm ########

    [阅读更多...]
  • Yarn架构设计

    yarn一个基本理念就是将JobTracker的两大主要功能——资源管理和作业调度/监控——分割开来成为两个独立的守护进程。在这个方案里,有一个全局的ResourceManager (RM)和对应每个应用的ApplicationMaster (AM)。一个应用可以是一个独立作业,也可以是多个作业构成的DAG(有向无环图,Directed Acyclic Graph)。 ResourceManager和NodeManager(NM)一起构成了整个数据计算框架。ResourceManager是系统上全部应用所需资源的最终决策者。NodeManager则是每台机器上的框架代理,负责这台机器上的所有容器。NodeManager监控一台机器上的所有容器对资源(CPU、内存、硬盘、网络)的使用,并将之上报给ResourceManager和Scheduler。 每个应用的ApplicationMaster实际上是一个具体的框架库。它的任务是负责与ResourceManager协商获取资源,以及与NodeManager合作实现task的执行与监控。 ResourceManager有两个主要组件:Scheduler(调度器)和ApplicationManager。 Scheduler根据容量、队列等常见的约束,将系统资源分配给正在运行的各种应用。这里的Scheduler是一个纯粹意义上的调度器:它不再对应用的状态进行监控和跟踪,也不保证重启因应用错误或硬件故障导致失败的任务。Scheduler基于应用对资源的需求执行调度功能。这是基于一个抽象的概念“资源容器(Container)”实现的。资源容器将内存、CPU、硬盘和网络等元素组合在一起支持对任务的资源分配。 Scheduler支持一种插件策略,可以将集群中资源分配给多个队列和应用。当前的Scheduler,如Capacity Scheduler和Fair Scheduler,均可视为该策略的实现。 CapacityScheduler支持hierarchical queues(分层队列),这为更准确的集群资源共享提供了可能。 Container中封装了机器资源,如内存,CPU, 磁盘,网络等,每个任务会被分配一个容器,该任务只能在该容器中执行,并使用该容器封装的资源。 ApplicationsManager负责接收作业提交,将应用程序分配给具体的ApplicationMaster,并负责重启失败的ApplicationMaster。每个应用的ApplicationMaster负责和Scheduler谈判获取资源容器,并跟踪资源容器的状态监控他们的进程。 MRv2为历史稳定版本(hadoop-1.x)提供了兼容API (API compatibility )。这意味着所有MapReduce作业不需要更改,只要重新编译一次就仍可以在MRv2上运行。

    [阅读更多...]
  • Yarn命令

    概述 yarn命令是调用的bin/yarn脚本。执行yarn命令时如果不带上参数信息则会打印yarn命令的帮助信息。 yarn命令的用法: yarn有一个命令参数解析框架,可以用来解析通用命令参数以及运行类。命令参数及描述如下表: –config confdir 覆盖默认配置目录,默认配置目录为: ${HADOOP_PREFIX}/conf。 –loglevel loglevel 覆盖默认日志级别,可选的日志级别为:FATAL,ERROR,WARN,INFO,DEBUG,以及TRACE,默认日志级别为INFO。 GENERIC_OPTIONS(通用命令参数) 一套在多种命令里面都可能会用到参数项,具体见通用命令项表 COMMAND COMMAND_OPTIONS 各种命令以及其参数,接下来主要讲述的内容。大致可以分为普通用户命令和系统管理员命令两大类。 通用项列表: 命令选项 描述 -archives <comma separated list of archives> 用逗号分隔计算中未归档的文件,仅仅针对job -conf <configuration file> 制定应用程序的配置文件 -D <property>=<value> 使用给定的属性值 -files <comma separated list of files> 用逗号分隔的文件,拷贝到Map reduce机器,仅仅针对job -jt <local> or <resourcemanager:port> 指定一个ResourceManager,仅仅针对job -libjars <comma seperated list of jars> 将用逗号分隔的jar路径包含到classpath中去,仅仅针对job 用户命令 主要是Hadoop集群普通用户常用的命令 application 主要用来打印应用信息、杀死应用进程。 用法: 参数项详情: -appId <ApplicationId> 指定要操作的应用ID -appStates <States> 和-list命令一起使用,基于应用状态来过滤应用。多个状态用逗号分隔。可用状态为:ALL, NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED -appTypes <Types> 使用-list命令,基于应用类型来过滤应用,如果应用类型有多个,用逗号分隔 -list 列出从RM返回的应用程序,使用-appTypes参数实现基于应用类型的过滤,使用-appStates参数使用基于应用状态的过滤 -kill <ApplicationId> kill指定的应用程序 -status <ApplicationId> 打印应用程序的状态 -updatePriority <Priority> 更新应用优先级,可以使用“appId”指定应用ID applicationattempt 打印application attempt的报告。 用法: 参数项详情: -help 帮助 -list <ApplicationId> 打印指定应用的application attempt列表 -status <Application Attempt Id> 打印application attempt的状态 classpath 打印获取Hadoop相关的jar以及其他必需的库所用的类路径 用法: container 打印container信息。 用法: 参数项: -help 帮助 -list <Application Attempt Id> 应用程序尝试的Containers列表 -status <ContainerId>

    [阅读更多...]
  • 关于MapReduce6 – 重构WordCount

    这里是一个更完整的WordCount实例。在这个实例中使用了很多前面提到的MapReduce框架的特性。 这个实例需要HDFS支持运行,尤其是关于DistributedCache的一些特性。因此,这个实例只能运行于伪分布式或者完全分布式安装的Hadoop上。 看下源代码: 看看输入文本文件: 查看文件的内容: 运行应用程序: 输出执行结果: 这次我们的输入内容和第一次有些不同,请注意输出结果是怎样受到影响的。 现在我们通过DistributedCache插入一个模式文件,在这个文件里,列出了一些可以被忽略统计的单词模式。 再次运行,这次我们需要在命令中添加更多的选项: 输出结果和我们期望的一样: 再运行一次,这次我们关闭大小写敏感的特性: 就这样,我们看看输出结果: 程序要点 通过使用一些Map/Reduce框架提供的功能,WordCount的第二个版本在原始版本基础上有了如下的改进: 展示了应用程序如何在Mapper (和Reducer)中通过setup方法修改配置参数; 展示了怎样使用DistributedCache分发作业需要的只读数据。在这个程序中,允许用户设定单词模式,并在统计中跳过符合模式的单词; 展示了使用GenericOptionsParser去处理常见hadoop命令行选项的功能; 展示了应用程序如何使用Counters,如何通过传递给map(和reduce) 方法的Reporter实例来设置应用程序的状态信息。 #######

    [阅读更多...]
  • 关于MapReduce5 – 一些有用的特性

    提交作业到队列 用户将作业提交到队列。队列是作业的集合,允许系统添加特定的功能,比如,队列通过ACL决定哪些用户可以提交作业。通常主要由Hadoop调度器使用队列。 Hadoop的配置信息使用了一个单独的托管队列,被称为“default”。队列的名称是由mapreduce.job.queuename定义的(Hadoop的site配置)。一些作业调度器支持多重队列,比如Capacity Scheduler。 作业设置要提交到的队列有两种方式:通过mapreduce.job.queuename属性或者通过Configuration.set(MRJobConfig.QUEUE_NAME, String) API。队列名称的设置是可选的。如果作业提交的时候没有指定队列名称,将会被提交到“default”队列。 Counters Counters是由MapReduce框架或应用程序定义的全局计数器。每个Counter都可以是任一种Enum类型。同一特定类型的Counter可以汇集到一个组,其类型为Counters.Group。 应用程序可以定义任意(Enum类型的)的Counters,并通过map或者reduce方法中的Counters.incrCounter(Enum, long)和Counters.incrCounter(String, String, long)进行更新。之后框架会汇总这些全局计数器。 DistributedCache DistributedCache可将具体应用相关的,Size大的、只读的文件有效地分布放置。 DistributedCache是MapReduce框架提供的功能,可以缓存应用程序所需的文件(文本、压缩文件、jar包等等)。 应用程序在Job中通过url(hdfs://)指定需要被缓存的文件。DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。 在slave节点上作业的所有任务执行之前,MapReduce框架会把一些必要的文档复制到相应节点。框架的这个过程运行效率是很高的,主要有两点原因:每个作业的文件只拷贝一次,可以缓存压缩文件并在slave节点上解压文件。 DistributedCache根据缓存文档修改的时间戳进行跟踪。在作业运行期间,当前应用程序或其他外部程序不能修改缓存文件。 DistributedCache可以用来分发简单的只读数据或是文本,也可以分发类型复杂的文件,比如压缩文件和jar包。压缩文件(zip, tar, tgz and tar.gz文件)会在在slave节点上解压。这些文件可以被设置操作权限。 用户可以通过mapreduce.job.cache.{files |archives}来分发文件或压缩包。如果要分发多个文件,可以使用逗号来分割文件路径。也可以使用API来设置这个属性,可以使用的API包括:Job.addCacheFile(URI)/ Job.addCacheArchive(URI) 和[Job.setCacheFiles(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html)/ [Job.setCacheArchives(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html)。这些方法中的URI形式是:hdfs://host:port/absolute-path\#link-name。在Streaming程序中,可以通过命令行选项-cacheFile/-cacheArchive来分发文件。 DistributedCache可以在map或reduce任务中作为一种基础的软件分发机制。他可以用来分发jar包和本地库。Job.addArchiveToClassPath(Path)或Job.addFileToClassPath(Path)这两个API可以用来缓存jar包或文件,并把它们添加到classpath和子jvm中去。同样的事情也可以通过设置配置属性mapreduce.job.classpath.{files |archives}做到。类似的,被链接到任务的工作目录的缓存文件可以用来分发和装载本地库。 私有的和公共的DistributedCache文件 DistributedCache文件可以是私有的也可以是公共的。这要看它们是如何在slave节点上被共享的。 私有的DistributedCache文件被缓存在用户私有的本地目录下,用户的作业需要使用这些缓存文件。这些文件只能被特定用户的作业或任务访问。其他用户无法访问这些文件。可以在DistributedCache文件被上传的文件系统上(通常是HDFS)将其权限设置为私有。如果文件或文件所在的目录无法被外界搜索到或访问到,这个文件就是私有的。 公共的DistributedCache文件缓存在一个全局共享目录下,并被设置为所有用户可见。这些文件可以被slave节点上的所有用户的作业和任务访问。同样,可以在文件系统将文件设置为公有。如果用户想把文件设置为公共的,那么文件的权限必须被设置为全局可读,并且文件所在的目录权限需要被设置为可执行。 Profiling Profiling是一个工具,它使用内置的java profiler工具进行分析获得(2-3个)map和reduce样例运行分析报告。 用户可以通过设定mapreduce.task.profile属性来决定是否收集作业任务的profiler信息。利用API Configuration.set(MRJobConfig.TASK_PROFILE, boolean)可以修改属性值。如果设置为true,则开启profiling功能。收集的profiler信息被保存在用户日志目录。默认profiling功能是关闭的。 如果用户设定使用profiling功能,可以使用配置文档里的属性 mapred.task.profile.{maps|reduces} 设置要profile的map/reduce task范围。设置该属性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范围的缺省值是0-2。 用户可以通过设定配置文档里的属性mapred.task.profile.params 来指定profiler配置参数。修改属性要使用api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String)。当运行task时,如果字符串包含%s。 它会被替换成profileing的输出文件名。这些参数会在命令行里传递到子JVM中。缺省的profiling 参数如下: Debugging Map/Reduce框架能够运行用户提供的调试脚本。 当map/reduce任务失败时,用户可以通过运行调试脚本对任务日志(例如任务的标准输出、标准错误、系统日志以及作业配置文件)进行分析和处理。调试脚本的标准输出和错误信息会在作业UI的Diagnostic处展示。 在接下来的章节,我们讨论如何与作业一起提交调试脚本。为了提交调试脚本, 首先要把这个脚本分发出去,而且还要在配置文件里设置。 如何分发脚本文件: 用户要用 DistributedCache 来分发和链接脚本文件 如何提交脚本: 一个快速提交调试脚本的方法是分别为需要调试的map任务和reduce任务设置 “mapreduce.map.debug.script” 和 “mapreduce.reduce.debug.script” 属性的值。这些属性也可以通过 Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String)和 Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String) API来设置。对于streaming, 可以分别为需要调试的map任务和reduce任务使用命令行选项-mapdebug 和 -reducedegug来提交调试脚本。 脚本的参数是任务的标准输出、标准错误、系统日志以及作业配置文件。在运行map/reduce失败的节点上运行的调试命令: Pipes 程序根据第五个参数获得c++程序名。 因此调试pipes程序的命令是 默认行为: 对于pipes,默认的脚本会用gdb处理core dump, 打印 stack trace并且给出正在运行线程的信息。 数据压缩 Hadoop MapReduce框架为应用程序的写入文件操作提供了压缩工具,这些工具可以为map输出的中间数据和作业结果输出数据(例如reduce的输出)提供支持。它还附带了一些 CompressionCodec的实现,比如实现了 zlib压缩算法。 Hadoop同样支持gzip、bzip2 、snappy和 lz4等文件格式。 考虑到性能问题(zlib)以及Java类库的缺失等因素,Hadoop也为上述压缩解压算法提供本地库的实现。 中间输出 应用程序可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api控制map输出的中间结果,并且可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api指定 CompressionCodec。 结果输出 应用程序可以通过 api方法FileOutputFormat.setCompressOutput(Job, boolean) 控制输出是否需要压缩,并且可以使用 FileOutputFormat.setOutputCompressorClass(Job, Class)指定CompressionCodec。 如果作业输出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api,来设定 SequenceFile.CompressionType (i.e. RECORD

    [阅读更多...]
  • 关于MapReduce4 – 作业的输入输出

    作业的输入 InputFormat InputFormat描述了MapReduce作业的输入规范。 MapReduce框架根据MapReduce作业的InputFormat来做这些事情: 校验作业的输入配置; 把输入文件切分成多个逻辑上的InputSplit实例,并把每个实例分发给一个Mapper; 提供RecordReader实现类,从逻辑InputSplit中收集输入记录,并将这些记录交给Mapper处理。 基于文件的InputFormat的实现类(通常是FileInputFormat的子类)的默认行为是按照输入文件的字节(byte)大小,将输入切分成逻辑InputSplit实例。然而,输入文件的FileSystem的块的大小是InputSplit分块大小的上限。InputSplit分块大小的下限可以通过mapreduce.input.fileinputformat.split.min来设置。 考虑到边界情况,对于很多应用程序来说,按照输入文件大小进行切分是不能满足需求的。在这种情况下,应用程序需要实现一个RecordReader类来处理记录的边界并为每个任务提供一个逻辑分块的面向记录的视图。 TextInputFormat是默认的InputFormat。 如果一个作业的InputFormat是TextInputFormat,并且框架检测到输入文件的扩展名是.gz就会使用对应的CompressionCodec自动解压这些文件。但是需要注意,带上述扩展名的压缩文件不会被分割,并且整个压缩文件会分给一个mapper来处理。 InputSplit InputSplit是每个mapper要处理的数据块。 通常InputSplit是字节样式的输入,然后由RecordReader处理并转化成记录样式。 FileSplit是默认的InputSplit。它把mapreduce.map.input.file设置为输入文件的路径,输入文件是逻辑分块文件。 RecordReader RecordReader从InputSplit读入 <key, value>对。 通常RecordReader将InputSplit提供的字节样式的输入转化为由mapper处理的记录样式的文件。因此RecordReader负责处理记录的边界情况,以及把数据表示成<key, value>对形式。 作业的输出 OutputFormat OutputFormat描述了作业的输出样式。 MapReduce框架依赖OutputFormat来做这些事情: 检查作业的输出,例如检查输出路径是否已存在; 提供RecordWriter实现类以输出作业结果。输出文件保存在FileSystem上。 TextOutputFormat是默认的OutputFormat。 OutputCommitter OutputCommitter描述了MapReduce作业输出任务的提交。 MapReduce框架依赖OutputCommitter来做这些事情: 在初始化时对作业进行设置。比如在作业初始化的时候创建作业的临时输出目录。在作业初始化任务之后,作业的设置由另一个不同的的任务完成,此时作业的状态为PREP。当作业设置任务执行完成时,作业的状态将会改变为RUNNING。 在作业完成后执行清理工作。比如在作业执行完成后删除临时输出目录。作业清理工作在作业执行的末期由一个独立的任务完成。在作业清理任务完成后,作业的状态是:SUCCEDED/FAILED/KILLED。 设置任务临时输出。在任务初始化阶段,任务的设置是相同任务的一部分工作。 检查任务是否需要提交。这样做是为了避免任务在不需要提交时就被提交了。 提交任务的输出。在任务执行完成时,如果有需要,任务会提交它的输出。 放弃任务的提交。如果任务已经失败或者被杀死,输出内容将会被清理。如果任务不能自行完成清理工作(比如被异常阻塞)。另一个有相同尝试ID的任务将会被调用去完成清理工作。 FileOutputCommitter是默认的OutputCommitter。作业的设置和清理工作占用了Map或者Reduce的容器(哪个在NodeManager上可用就用哪个)。作业清理任务(JobCleanup)、任务清理任务(TaskCleanup)和作业设置任务(JobSetup)拥有最高的优先级,这三者之间的优先级同提到的顺序。 任务的Side-Effect Files 在一些应用程序中,子任务需要产生一些side-file,这些文件与作业实际输出结果文件不同。 在这种情况下,同一个Mapper或Reducer的两个实例(比如预防性任务)同时打开或者写入文件系统上的同一个文件就会产生冲突。因此应用程序在写文件的时候需要为每次任务尝试(而不只是每次任务)选取一个独一无二的文件名(使用attemptid,比如attempt_200709221812_0001_m_000000_0)。 为了避免这些冲突,在OutputCommitter是FileOutputCommitter时,MapReduce框架为每次尝试任务都建立维护了一个特殊的子目录${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid},这个目录位于尝试任务输出结果存放的FileSystem上,可以通过${mapreduce.task.output.dir}来访问这个目录。对于成功完成的尝试任务,只有${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}下的文件会被移动到${mapreduce.output.fileoutputformat.outputdir}。当然了,框架会丢弃那些失败的尝试任务的子目录。对于应用程序来说,这个过程是完全透明的。 在任务执行期间,应用程序在写文件时可以利用这个特性,比如通过FileOutputFormat.getWorkOutputPath(Conext)在${mapreduce.task.output.dir}获得${mapreduce.task.output.dir}目录,并在其下创建任意需要的side-file。框架在任务尝试成功时会马上移动这些文件,因此不需要在程序中为每次任务尝试取一个独一无二的名字。 注意:在每次任务尝试执行期间,${mapreduce.task.output.dir}的值实际上是${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid},这个值是由MapReduce框架设定的。所以使用这个特性的方法是在MapReduce的FileOutputFormat.getWorkOutputPath(Conext)方法返回的路径中创建side-file即可。 对于只使用map而不使用reduce的作业,这个结论也成立。这种情况下,map的输出结果直接生成到HDFS上。 RecordWriter RecordWriter将作业输出的kye-value对写入到输出文件。 RecordWriter的实现类将作业的输出写入到文件系统。 #############

    [阅读更多...]
  • 关于MapReduce3 – 作业的配置和执行

    配置 一个Job就表示了一个MapReduce的作业配置。 Job是用户向Hadoop框架描述一个MapReduce作业如何执行的最主要的接口。框架会尽力按Job的描述去忠实地执行一个作业,但是: 一些配置参数可能会被管理员标记为final(了解下final参数),这意味着不能被更改; 作业的一些参数可以被直接设置(比如Job.setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间存在微妙的联系,设置起来比较复杂(比如Configuration.set(JobContext.NUM_MAPS, int))。 Job通常是用来指定Mapper,Combiner(如果有的话),Partitioner,Reducer,InputFormat,OutputFormat的实现类。Job可以指定一组输入文件,使用的方法包括(FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))和(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String))。Job还可以指定结果输出文件的地址(FileOutputFormat.setOutputPath(Path))。 此外,Job还可以用来为作业做一些高级的配置。比如使用哪个Comparator,哪些文件可以被放进DistributedCache,中间结果或者作业的输出结果是否可以被压缩以及怎样压缩,作业是否允许预防性任务的执行(setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean)),每个任务的最大尝试次数(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))等等。 当然用户可以使用Configuration.set(String, String)/ Configuration.get(String)来set/get应用程序需要的任意参数。然而,有些参数需要慎重使用,比如DistributedCache更适用于大规模的只读数据。 执行 MRAppMaster是在一个单独的jvm上以子进程的形式来执行Mapper/Reducer的任务的。 子任务会继承父MRAppMaster的执行环境。当然,用户也可以自行设置子jvm上的附加选项。这需要使用mapreduce.{map|reduce}.java.opts和Job中的配置参数(比如使用-Djava.library.path=<>将一个非标准路径设为运行时的链接用以搜索共享库)。如果mapreduce.{map|reduce}.java.opts的参数包含“@taskid@”标识符,它会被替换成MapReduce中taskid的值。 下面是一个包含多个参数和替换的例子,其中包括:jvm GC日志;无密码启动JVM JMX代理程序以使之可以连接到jconsole或者类似的工具来查看子进程的内存、线程并得到线程的dump;分别设置map和reduce的子jvm的堆内存为512MB和1024MB;还添加了一个附加路径到子jvm的java.library.path中。   内存管理 用户或者管理员可以使用mapreduce.{map|reduce}.memory.mb设置子任务以及其子进程的虚拟内存的最大值。注意,这里设置的值是对每一个进程都有效的。mapreduce.{map|reduce}.memory.mb设置的值的单位是MB。还有,这个值必须大于或等于java虚拟机的-Xmx的值,否则虚拟机可能无法启动。 注意:mapreduce.{map|reduce}.java.opts只用于设置MRAppMaster启动的子任务。为守护进程设置内存的选项请参看文档《Hadoop守护进程环境配置》 框架一些部分的可用内存也是可配置的。在map和reduce任务中,程序的性能可能会被一些参数的调整所影响——比如是并发相关选项或者将数据写入硬盘的频率。监控文件系统关系到一个作业的计数器——尤其是关系到从map到reduce的byte数——是至关重要的。这些数据对于协调我们刚才提到的参数有着非常宝贵的价值。 Map参数 一个map所产生的某条记录会在被序列化后置入缓存,相关的元数据则会被存放入accounting buffer中。就像下面这些选项所描述的,若是缓存中序列化数据的大小或者元数据的大小超过了某个阈值,而此时map还在持续输出记录,缓存中的内容就会被排序并写入硬盘生成一个临时文件,这个过程可以称为“溢写(spill)”。如果缓存已被填满并且产生溢写,此时map线程就会被阻塞。在map任务执行完成后,缓存中现存的所有的记录都会被写入硬盘,而后硬盘上所有相关的临时文件会被合并成一个独立的文件。较小的溢写可以减少map执行的时间,但是一个较大的缓存又会占用Mapper的可用内存。 名称 类型 描述 mapreduce.task.io.sort.mb int 累积的序列化数据以及accounting buffer中存储的map输出数据的大小。单位MB mapreduce.map.sort.spill.percent float 排序缓存区大小的百分比。一旦超过,则会有一个线程将缓存内容溢出写入硬盘 名称 类型 描述 mapreduce.task.io.sort.mb int 累积的序列化数据以及accounting buffer中存储的map输出数据的大小。单位MB mapreduce.map.sort.spill.percent float 排序缓存区大小的百分比。一旦超过,则会有一个线程将缓存内容溢出写入硬盘 其他需要注意的内容: 如果超出了溢写阈值而还有一个溢写正在执行,那么将只做收集工作直到当前的溢写完成。举个例子:如果mapreduce.map.sort.spill.percent的值被设置为0.33,缓存的数据已经达到了阈值但此时仍有溢写在执行,那么下一次的溢写会一起处理这次收集的数据(即是缓存中0.66的内容),而不是产生额外的溢写。换句话说,这里定义的阈值是触发性的,不是阻塞性的。 如果一条记录的大小超过了排序缓存区的的容量,那么这将会先触发一个溢写,这条记录将会被溢写到一个单独的文件。而这条记录是否会被combiner优先处理则是不确定的。 Shuffle/Reduce参数 就像之前所描述的,每个reduce会取得Partitioner通过和HTTP分配给它的map的输出并且分阶段的将这些输出合并到硬盘上。如果map输出中间集的压缩设置被开启的话,那么map每一次的输出都会解压到内存。下面的这些选项影响到reduce处理前将输出合并到硬盘的频率以及在reduce处理时分配给map输出的内存大小。 名称 类型 描述 mapreduce.task.io.soft.factor int 指定在硬盘上每次合并的临时文件的数目。它限制了合并时打开文件的数目以及压缩编码方式。如果文件的数目超过了这个限制,合并的工作将会提交给几个不同的进程。虽然这个限制也适用于map,但是大部分作业最好是做下设置,这样这个限制将不可能达到。 mapreduce.reduce.merge.inmem.thresholds int 在被合并写入硬盘前,被抓取到内存中的排好序的map输出记录的总数。就像之前提到的溢写阈值,这个参数也不是被定义为一个切分单元,而是一个触发值。在实践中,这个值通常被设置的非常高(1000)或者被禁用(0),因为在内存中对片段的合并往往要比在硬盘上的合并开销低得多(查看下文的注意事项)。这个阈值只是影响了shuffle阶段内存中进行文件合并的频率。 mapreduce.reduce.shuffle.merge.percent float 在内存文件合并前放置到内存中的map输出的阈值,这个值表示了内存中用于存储map输出的百分比。因为map的输出若无法装入内存就会被停滞,这个值设置过高的话又会降低获取和合并的平行度。相反的,这个值达到1.0时对于那些输入可以完全放入内存的reduce是有效的。这个参数只是影响了shuffle阶段内存中进行文件合并的频率。 mapreduce.reduce.shuffle.input.buffer.percent float 在shuffle阶段分配给map输出缓存的堆内存百分比——相对于通常在mapreduce.reduce.java.opts中设置的堆内存的大小。尽管还需要留一些内存给框架,通常把这个值设置的足够大还是很有用的,这样可以存储以存储更大更多的map输出,减少溢写的内容。 mapreduce.reduce.input.buffer.percent float 在reduce阶段用于存储map输出的最大堆内存的百分比。在reduce开始时缓存中的map输出内容将会被合并到硬盘直到剩下的内容小于这个值。默认情况下,所有的map输出都会被合并到硬盘,以为reduce留出足够大的内存空间。对于内存需求比较小的reduce,应当适当的增加这个值,以避免写入硬盘从而提升性能。 其他需要注意的内容: 如果一条map输出大于用于复制map输出的内存的25%,它将会被直接写入到硬盘。 当使用combiner的时候,关于使用高合并阈值以及大缓存的说法就不太可靠了。因为合并发生在所有的map输出被获取前,而combiner是在溢写到硬盘的时候运行。在一些案例中,用户通过将资源用于对map输出的combin而不是简单增加缓存的大 小可以优化reduce的执行时间。使用combier有助于减小溢写,实现溢写和获取的并行处理。 在合并内存中的map输出到硬盘并启动reduce时,如果有碎片要溢写而硬盘上至少有mapreduce.task.io.sort.factor个临时文件,此时需要进行一次中间合并。仍在内存中的map输出也会成为中间合并的一部分。 配置参数 下面的属性是每个task执行时使用的本地作业参数: 名称 类型 描述 mapreduce.job.id String 作业ID mapreduce.job.jar String 在job目录中job.jar的位置 mapreduce.job.local.dir String job指定的共享存储空间 mapreduce.task.id String task id mapreduce.task.attempt.id String task尝试ID mapreduce.task.is.map boolean 是否是map task mapreduce.task.partition int task在job中使用的id mapreduce.map.input.file String map读取的文件名 mapreduce.map.input.start long map输入的数据块的起始位置偏移量 mapreduce.map.input.length long map输入的数据块的字节数 mapreduce.task.output.dir String

    [阅读更多...]
  • 关于MapReduce2 – Job主体

    这一部分内容会适当深入说明用户即将面对的MapReduce框架的各个环节。这有助于用户从一个更细的粒度地去实现、配置、调优作业。 我们先看看Mapper和Reducer接口。通常应用程序实现这两个接口需要提供map和reduce方法。 然后我们会讨论下其他的核心接口,包括Job,Partitioner,InputFormat,OutputFormat等接口。 最后,我们将通过讨论MapReduce框架一些有用的功能点比如DistributedCache,IsolationRunner等等来收尾。 Mapper Mapper将输入的key-alue对集合处理成另外一组key-value对集合。得到的新的key-value对集合只是一个用来过渡的中间记录集。 Map是一类将输入的记录处理为中间记录集的独立任务。生成的中间记录集不需要和输入记录类型一致。一组输入记录经过Map处理,可能会生成0个或多个输出key-value对。 Hadoop MapReduce框架为每个InputSplit创建一个map任务,而每个InputSplit是由作业的InputFormat产生的。 概括地说,作业通过Job.setMapperClass(Class)方法调用Mapper的实现类。然后MapReduce框架就可以在任务中调用map方法来处理InputSplit中每个的key-value对。应用程序可以通过重写cleanup方法来执行必要的清理工作。 输出的key-value集合不必要和输入的key-value集合类型一致。一个输入的key-value对经过处理可能输出0个或多个输出key-value对。通过调用context.write()方法可以收集输出的key-value对。 应用程序可以使用Counter(计数器)来汇报统计信息。 MapReduce框架随后会把一个特定的key所关联的所有中间记录进行分组,并提交给Reducer去处理生成最终结果。用户可以通过Job.setGroupingComparatorClass(Class)来指定一个Comparator来控制分组。 Mapper的输出结果排好序后就被分割开分配给每个Reducer去处理。分割的数目和作业中reduce的任务数目是一致的。用户可以通过实现自定义的Partitioner来控制哪个key(或之后的结果集)被分配给哪个Reducer。 用户也可以选择使用一个combiner来对产生的中间结果集进行一次本地的聚合操作(或者说是本地的reduce操作),这有助于减少Mapper传递给Reducer的数据量。这里需要使用到Job.setCombinerClass()方法。 这些排好序的中间过程的输出结果的保存格式是(key-len, key, value-len, value)。程序可以在Configuration中使用CompressionCodec来控制是否压缩以及怎样压缩输出的中间记录集。 需要多少个Map? map的数目通常是由输入记录的大小决定的。一般就是输入文件的总块(block)数。 map正常的并行规模大致是每个节点10-100个map。如果map任务对cpu的消耗较小的话,那每个节点可以设置多达300个map。由于每个任务的初始化需要占用一定的时间,所以比较合理的情况是每个map的执行时间最少1分钟。 因此,如果你有10TB的输入数据,每个块的规模是128MB,那么最终会需要82000个map来完成任务。除非使用了Configuration.set(MRJobConfig.NUM_MAPS, int)(这也只不过是给了MapReduce框架一个提示)来将这个数值设置的更高。 Reducer Reducer将一组有相同key的中间记录集处理为一组更小的结果集。 用户可以通过Job.setNumReduceTasks(int)来控制作业中reduce的数量。 概括地说,在作业中,Job对象可以用通过Job.setReducerClass(Class)来调用Reducer的实现类,并完成对其的初始化。然后MapReduce框架就可以调用reduce()方法来处理每个已分组好的输入键值对<key, (list of values)>。此时应用程序也可以重写cleanup(Context)来执行必要的清理。 Reducer有三个主要的阶段:shuffle,sort和reduce。 Shuffle Reducer的输入就是Mapper已经排好序的输出。在这个阶段,MapReduce框架通过HTTP为Reducer获取所有Mapper输出中与之相关的块。 Sort 在这个阶段中,框架会对Reducer的输入按key进行分组(因为不同mapper的输出中可能会有相同的key)。 Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取出一边被合并的。 Secondary Sort 如果需要中间记录集的分组规则和提交reduce前对key进行分组的规则不一致,那么可以通过Job.setSortComparatorClass(Class)指定一个Comparator。再加上Job.setGroupingComparatorClass(Class)可以被用来控制中间记录集如何分组,因此结合两者可以实现按值的二次排序。 Reduce 在这一阶段,框架为已分好组的每个<key, (list of values)>对调用reduce()方法。 reduce任务的输出通常会通过Context.write()写入文件系统。 应用程序可以通过Counter来汇报统计信息。 Reducer的输出是没有经过排序的。 需要多少个Reduce? reduce的适合数量大致等于 (0.95或1.75) *节点数*每个节点最大的Container数。 使用0.95的话,所有的reduce任务会在所有map任务执行完毕后立即执行,开始传输map的输出结果。使用1.75的话,较快的节点在完成第一轮reduce后就开始第二轮的reduce任务,这样可以达到一个比较好的负载均衡的效果。 增加reduce的数量会增加MapReduce框架的开销,但是可以改善负载均衡,降低因为执行失败带来的负面影响。 上述的比例系数比整体数目稍小一些是为了给框架中的推测任务(speculative-tasks)和失败的任务保留部分的reduce空间。 不使用Reducer 在没有reduce需求的时候,将reduce任务数目设置为0也是合理的。 在这种情况下,map任务的输出结果会被直接存入文件系统中。存放的路径FileOutputFormat.setOutputPath(Job, Path)指定。同样的,在存放入文件系统前,框架也没有对map的输出进行排序。 Partitioner Partitioner用以分隔键值空间。 Partitioner负责控制对map输出结果的键值进行分割。key(或者一个key的子集)用于产生分区,通常这个过程基于hash函数实现。分区的数目和作业中reduce任务的数目一致。因此它控制着将中间记录的key交给哪个reduce任务去处理。 默认的Partitioner是HashPartitioner。 Counter Counter是MapReduce用于汇报统计信息的一套机制。 Mapper和Reducer的实现类可以使用Counter来汇报一些统计信息。 Hadoop MapReduce框架自带了一套包含许多有用的Mapper、Reducer和Partitioner的类库。

    [阅读更多...]
  • kafka简介

    简介 kafka是一个分布式的、可分区的、可复制的日志提交服务。它提供了消息传递的功能,但是有着独特的设计。 首先,先了解一些基础概念: Kafka将消息源的分类称为topic; 向Kafka的topic发送消息的进程被称为producer; 订阅并消费消息的进程被称为consumer; Kafka运行在由一个或多个服务组成的集群上,其中的每个服务被称为broker。 从整体上来看,producer负责通过网络将消息发送给Kafka集群,Kafka集群将消息提供给consumer,过程如下图: 客户端和服务器之间通过TCP协议进行通信。Kafka提供了java的客户端。但是客户端也可以用多种语言实现。 1. topic和日志 我们先来研究一下Kafka的一个抽象概念——topic。一个topic是一组消息源的分类或订阅名称。针对每个topic,Kafka集群都维护了一组分区日志(partition)。分区日志即是一个topic的一个partition对应的所有segment文件。如下图: 每个topic都是由一系列有序的,不可变的消息组成。这些消息被不断的追加到分区中。分区中的每个消息都被分配了一个序列ID号。这个序列ID号被称为offset,可以作为在分区中消息的唯一标识。 在一个可配置的时间段内,Kafka集群会保存所有已发布的消息——不管它们是否已被消费。比如说如果日志的保留期被设置为两天,那么在消息发布后的两天内它都是可消费的。两天后,消息会被清除以释放空间。数据规模大小并不影响Kafka的性能,保存大量的数据对Kafka来说并不是个问题。 实际上,每个consumer唯一需要维护的数据就是consumer在日志中消费到的位置,也就是offset。offset是由consumer来维护的。一般情况下,consumer每读取一条消息,offset的值就向前增加一次。但是事实上consumer可以自由控制offset信息,它可以按任意的顺序读取消息。比如,consumer可以将offset的值重置为一个比较旧的位置来进行重新处理。 这些特性意味着kafka的consumer可以非常灵活——它们可以随意进出而不影响集群或者其它的consumer。举例说,用户可以在命令行中使用tail命令来追踪任意topic的内容,而不用担心影响正在访问topic的consumer。 对日志进行分区有这样几点作用: 首先,可以使日志的规模保持在一个指定的范围内,以能保存到单独的服务上。每个独立的分区必须能够适配它所在的服务节点。一个topic可以有多个分区,从而可以承载任意规模的数据。 第二,可以作为并行处理的单元(这点稍后讨论)。 2. 分布式 日志的分区分布式地保存在kafka集群的服务节点上。这样每个服务节点都可以请求数据管理数据。每个分区在一定数量的服务节点上都有副本以进行容错,并且这个数量是可配置的。 对于每个分区,都有一个服务节点扮演着leader的角色,其他的零个或者多个服务扮演着follower的角色。leader负责处理分区的读写请求,而follower只是被动地维护leader的副本。一旦leader出现了故障,一个follower就会自动成为新的leader。每个服务节点都可能同时扮演leader和follower两种角色。即一个服务节点可能是它上面一些分区的leader,同时也可能是其他分区的follower。就是这样kafka在集群内部实现了负载均衡。 3. Producer producer负责发布数据到其所选择的topic中,并选择将数据分配给topic的哪个分区。分区选择可以简单地由负载均衡以轮流制的方式实现,也可以通过一些特定的分区函数(取决于消息中的一些key)实现。通常使用的是第二种方式。 4. Comsumer 消息发布通常有两种模式:队列模式和发布订阅模式。在队列模式中,多个consumer(这里有个consumer pool的概念)从一个服务读取消息,每个消息只能被其中的一个consumer读到。在发布订阅模式中,消息会广播给所有的consumer。kafka提供了一个抽象的consumer概念即consumer group,从而同时实现了这两种方案。 每个consumer可以加入一个consumer group。consumer group订阅发布到topic的消息,而后将消息传送给其下面的一个consumer实例。所有的consumer实例可以运行在不同的进程上,也可以在不同的独立的机器上。 如果所有的consumer实例属于同一个consumer group,这样的工作模式在consumer中实现了负载均衡,类似常规的消息队列模式。 如果所有的consumer实例分别属于不同的consumer group,每个消息会被广播给所有的consumer,这样的模式类似于发布订阅模式。 不过更常见的是,每个topic都会有若干consumer group,一个consumer group就是一个逻辑上的“订阅者”。每个consumer group都是由多个consumer实例组成,从而获得了更好的稳定性和容错能力。这其实也是一个发布订阅模式,只不过订阅者不只是一个进程,而是由多个consumer组成的集群。 上图展示了一个kafka集群,其中包含两台服务器、四个分区(P0~P3)、以及两个consumer group。其中group A有两个consumer实例,B有四个。 相比常规的消息系统,kafka可以更好的保证有序性。 常规的队列模式在服务器上有序的保存消息。当多个consumer需要从队列中获取消息的时候,服务器会按照消息保存的顺序将消息取出。然而,尽管服务器是按顺序将消息取出的,消息却是异步地被分发至consumer的,当消息到达不同的consumer的时候可能已经失去了顺序。 这意味着在并发消费时将导致消息的顺序错乱。为了解决这个问题,消息系统通常会采用“exclusive consumer(独有消费)”的概念,即只允许一个进程从队列读取消息,当然这就意味着失去了并发性。 kafka提供了一个很好的解决方案。通过topic内部的分区概念,在处理多个consumer并发时,kafka可以实现有序性和负载均衡。 kafka将topic的每个分区只分发给一个consumer group,  这样一个分区就只能被这个group中的一个consumer消费,并且可以保证消费的顺序性。因为有很多分区,所以仍然可以在多个consumer实例中实现负载均衡。请注意consumer实例的数量不应该比分区的数量多。 kafka只是在一个分区内提供了消息的有序性,而在一个topic不同的分区间是不能保证有序性的。然而对于大部分的应用程序来说这已经足够了。如果要保证topic中所有消息的有序性,那么只能让一个topic中只有一个分区,当然这也意味着只能有一个消费者进程。 5. 保证 在某个层面上 ,kafka提供了如下保证: kafka会保证producer发给topic指定分区的消息按照发送顺序追加到日志中。即是说,如果消息M1和M2先后被同一个producer发送,如果M1先发送,那么在分区日志中M1的offset要比M2小,并且更早出现在分区日志中; kafka可以保证consumer消费消息的顺序和消息在分区日志中存储的顺序一致; 如果一个topic的“replication factor”是N,那么Kafka可以保证在N-1个服务器失效后仍不丢失提交给分区的消息。 更多细节可以在文档的“设计原理”这一章查看。 使用场景 这里列出了Kafka的几种常见使用场景。如果想对kafka在实战中的使用有个概括的了解,可以看一下这篇博客。 1. 消息处理 kafka可以作为常规的MessageBroker。MessageBroker有多种用途(将数据从producer方解耦,缓存未处理的消息等等)。相比大多数消息系统,kafka有更好的吞吐能力、内置的分区机制、冗余备份机制、以及不错的容错能力,因此kafka适用于大部分消息处理需求。 在我们的经验中,一般消息处理的吞吐量相对不大,但是要求较低的端到端延迟,并且需要kafka提供的良好的稳定性保障。 在消息处理这个领域,kafka可以比得上常规的消息系统,比如ActiveMQ和RabbitMQ。 2. 网站活动跟踪 kafka最早的用处就是作为一个实时发布订阅系统来构建用户行为追踪管道。这意味着网站上的活动(网页浏览、搜索以及其他用户行为)都会被发布到中央topic集合,每个topic代表了一种行为类型。收集的这些数据可以在订阅后支持多种用途比如实时处理、实时监控、加载到hadoop系统或者离线数据仓库系统作离线处理或者生成报表。 活动追踪通常需要很大的空间容量,因为用户每访问一个网页都会产生大量的活动数据。 3. Metrics(运营评估) kafka还经常会被用来处理运营监控数据。这涉及到聚合多个分布式应用上的数据来生成运营分析数据。 4. 日志聚合 很多人使用kafka作为日志聚合解决方案。日志聚合通常就是从各个服务器收集物理日志文件并将之集中到一个地方(比如文件服务器或者HDFS)去做处理。kafka并不关注文件的细节,它提供了一个更简洁的消息数据流的抽象概念来处理日志或者事件数据。这就实现了低延迟处理、简单的多数据源支持以及分布式数据消费。相比一些日志收集系统比如Scribe或者Flume,kafka有着不错的性能,更低的端到端延迟,并通过冗余复制提供了更健壮的持久性保障。 5. 流处理 用户经常需要对数据做一些阶段性的处理:从topic获取原始数据,而后经过汇总、丰富或者以其他方式生成新的topic为进一步的数据消费做准备。举个例子:一个文章推荐工作流程首先需要从RSS订阅源抓取文章内容并发布为一个名为“article”的topic;而后对从“article”中获取的所有文章进行整理并作去重处理后发布为新的topic,最后的工作就是尝试进行内容匹配并推荐给用户。这里描述了一个实时数据处理流程图。Storm 和 Samza 是两个用来做这些工作的常见框架。 6. Event Sourcing(事件溯源) 事件溯源是一种软件设计模式。在这种模式下,每一次数据状态的变化都会被记录到一个时间顺序的记录中。kafka支持海量数据的特性为这种设计模式提供了一个不错的选择。 7. Commit Log 对于分布式系统,kafka可以被用于提供额外的日志提交服务。日志可以被用来在节点间复制数据,也可以作为重新同步机制以在节点故障后恢复数据。kafka的日志压缩特性支持了这方面的应用。在这一点上,kafka和Apache BookKeeper项目有些类似。 三. 生态圈 在kafka的主项目外,还有大量的支持工具。在ecosystem page上,列出了这些工具,包括流处理系统、hadoop聚合接口、监控工具、部署工具等。

    [阅读更多...]