概述 Hadoop Archive是Hadoop官方提供的解决HDFS上小文件过多的一种方案。可以通过如下命令来执行生成har文件: 执行archive命令会提交一个MapReduce任务来生成har文件。在了解har文件结构后也可以考虑本地生成har文件再上传。 关于“hadoop archive”指令的更多细节请参考官方文档 。 HAR文件结构 HAR文件实际上是一个以”.har”结尾命名的目录,其中至少包含三个文件: _index _masterindex part-00000 … 其中“_index”文件中存储包内目录、文件的元数据信息,并按路径java字符串hashCode()运算的哈希值排序。 “_masterindex”记录了“_index”文件中每1000条文件元数据信息的起止哈希值、以及其在“_index”文件中的起止位置。 “part-X”文件中直接拼接了原始文件内容,无压缩处理,每个map操作的节点生成一个“part-X”文件。 HAR文件读取 在平时工作中读取hdfs文件有三种形式:即在java代码中通过hadoop-client形式读取,执行spark任务读取,通过webhdfs的rest api读取。下面一一介绍下相关的的解决方案。 通过java-client方式读取 假设我们有一个har归档文件,其存储目录是:/har/hp2.har。 在这个归档文件下存在一个目录hp,存储了7个txt文件: 下面通过hadoop-client编写程序列出“har:///har/hp2.har/hp/”下的全部文件: 上面是用scala代码做的实现。 比照普通的HDFS文件的访问方式,访问Har文件的主要特点在于其FileSystem操作对象是一个HarFileSystem的实例。 对HarFileSystem实例执行initialize()操作的时候需要传入要访问的har文件的根路径,其后所有的操作都是对har子项的相对路径进行操作。 注意:执行initialize()操作时只能传入har文件的根路径,不能像执行上面的“hfs -ls har:///har/hp2.har/hp/”指令一样传入一个完整的har子项的路径。 通过spark任务读取 spark读取数据文件大致有三种形式: 通过SparkSession.read.textFile读取文件创建DataSet 通过SparkSession.read.text读取文件创建DataFrame 通过SparkContext.textFile读取文件创建RDD 针对这三种形式,我分别写了一段代码进行验证: 在执行日志中输出的验证信息如下: 根据执行结果可知,通过SparkSession读取的两种方案均不可行,只有通过SparkContext进行读取才能达到预期效果。 通过webhdfs方式读取 关于直接通过webhdfs方式读取har中内容的方式,我查了些资料,包括Hadoop官方文档,StackOverflow的相关话题以及Google检索出的条目,其中勉强可行的是如下两篇文章建议的方案: Downloading a file inside a hadoop archive using Apache Knox Hadoop WebHDFS usage in combination with HAR (hadoop archive) from PHP 其实现思路大致如下: 通过webhdfs获取har的index文件 在index文件中找到在HDFS中存储目标文件的数据文件,以及目标文件在数据文件中的起始offset及长度 通过webhdfs获取目标文件:http://<hadoop-server>:50070/webhdfs/v1/data-file-path?op=OPEN&offset=$offset&length=$len 扫描har index文件并截取内容不太像是一种优雅的做法,至少我并不喜欢。 简单介绍一种替代方案,即通过hadoop-client实现数据流拷贝,下面是代码实现: 在api接口中调用copy方法即可实现下载功能。代码稍稍有些多,但窃以为还是要比扫描index好一些。
[阅读更多...]-
Hadoop HAR文件的读取操作
-
通过HA访问Hdfs获取ActiveNode
通过HA访问Hdfs的时候如何获取到活跃节点是一个稍稍有些麻烦的事情。 目前使用过两种方案:一是通过webhdfs接口逐一访问测试,找到状态为可用的节点;一是在zookeeper上直接获取当前活跃的节点。 简单说下第二种方案。ha的ActiveNode在zookeeper上的存储节点为:/hadoop-ha/dcnameservice/ActiveStandbyElectorLock。只需要通过ZooKeeper的API监听获取这个节点的信息即可。不过这个节点保存的信息不能当做字符串来读取,它是一个序列化后的对象,需要反序列化才能使用。 实现的代码如下: 就这样。
[阅读更多...] -
Yarn架构设计
yarn一个基本理念就是将JobTracker的两大主要功能——资源管理和作业调度/监控——分割开来成为两个独立的守护进程。在这个方案里,有一个全局的ResourceManager (RM)和对应每个应用的ApplicationMaster (AM)。一个应用可以是一个独立作业,也可以是多个作业构成的DAG(有向无环图,Directed Acyclic Graph)。 ResourceManager和NodeManager(NM)一起构成了整个数据计算框架。ResourceManager是系统上全部应用所需资源的最终决策者。NodeManager则是每台机器上的框架代理,负责这台机器上的所有容器。NodeManager监控一台机器上的所有容器对资源(CPU、内存、硬盘、网络)的使用,并将之上报给ResourceManager和Scheduler。 每个应用的ApplicationMaster实际上是一个具体的框架库。它的任务是负责与ResourceManager协商获取资源,以及与NodeManager合作实现task的执行与监控。 ResourceManager有两个主要组件:Scheduler(调度器)和ApplicationManager。 Scheduler根据容量、队列等常见的约束,将系统资源分配给正在运行的各种应用。这里的Scheduler是一个纯粹意义上的调度器:它不再对应用的状态进行监控和跟踪,也不保证重启因应用错误或硬件故障导致失败的任务。Scheduler基于应用对资源的需求执行调度功能。这是基于一个抽象的概念“资源容器(Container)”实现的。资源容器将内存、CPU、硬盘和网络等元素组合在一起支持对任务的资源分配。 Scheduler支持一种插件策略,可以将集群中资源分配给多个队列和应用。当前的Scheduler,如Capacity Scheduler和Fair Scheduler,均可视为该策略的实现。 CapacityScheduler支持hierarchical queues(分层队列),这为更准确的集群资源共享提供了可能。 Container中封装了机器资源,如内存,CPU, 磁盘,网络等,每个任务会被分配一个容器,该任务只能在该容器中执行,并使用该容器封装的资源。 ApplicationsManager负责接收作业提交,将应用程序分配给具体的ApplicationMaster,并负责重启失败的ApplicationMaster。每个应用的ApplicationMaster负责和Scheduler谈判获取资源容器,并跟踪资源容器的状态监控他们的进程。 MRv2为历史稳定版本(hadoop-1.x)提供了兼容API (API compatibility )。这意味着所有MapReduce作业不需要更改,只要重新编译一次就仍可以在MRv2上运行。
[阅读更多...] -
Yarn命令
概述 yarn命令是调用的bin/yarn脚本。执行yarn命令时如果不带上参数信息则会打印yarn命令的帮助信息。 yarn命令的用法: yarn有一个命令参数解析框架,可以用来解析通用命令参数以及运行类。命令参数及描述如下表: –config confdir 覆盖默认配置目录,默认配置目录为: ${HADOOP_PREFIX}/conf。 –loglevel loglevel 覆盖默认日志级别,可选的日志级别为:FATAL,ERROR,WARN,INFO,DEBUG,以及TRACE,默认日志级别为INFO。 GENERIC_OPTIONS(通用命令参数) 一套在多种命令里面都可能会用到参数项,具体见通用命令项表 COMMAND COMMAND_OPTIONS 各种命令以及其参数,接下来主要讲述的内容。大致可以分为普通用户命令和系统管理员命令两大类。 通用项列表: 命令选项 描述 -archives <comma separated list of archives> 用逗号分隔计算中未归档的文件,仅仅针对job -conf <configuration file> 制定应用程序的配置文件 -D <property>=<value> 使用给定的属性值 -files <comma separated list of files> 用逗号分隔的文件,拷贝到Map reduce机器,仅仅针对job -jt <local> or <resourcemanager:port> 指定一个ResourceManager,仅仅针对job -libjars <comma seperated list of jars> 将用逗号分隔的jar路径包含到classpath中去,仅仅针对job 用户命令 主要是Hadoop集群普通用户常用的命令 application 主要用来打印应用信息、杀死应用进程。 用法: 参数项详情: -appId <ApplicationId> 指定要操作的应用ID -appStates <States> 和-list命令一起使用,基于应用状态来过滤应用。多个状态用逗号分隔。可用状态为:ALL, NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED -appTypes <Types> 使用-list命令,基于应用类型来过滤应用,如果应用类型有多个,用逗号分隔 -list 列出从RM返回的应用程序,使用-appTypes参数实现基于应用类型的过滤,使用-appStates参数使用基于应用状态的过滤 -kill <ApplicationId> kill指定的应用程序 -status <ApplicationId> 打印应用程序的状态 -updatePriority <Priority> 更新应用优先级,可以使用“appId”指定应用ID applicationattempt 打印application attempt的报告。 用法: 参数项详情: -help 帮助 -list <ApplicationId> 打印指定应用的application attempt列表 -status <Application Attempt Id> 打印application attempt的状态 classpath 打印获取Hadoop相关的jar以及其他必需的库所用的类路径 用法: container 打印container信息。 用法: 参数项: -help 帮助 -list <Application Attempt Id> 应用程序尝试的Containers列表 -status <ContainerId>
[阅读更多...] -
关于MapReduce6 – 重构WordCount
这里是一个更完整的WordCount实例。在这个实例中使用了很多前面提到的MapReduce框架的特性。 这个实例需要HDFS支持运行,尤其是关于DistributedCache的一些特性。因此,这个实例只能运行于伪分布式或者完全分布式安装的Hadoop上。 看下源代码: 看看输入文本文件: 查看文件的内容: 运行应用程序: 输出执行结果: 这次我们的输入内容和第一次有些不同,请注意输出结果是怎样受到影响的。 现在我们通过DistributedCache插入一个模式文件,在这个文件里,列出了一些可以被忽略统计的单词模式。 再次运行,这次我们需要在命令中添加更多的选项: 输出结果和我们期望的一样: 再运行一次,这次我们关闭大小写敏感的特性: 就这样,我们看看输出结果: 程序要点 通过使用一些Map/Reduce框架提供的功能,WordCount的第二个版本在原始版本基础上有了如下的改进: 展示了应用程序如何在Mapper (和Reducer)中通过setup方法修改配置参数; 展示了怎样使用DistributedCache分发作业需要的只读数据。在这个程序中,允许用户设定单词模式,并在统计中跳过符合模式的单词; 展示了使用GenericOptionsParser去处理常见hadoop命令行选项的功能; 展示了应用程序如何使用Counters,如何通过传递给map(和reduce) 方法的Reporter实例来设置应用程序的状态信息。 #######
[阅读更多...] -
关于MapReduce5 – 一些有用的特性
提交作业到队列 用户将作业提交到队列。队列是作业的集合,允许系统添加特定的功能,比如,队列通过ACL决定哪些用户可以提交作业。通常主要由Hadoop调度器使用队列。 Hadoop的配置信息使用了一个单独的托管队列,被称为“default”。队列的名称是由mapreduce.job.queuename定义的(Hadoop的site配置)。一些作业调度器支持多重队列,比如Capacity Scheduler。 作业设置要提交到的队列有两种方式:通过mapreduce.job.queuename属性或者通过Configuration.set(MRJobConfig.QUEUE_NAME, String) API。队列名称的设置是可选的。如果作业提交的时候没有指定队列名称,将会被提交到“default”队列。 Counters Counters是由MapReduce框架或应用程序定义的全局计数器。每个Counter都可以是任一种Enum类型。同一特定类型的Counter可以汇集到一个组,其类型为Counters.Group。 应用程序可以定义任意(Enum类型的)的Counters,并通过map或者reduce方法中的Counters.incrCounter(Enum, long)和Counters.incrCounter(String, String, long)进行更新。之后框架会汇总这些全局计数器。 DistributedCache DistributedCache可将具体应用相关的,Size大的、只读的文件有效地分布放置。 DistributedCache是MapReduce框架提供的功能,可以缓存应用程序所需的文件(文本、压缩文件、jar包等等)。 应用程序在Job中通过url(hdfs://)指定需要被缓存的文件。DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。 在slave节点上作业的所有任务执行之前,MapReduce框架会把一些必要的文档复制到相应节点。框架的这个过程运行效率是很高的,主要有两点原因:每个作业的文件只拷贝一次,可以缓存压缩文件并在slave节点上解压文件。 DistributedCache根据缓存文档修改的时间戳进行跟踪。在作业运行期间,当前应用程序或其他外部程序不能修改缓存文件。 DistributedCache可以用来分发简单的只读数据或是文本,也可以分发类型复杂的文件,比如压缩文件和jar包。压缩文件(zip, tar, tgz and tar.gz文件)会在在slave节点上解压。这些文件可以被设置操作权限。 用户可以通过mapreduce.job.cache.{files |archives}来分发文件或压缩包。如果要分发多个文件,可以使用逗号来分割文件路径。也可以使用API来设置这个属性,可以使用的API包括:Job.addCacheFile(URI)/ Job.addCacheArchive(URI) 和[Job.setCacheFiles(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html)/ [Job.setCacheArchives(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html)。这些方法中的URI形式是:hdfs://host:port/absolute-path\#link-name。在Streaming程序中,可以通过命令行选项-cacheFile/-cacheArchive来分发文件。 DistributedCache可以在map或reduce任务中作为一种基础的软件分发机制。他可以用来分发jar包和本地库。Job.addArchiveToClassPath(Path)或Job.addFileToClassPath(Path)这两个API可以用来缓存jar包或文件,并把它们添加到classpath和子jvm中去。同样的事情也可以通过设置配置属性mapreduce.job.classpath.{files |archives}做到。类似的,被链接到任务的工作目录的缓存文件可以用来分发和装载本地库。 私有的和公共的DistributedCache文件 DistributedCache文件可以是私有的也可以是公共的。这要看它们是如何在slave节点上被共享的。 私有的DistributedCache文件被缓存在用户私有的本地目录下,用户的作业需要使用这些缓存文件。这些文件只能被特定用户的作业或任务访问。其他用户无法访问这些文件。可以在DistributedCache文件被上传的文件系统上(通常是HDFS)将其权限设置为私有。如果文件或文件所在的目录无法被外界搜索到或访问到,这个文件就是私有的。 公共的DistributedCache文件缓存在一个全局共享目录下,并被设置为所有用户可见。这些文件可以被slave节点上的所有用户的作业和任务访问。同样,可以在文件系统将文件设置为公有。如果用户想把文件设置为公共的,那么文件的权限必须被设置为全局可读,并且文件所在的目录权限需要被设置为可执行。 Profiling Profiling是一个工具,它使用内置的java profiler工具进行分析获得(2-3个)map和reduce样例运行分析报告。 用户可以通过设定mapreduce.task.profile属性来决定是否收集作业任务的profiler信息。利用API Configuration.set(MRJobConfig.TASK_PROFILE, boolean)可以修改属性值。如果设置为true,则开启profiling功能。收集的profiler信息被保存在用户日志目录。默认profiling功能是关闭的。 如果用户设定使用profiling功能,可以使用配置文档里的属性 mapred.task.profile.{maps|reduces} 设置要profile的map/reduce task范围。设置该属性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范围的缺省值是0-2。 用户可以通过设定配置文档里的属性mapred.task.profile.params 来指定profiler配置参数。修改属性要使用api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String)。当运行task时,如果字符串包含%s。 它会被替换成profileing的输出文件名。这些参数会在命令行里传递到子JVM中。缺省的profiling 参数如下: Debugging Map/Reduce框架能够运行用户提供的调试脚本。 当map/reduce任务失败时,用户可以通过运行调试脚本对任务日志(例如任务的标准输出、标准错误、系统日志以及作业配置文件)进行分析和处理。调试脚本的标准输出和错误信息会在作业UI的Diagnostic处展示。 在接下来的章节,我们讨论如何与作业一起提交调试脚本。为了提交调试脚本, 首先要把这个脚本分发出去,而且还要在配置文件里设置。 如何分发脚本文件: 用户要用 DistributedCache 来分发和链接脚本文件 如何提交脚本: 一个快速提交调试脚本的方法是分别为需要调试的map任务和reduce任务设置 “mapreduce.map.debug.script” 和 “mapreduce.reduce.debug.script” 属性的值。这些属性也可以通过 Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String)和 Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String) API来设置。对于streaming, 可以分别为需要调试的map任务和reduce任务使用命令行选项-mapdebug 和 -reducedegug来提交调试脚本。 脚本的参数是任务的标准输出、标准错误、系统日志以及作业配置文件。在运行map/reduce失败的节点上运行的调试命令: Pipes 程序根据第五个参数获得c++程序名。 因此调试pipes程序的命令是 默认行为: 对于pipes,默认的脚本会用gdb处理core dump, 打印 stack trace并且给出正在运行线程的信息。 数据压缩 Hadoop MapReduce框架为应用程序的写入文件操作提供了压缩工具,这些工具可以为map输出的中间数据和作业结果输出数据(例如reduce的输出)提供支持。它还附带了一些 CompressionCodec的实现,比如实现了 zlib压缩算法。 Hadoop同样支持gzip、bzip2 、snappy和 lz4等文件格式。 考虑到性能问题(zlib)以及Java类库的缺失等因素,Hadoop也为上述压缩解压算法提供本地库的实现。 中间输出 应用程序可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api控制map输出的中间结果,并且可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api指定 CompressionCodec。 结果输出 应用程序可以通过 api方法FileOutputFormat.setCompressOutput(Job, boolean) 控制输出是否需要压缩,并且可以使用 FileOutputFormat.setOutputCompressorClass(Job, Class)指定CompressionCodec。 如果作业输出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api,来设定 SequenceFile.CompressionType (i.e. RECORD
[阅读更多...] -
关于MapReduce4 – 作业的输入输出
作业的输入 InputFormat InputFormat描述了MapReduce作业的输入规范。 MapReduce框架根据MapReduce作业的InputFormat来做这些事情: 校验作业的输入配置; 把输入文件切分成多个逻辑上的InputSplit实例,并把每个实例分发给一个Mapper; 提供RecordReader实现类,从逻辑InputSplit中收集输入记录,并将这些记录交给Mapper处理。 基于文件的InputFormat的实现类(通常是FileInputFormat的子类)的默认行为是按照输入文件的字节(byte)大小,将输入切分成逻辑InputSplit实例。然而,输入文件的FileSystem的块的大小是InputSplit分块大小的上限。InputSplit分块大小的下限可以通过mapreduce.input.fileinputformat.split.min来设置。 考虑到边界情况,对于很多应用程序来说,按照输入文件大小进行切分是不能满足需求的。在这种情况下,应用程序需要实现一个RecordReader类来处理记录的边界并为每个任务提供一个逻辑分块的面向记录的视图。 TextInputFormat是默认的InputFormat。 如果一个作业的InputFormat是TextInputFormat,并且框架检测到输入文件的扩展名是.gz就会使用对应的CompressionCodec自动解压这些文件。但是需要注意,带上述扩展名的压缩文件不会被分割,并且整个压缩文件会分给一个mapper来处理。 InputSplit InputSplit是每个mapper要处理的数据块。 通常InputSplit是字节样式的输入,然后由RecordReader处理并转化成记录样式。 FileSplit是默认的InputSplit。它把mapreduce.map.input.file设置为输入文件的路径,输入文件是逻辑分块文件。 RecordReader RecordReader从InputSplit读入 <key, value>对。 通常RecordReader将InputSplit提供的字节样式的输入转化为由mapper处理的记录样式的文件。因此RecordReader负责处理记录的边界情况,以及把数据表示成<key, value>对形式。 作业的输出 OutputFormat OutputFormat描述了作业的输出样式。 MapReduce框架依赖OutputFormat来做这些事情: 检查作业的输出,例如检查输出路径是否已存在; 提供RecordWriter实现类以输出作业结果。输出文件保存在FileSystem上。 TextOutputFormat是默认的OutputFormat。 OutputCommitter OutputCommitter描述了MapReduce作业输出任务的提交。 MapReduce框架依赖OutputCommitter来做这些事情: 在初始化时对作业进行设置。比如在作业初始化的时候创建作业的临时输出目录。在作业初始化任务之后,作业的设置由另一个不同的的任务完成,此时作业的状态为PREP。当作业设置任务执行完成时,作业的状态将会改变为RUNNING。 在作业完成后执行清理工作。比如在作业执行完成后删除临时输出目录。作业清理工作在作业执行的末期由一个独立的任务完成。在作业清理任务完成后,作业的状态是:SUCCEDED/FAILED/KILLED。 设置任务临时输出。在任务初始化阶段,任务的设置是相同任务的一部分工作。 检查任务是否需要提交。这样做是为了避免任务在不需要提交时就被提交了。 提交任务的输出。在任务执行完成时,如果有需要,任务会提交它的输出。 放弃任务的提交。如果任务已经失败或者被杀死,输出内容将会被清理。如果任务不能自行完成清理工作(比如被异常阻塞)。另一个有相同尝试ID的任务将会被调用去完成清理工作。 FileOutputCommitter是默认的OutputCommitter。作业的设置和清理工作占用了Map或者Reduce的容器(哪个在NodeManager上可用就用哪个)。作业清理任务(JobCleanup)、任务清理任务(TaskCleanup)和作业设置任务(JobSetup)拥有最高的优先级,这三者之间的优先级同提到的顺序。 任务的Side-Effect Files 在一些应用程序中,子任务需要产生一些side-file,这些文件与作业实际输出结果文件不同。 在这种情况下,同一个Mapper或Reducer的两个实例(比如预防性任务)同时打开或者写入文件系统上的同一个文件就会产生冲突。因此应用程序在写文件的时候需要为每次任务尝试(而不只是每次任务)选取一个独一无二的文件名(使用attemptid,比如attempt_200709221812_0001_m_000000_0)。 为了避免这些冲突,在OutputCommitter是FileOutputCommitter时,MapReduce框架为每次尝试任务都建立维护了一个特殊的子目录${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid},这个目录位于尝试任务输出结果存放的FileSystem上,可以通过${mapreduce.task.output.dir}来访问这个目录。对于成功完成的尝试任务,只有${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}下的文件会被移动到${mapreduce.output.fileoutputformat.outputdir}。当然了,框架会丢弃那些失败的尝试任务的子目录。对于应用程序来说,这个过程是完全透明的。 在任务执行期间,应用程序在写文件时可以利用这个特性,比如通过FileOutputFormat.getWorkOutputPath(Conext)在${mapreduce.task.output.dir}获得${mapreduce.task.output.dir}目录,并在其下创建任意需要的side-file。框架在任务尝试成功时会马上移动这些文件,因此不需要在程序中为每次任务尝试取一个独一无二的名字。 注意:在每次任务尝试执行期间,${mapreduce.task.output.dir}的值实际上是${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid},这个值是由MapReduce框架设定的。所以使用这个特性的方法是在MapReduce的FileOutputFormat.getWorkOutputPath(Conext)方法返回的路径中创建side-file即可。 对于只使用map而不使用reduce的作业,这个结论也成立。这种情况下,map的输出结果直接生成到HDFS上。 RecordWriter RecordWriter将作业输出的kye-value对写入到输出文件。 RecordWriter的实现类将作业的输出写入到文件系统。 #############
[阅读更多...] -
关于MapReduce3 – 作业的配置和执行
配置 一个Job就表示了一个MapReduce的作业配置。 Job是用户向Hadoop框架描述一个MapReduce作业如何执行的最主要的接口。框架会尽力按Job的描述去忠实地执行一个作业,但是: 一些配置参数可能会被管理员标记为final(了解下final参数),这意味着不能被更改; 作业的一些参数可以被直接设置(比如Job.setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间存在微妙的联系,设置起来比较复杂(比如Configuration.set(JobContext.NUM_MAPS, int))。 Job通常是用来指定Mapper,Combiner(如果有的话),Partitioner,Reducer,InputFormat,OutputFormat的实现类。Job可以指定一组输入文件,使用的方法包括(FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))和(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String))。Job还可以指定结果输出文件的地址(FileOutputFormat.setOutputPath(Path))。 此外,Job还可以用来为作业做一些高级的配置。比如使用哪个Comparator,哪些文件可以被放进DistributedCache,中间结果或者作业的输出结果是否可以被压缩以及怎样压缩,作业是否允许预防性任务的执行(setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean)),每个任务的最大尝试次数(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))等等。 当然用户可以使用Configuration.set(String, String)/ Configuration.get(String)来set/get应用程序需要的任意参数。然而,有些参数需要慎重使用,比如DistributedCache更适用于大规模的只读数据。 执行 MRAppMaster是在一个单独的jvm上以子进程的形式来执行Mapper/Reducer的任务的。 子任务会继承父MRAppMaster的执行环境。当然,用户也可以自行设置子jvm上的附加选项。这需要使用mapreduce.{map|reduce}.java.opts和Job中的配置参数(比如使用-Djava.library.path=<>将一个非标准路径设为运行时的链接用以搜索共享库)。如果mapreduce.{map|reduce}.java.opts的参数包含“@taskid@”标识符,它会被替换成MapReduce中taskid的值。 下面是一个包含多个参数和替换的例子,其中包括:jvm GC日志;无密码启动JVM JMX代理程序以使之可以连接到jconsole或者类似的工具来查看子进程的内存、线程并得到线程的dump;分别设置map和reduce的子jvm的堆内存为512MB和1024MB;还添加了一个附加路径到子jvm的java.library.path中。 内存管理 用户或者管理员可以使用mapreduce.{map|reduce}.memory.mb设置子任务以及其子进程的虚拟内存的最大值。注意,这里设置的值是对每一个进程都有效的。mapreduce.{map|reduce}.memory.mb设置的值的单位是MB。还有,这个值必须大于或等于java虚拟机的-Xmx的值,否则虚拟机可能无法启动。 注意:mapreduce.{map|reduce}.java.opts只用于设置MRAppMaster启动的子任务。为守护进程设置内存的选项请参看文档《Hadoop守护进程环境配置》 框架一些部分的可用内存也是可配置的。在map和reduce任务中,程序的性能可能会被一些参数的调整所影响——比如是并发相关选项或者将数据写入硬盘的频率。监控文件系统关系到一个作业的计数器——尤其是关系到从map到reduce的byte数——是至关重要的。这些数据对于协调我们刚才提到的参数有着非常宝贵的价值。 Map参数 一个map所产生的某条记录会在被序列化后置入缓存,相关的元数据则会被存放入accounting buffer中。就像下面这些选项所描述的,若是缓存中序列化数据的大小或者元数据的大小超过了某个阈值,而此时map还在持续输出记录,缓存中的内容就会被排序并写入硬盘生成一个临时文件,这个过程可以称为“溢写(spill)”。如果缓存已被填满并且产生溢写,此时map线程就会被阻塞。在map任务执行完成后,缓存中现存的所有的记录都会被写入硬盘,而后硬盘上所有相关的临时文件会被合并成一个独立的文件。较小的溢写可以减少map执行的时间,但是一个较大的缓存又会占用Mapper的可用内存。 名称 类型 描述 mapreduce.task.io.sort.mb int 累积的序列化数据以及accounting buffer中存储的map输出数据的大小。单位MB mapreduce.map.sort.spill.percent float 排序缓存区大小的百分比。一旦超过,则会有一个线程将缓存内容溢出写入硬盘 名称 类型 描述 mapreduce.task.io.sort.mb int 累积的序列化数据以及accounting buffer中存储的map输出数据的大小。单位MB mapreduce.map.sort.spill.percent float 排序缓存区大小的百分比。一旦超过,则会有一个线程将缓存内容溢出写入硬盘 其他需要注意的内容: 如果超出了溢写阈值而还有一个溢写正在执行,那么将只做收集工作直到当前的溢写完成。举个例子:如果mapreduce.map.sort.spill.percent的值被设置为0.33,缓存的数据已经达到了阈值但此时仍有溢写在执行,那么下一次的溢写会一起处理这次收集的数据(即是缓存中0.66的内容),而不是产生额外的溢写。换句话说,这里定义的阈值是触发性的,不是阻塞性的。 如果一条记录的大小超过了排序缓存区的的容量,那么这将会先触发一个溢写,这条记录将会被溢写到一个单独的文件。而这条记录是否会被combiner优先处理则是不确定的。 Shuffle/Reduce参数 就像之前所描述的,每个reduce会取得Partitioner通过和HTTP分配给它的map的输出并且分阶段的将这些输出合并到硬盘上。如果map输出中间集的压缩设置被开启的话,那么map每一次的输出都会解压到内存。下面的这些选项影响到reduce处理前将输出合并到硬盘的频率以及在reduce处理时分配给map输出的内存大小。 名称 类型 描述 mapreduce.task.io.soft.factor int 指定在硬盘上每次合并的临时文件的数目。它限制了合并时打开文件的数目以及压缩编码方式。如果文件的数目超过了这个限制,合并的工作将会提交给几个不同的进程。虽然这个限制也适用于map,但是大部分作业最好是做下设置,这样这个限制将不可能达到。 mapreduce.reduce.merge.inmem.thresholds int 在被合并写入硬盘前,被抓取到内存中的排好序的map输出记录的总数。就像之前提到的溢写阈值,这个参数也不是被定义为一个切分单元,而是一个触发值。在实践中,这个值通常被设置的非常高(1000)或者被禁用(0),因为在内存中对片段的合并往往要比在硬盘上的合并开销低得多(查看下文的注意事项)。这个阈值只是影响了shuffle阶段内存中进行文件合并的频率。 mapreduce.reduce.shuffle.merge.percent float 在内存文件合并前放置到内存中的map输出的阈值,这个值表示了内存中用于存储map输出的百分比。因为map的输出若无法装入内存就会被停滞,这个值设置过高的话又会降低获取和合并的平行度。相反的,这个值达到1.0时对于那些输入可以完全放入内存的reduce是有效的。这个参数只是影响了shuffle阶段内存中进行文件合并的频率。 mapreduce.reduce.shuffle.input.buffer.percent float 在shuffle阶段分配给map输出缓存的堆内存百分比——相对于通常在mapreduce.reduce.java.opts中设置的堆内存的大小。尽管还需要留一些内存给框架,通常把这个值设置的足够大还是很有用的,这样可以存储以存储更大更多的map输出,减少溢写的内容。 mapreduce.reduce.input.buffer.percent float 在reduce阶段用于存储map输出的最大堆内存的百分比。在reduce开始时缓存中的map输出内容将会被合并到硬盘直到剩下的内容小于这个值。默认情况下,所有的map输出都会被合并到硬盘,以为reduce留出足够大的内存空间。对于内存需求比较小的reduce,应当适当的增加这个值,以避免写入硬盘从而提升性能。 其他需要注意的内容: 如果一条map输出大于用于复制map输出的内存的25%,它将会被直接写入到硬盘。 当使用combiner的时候,关于使用高合并阈值以及大缓存的说法就不太可靠了。因为合并发生在所有的map输出被获取前,而combiner是在溢写到硬盘的时候运行。在一些案例中,用户通过将资源用于对map输出的combin而不是简单增加缓存的大 小可以优化reduce的执行时间。使用combier有助于减小溢写,实现溢写和获取的并行处理。 在合并内存中的map输出到硬盘并启动reduce时,如果有碎片要溢写而硬盘上至少有mapreduce.task.io.sort.factor个临时文件,此时需要进行一次中间合并。仍在内存中的map输出也会成为中间合并的一部分。 配置参数 下面的属性是每个task执行时使用的本地作业参数: 名称 类型 描述 mapreduce.job.id String 作业ID mapreduce.job.jar String 在job目录中job.jar的位置 mapreduce.job.local.dir String job指定的共享存储空间 mapreduce.task.id String task id mapreduce.task.attempt.id String task尝试ID mapreduce.task.is.map boolean 是否是map task mapreduce.task.partition int task在job中使用的id mapreduce.map.input.file String map读取的文件名 mapreduce.map.input.start long map输入的数据块的起始位置偏移量 mapreduce.map.input.length long map输入的数据块的字节数 mapreduce.task.output.dir String
[阅读更多...] -
关于MapReduce2 – Job主体
这一部分内容会适当深入说明用户即将面对的MapReduce框架的各个环节。这有助于用户从一个更细的粒度地去实现、配置、调优作业。 我们先看看Mapper和Reducer接口。通常应用程序实现这两个接口需要提供map和reduce方法。 然后我们会讨论下其他的核心接口,包括Job,Partitioner,InputFormat,OutputFormat等接口。 最后,我们将通过讨论MapReduce框架一些有用的功能点比如DistributedCache,IsolationRunner等等来收尾。 Mapper Mapper将输入的key-alue对集合处理成另外一组key-value对集合。得到的新的key-value对集合只是一个用来过渡的中间记录集。 Map是一类将输入的记录处理为中间记录集的独立任务。生成的中间记录集不需要和输入记录类型一致。一组输入记录经过Map处理,可能会生成0个或多个输出key-value对。 Hadoop MapReduce框架为每个InputSplit创建一个map任务,而每个InputSplit是由作业的InputFormat产生的。 概括地说,作业通过Job.setMapperClass(Class)方法调用Mapper的实现类。然后MapReduce框架就可以在任务中调用map方法来处理InputSplit中每个的key-value对。应用程序可以通过重写cleanup方法来执行必要的清理工作。 输出的key-value集合不必要和输入的key-value集合类型一致。一个输入的key-value对经过处理可能输出0个或多个输出key-value对。通过调用context.write()方法可以收集输出的key-value对。 应用程序可以使用Counter(计数器)来汇报统计信息。 MapReduce框架随后会把一个特定的key所关联的所有中间记录进行分组,并提交给Reducer去处理生成最终结果。用户可以通过Job.setGroupingComparatorClass(Class)来指定一个Comparator来控制分组。 Mapper的输出结果排好序后就被分割开分配给每个Reducer去处理。分割的数目和作业中reduce的任务数目是一致的。用户可以通过实现自定义的Partitioner来控制哪个key(或之后的结果集)被分配给哪个Reducer。 用户也可以选择使用一个combiner来对产生的中间结果集进行一次本地的聚合操作(或者说是本地的reduce操作),这有助于减少Mapper传递给Reducer的数据量。这里需要使用到Job.setCombinerClass()方法。 这些排好序的中间过程的输出结果的保存格式是(key-len, key, value-len, value)。程序可以在Configuration中使用CompressionCodec来控制是否压缩以及怎样压缩输出的中间记录集。 需要多少个Map? map的数目通常是由输入记录的大小决定的。一般就是输入文件的总块(block)数。 map正常的并行规模大致是每个节点10-100个map。如果map任务对cpu的消耗较小的话,那每个节点可以设置多达300个map。由于每个任务的初始化需要占用一定的时间,所以比较合理的情况是每个map的执行时间最少1分钟。 因此,如果你有10TB的输入数据,每个块的规模是128MB,那么最终会需要82000个map来完成任务。除非使用了Configuration.set(MRJobConfig.NUM_MAPS, int)(这也只不过是给了MapReduce框架一个提示)来将这个数值设置的更高。 Reducer Reducer将一组有相同key的中间记录集处理为一组更小的结果集。 用户可以通过Job.setNumReduceTasks(int)来控制作业中reduce的数量。 概括地说,在作业中,Job对象可以用通过Job.setReducerClass(Class)来调用Reducer的实现类,并完成对其的初始化。然后MapReduce框架就可以调用reduce()方法来处理每个已分组好的输入键值对<key, (list of values)>。此时应用程序也可以重写cleanup(Context)来执行必要的清理。 Reducer有三个主要的阶段:shuffle,sort和reduce。 Shuffle Reducer的输入就是Mapper已经排好序的输出。在这个阶段,MapReduce框架通过HTTP为Reducer获取所有Mapper输出中与之相关的块。 Sort 在这个阶段中,框架会对Reducer的输入按key进行分组(因为不同mapper的输出中可能会有相同的key)。 Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取出一边被合并的。 Secondary Sort 如果需要中间记录集的分组规则和提交reduce前对key进行分组的规则不一致,那么可以通过Job.setSortComparatorClass(Class)指定一个Comparator。再加上Job.setGroupingComparatorClass(Class)可以被用来控制中间记录集如何分组,因此结合两者可以实现按值的二次排序。 Reduce 在这一阶段,框架为已分好组的每个<key, (list of values)>对调用reduce()方法。 reduce任务的输出通常会通过Context.write()写入文件系统。 应用程序可以通过Counter来汇报统计信息。 Reducer的输出是没有经过排序的。 需要多少个Reduce? reduce的适合数量大致等于 (0.95或1.75) *节点数*每个节点最大的Container数。 使用0.95的话,所有的reduce任务会在所有map任务执行完毕后立即执行,开始传输map的输出结果。使用1.75的话,较快的节点在完成第一轮reduce后就开始第二轮的reduce任务,这样可以达到一个比较好的负载均衡的效果。 增加reduce的数量会增加MapReduce框架的开销,但是可以改善负载均衡,降低因为执行失败带来的负面影响。 上述的比例系数比整体数目稍小一些是为了给框架中的推测任务(speculative-tasks)和失败的任务保留部分的reduce空间。 不使用Reducer 在没有reduce需求的时候,将reduce任务数目设置为0也是合理的。 在这种情况下,map任务的输出结果会被直接存入文件系统中。存放的路径FileOutputFormat.setOutputPath(Job, Path)指定。同样的,在存放入文件系统前,框架也没有对map的输出进行排序。 Partitioner Partitioner用以分隔键值空间。 Partitioner负责控制对map输出结果的键值进行分割。key(或者一个key的子集)用于产生分区,通常这个过程基于hash函数实现。分区的数目和作业中reduce任务的数目一致。因此它控制着将中间记录的key交给哪个reduce任务去处理。 默认的Partitioner是HashPartitioner。 Counter Counter是MapReduce用于汇报统计信息的一套机制。 Mapper和Reducer的实现类可以使用Counter来汇报一些统计信息。 Hadoop MapReduce框架自带了一套包含许多有用的Mapper、Reducer和Partitioner的类库。
[阅读更多...] -
关于MapReduce1 – QuickStart
概述 Hadoop Mapreduce是一个简单易用的框架。基于MapReduce写出来的程序能够运行在由上千台商用机器组成的大型集群上,以一种可靠的容错的方式并行处理T级别的海量数据。 一个MapReduce作业通常会把输入的数据集拆分成独立的块,由map任务(task)以完全并行的方式处理。框架先对map的输出结果进行排序,排好序的结果会做为reduce任务的输入。通常作业的输入和输出都存放在一个文件系统里。MapReduce框架负责任务的调度和监控,并重新执行已经失败的任务。 一般来说Hadoop的计算节点和存储节点是在相同的机器上。这是因为MapReduce和HDFS运行在一组相同的节点上。这种设计允许MapReduce在已经存储好数据的节点上高效地进行任务调度,从而更集中高效地使用整个集群的带宽。 MapReduce框架的组成包括:一个单独的master(ResourceManager),集群每个节点上都有一个slave(NodeManager),以及每个应用程序上的MRAppMaster(参考YARN架构)。 应用程序最少应该指定输入输出的位置,并通过实现合适的接口和(或)抽象类来提供map和reduce函数。有了这些设置,再加上其他作业参数,就组成了作业配置(job configuration)。 接下来,Hadoop的作业客户端(job client)就会提交作业(job, 一般是jar包或其他可执行程序)和配置信息给ResourceManager,ResourceManager负责分发软件和配置信息给slave,调度并监控任务执行,并提供状态和诊断信息给作业客户端(job-client)。 尽管Hadoop框架是使用Java实现的,但是MapReduce程序不一定要用Java来写,比如我们可以使用Hadoop Streaming和Hadoop Pipes。 Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序(例如:Shell)来做为mapper和reducer。 Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。 输入和输出 MapReduce框架基于<key, value>对执行,也就是说,MapReduce框架把作业的输入视为一组<key, value>对,并生产一组<key, value>对作为作业的输出。这两组<key, value>对可以是不同的类型。 MapReduce框架需要对key和value的实例进行序列化操作,因此这些类需要实现Writable接口。此外,key对应的类还要实现WritableComparable接口以适应MapReduce框架对排序的要求。 一个Mapreduce作业的输入输出类型如下: 注意combin的输出类型是要和map的输出类型一致的。 实例:WordCount v1.0 在深入细节之前,我们先看一个MapReduce的应用示例,以对MapReduce的工作方式有一个初步的认识。 WordCount 是一个简单的应用,可以用来统计在一组输入数据中某个单词出现的次数。 这个应用适用于单机模式,伪分布式模式或完全分布式模式三种Hadoop安装方式。 源码如下,我做了折叠,点开查看就好: 工程的maven的配置如下,也是做了折叠,点开查看就好: 执行“mvn clean package”完成打包。 执行如下命令运行MapReduce任务: 其中/adt/input是输入目录,/adt/output是输出目录,输入和输出目录都在HDFS上。输入目录下可以有多个输入文件。在/adt/input下有两个输入文件,可以看一下文件的内容: 看一下输出结果: 运行命令说明 应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递压缩文档做为参数,这些压缩文档会被解压并且在task的当前工作目录下会创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。 使用-libjars, -files and -archives运行wordcount实例: 在这里,压缩文档myarchive.zip将被解压并存放在以“myarchive.zip”命名的目录下。 用户也可以使用#标识通过-files或者-archives为文件或压缩文档指定一个别名。 举个例子: 在这个例子中,通过别名dict1和dict2分别访问文件dir1/dict.txt和dir2/dict.txt。而压缩文档mytar.tgz将会被解压并置于名为“tgzdir”的目录下。 关于Map 程序中的map方法: Mapper的实现类通过map方法,一次处理一行由指定的FileInputFormat提供的记录。然后Mapper通过StringTokenizer将这一行记录以空格为分隔符分割成若干Tokens,而后输出格式为< <word>, 1>的key-value对。 对于提供的第一个输入样本,map的输出内容为: 第二个样本的map输出内容为: 在我们的WordCount程序里还指定了一个combiner。 每个map运行后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合。 第一个map的输出: 第二个map的输出: 关于reduce reduce的代码: Reducer实现类的reduce方法只是将每个key(本例中就是单词)出现的次数求和。 因此这个作业的输出就是: 关于main方法 main方法指定了作业的几个方面,比如说输入输出路径(通过命令行提供),key和value的类型,输入输出的格式等信息。在作业中,调用了job.waitForCompletion函数提交作业并监控它的执行。 ############
[阅读更多...]