关于MapReduce3 – 作业的配置和执行

配置

一个Job就表示了一个MapReduce的作业配置。

Job是用户向Hadoop框架描述一个MapReduce作业如何执行的最主要的接口。框架会尽力按Job的描述去忠实地执行一个作业,但是:

  • 一些配置参数可能会被管理员标记为final(了解下final参数),这意味着不能被更改;
  • 作业的一些参数可以被直接设置(比如Job.setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间存在微妙的联系,设置起来比较复杂(比如Configuration.set(JobContext.NUM_MAPS, int))。

Job通常是用来指定Mapper,Combiner(如果有的话),Partitioner,Reducer,InputFormat,OutputFormat的实现类。Job可以指定一组输入文件,使用的方法包括(FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))和(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String))。Job还可以指定结果输出文件的地址(FileOutputFormat.setOutputPath(Path))。

此外,Job还可以用来为作业做一些高级的配置。比如使用哪个Comparator,哪些文件可以被放进DistributedCache,中间结果或者作业的输出结果是否可以被压缩以及怎样压缩,作业是否允许预防性任务的执行(setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean)),每个任务的最大尝试次数(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))等等。

当然用户可以使用Configuration.set(String, String)/ Configuration.get(String)来set/get应用程序需要的任意参数。然而,有些参数需要慎重使用,比如DistributedCache更适用于大规模的只读数据。

执行

MRAppMaster是在一个单独的jvm上以子进程的形式来执行Mapper/Reducer的任务的。

子任务会继承父MRAppMaster的执行环境。当然,用户也可以自行设置子jvm上的附加选项。这需要使用mapreduce.{map|reduce}.java.opts和Job中的配置参数(比如使用-Djava.library.path=<>将一个非标准路径设为运行时的链接用以搜索共享库)。如果mapreduce.{map|reduce}.java.opts的参数包含“@taskid@”标识符,它会被替换成MapReduce中taskid的值。

下面是一个包含多个参数和替换的例子,其中包括:jvm GC日志;无密码启动JVM JMX代理程序以使之可以连接到jconsole或者类似的工具来查看子进程的内存、线程并得到线程的dump;分别设置map和reduce的子jvm的堆内存为512MB和1024MB;还添加了一个附加路径到子jvm的java.library.path中。

 

内存管理

用户或者管理员可以使用mapreduce.{map|reduce}.memory.mb设置子任务以及其子进程的虚拟内存的最大值。注意,这里设置的值是对每一个进程都有效的。mapreduce.{map|reduce}.memory.mb设置的值的单位是MB。还有,这个值必须大于或等于java虚拟机的-Xmx的值,否则虚拟机可能无法启动。

注意:mapreduce.{map|reduce}.java.opts只用于设置MRAppMaster启动的子任务。为守护进程设置内存的选项请参看文档《Hadoop守护进程环境配置》

框架一些部分的可用内存也是可配置的。在map和reduce任务中,程序的性能可能会被一些参数的调整所影响——比如是并发相关选项或者将数据写入硬盘的频率。监控文件系统关系到一个作业的计数器——尤其是关系到从map到reduce的byte数——是至关重要的。这些数据对于协调我们刚才提到的参数有着非常宝贵的价值。

Map参数

一个map所产生的某条记录会在被序列化后置入缓存,相关的元数据则会被存放入accounting buffer中。就像下面这些选项所描述的,若是缓存中序列化数据的大小或者元数据的大小超过了某个阈值,而此时map还在持续输出记录,缓存中的内容就会被排序并写入硬盘生成一个临时文件,这个过程可以称为“溢写(spill)”。如果缓存已被填满并且产生溢写,此时map线程就会被阻塞。在map任务执行完成后,缓存中现存的所有的记录都会被写入硬盘,而后硬盘上所有相关的临时文件会被合并成一个独立的文件。较小的溢写可以减少map执行的时间,但是一个较大的缓存又会占用Mapper的可用内存。

名称 类型 描述 mapreduce.task.io.sort.mb int 累积的序列化数据以及accounting buffer中存储的map输出数据的大小。单位MB mapreduce.map.sort.spill.percent float 排序缓存区大小的百分比。一旦超过,则会有一个线程将缓存内容溢出写入硬盘

名称

类型

描述

mapreduce.task.io.sort.mb

int

累积的序列化数据以及accounting buffer中存储的map输出数据的大小。单位MB

mapreduce.map.sort.spill.percent

float

排序缓存区大小的百分比。一旦超过,则会有一个线程将缓存内容溢出写入硬盘

其他需要注意的内容:

  • 如果超出了溢写阈值而还有一个溢写正在执行,那么将只做收集工作直到当前的溢写完成。举个例子:如果mapreduce.map.sort.spill.percent的值被设置为0.33,缓存的数据已经达到了阈值但此时仍有溢写在执行,那么下一次的溢写会一起处理这次收集的数据(即是缓存中0.66的内容),而不是产生额外的溢写。换句话说,这里定义的阈值是触发性的,不是阻塞性的。
  • 如果一条记录的大小超过了排序缓存区的的容量,那么这将会先触发一个溢写,这条记录将会被溢写到一个单独的文件。而这条记录是否会被combiner优先处理则是不确定的。

Shuffle/Reduce参数

就像之前所描述的,每个reduce会取得Partitioner通过和HTTP分配给它的map的输出并且分阶段的将这些输出合并到硬盘上。如果map输出中间集的压缩设置被开启的话,那么map每一次的输出都会解压到内存。下面的这些选项影响到reduce处理前将输出合并到硬盘的频率以及在reduce处理时分配给map输出的内存大小。

名称

类型

描述

mapreduce.task.io.soft.factor

int

指定在硬盘上每次合并的临时文件的数目。它限制了合并时打开文件的数目以及压缩编码方式。如果文件的数目超过了这个限制,合并的工作将会提交给几个不同的进程。虽然这个限制也适用于map,但是大部分作业最好是做下设置,这样这个限制将不可能达到。

mapreduce.reduce.merge.inmem.thresholds

int

在被合并写入硬盘前,被抓取到内存中的排好序的map输出记录的总数。就像之前提到的溢写阈值,这个参数也不是被定义为一个切分单元,而是一个触发值。在实践中,这个值通常被设置的非常高(1000)或者被禁用(0),因为在内存中对片段的合并往往要比在硬盘上的合并开销低得多(查看下文的注意事项)。这个阈值只是影响了shuffle阶段内存中进行文件合并的频率。

mapreduce.reduce.shuffle.merge.percent

float

在内存文件合并前放置到内存中的map输出的阈值,这个值表示了内存中用于存储map输出的百分比。因为map的输出若无法装入内存就会被停滞,这个值设置过高的话又会降低获取和合并的平行度。相反的,这个值达到1.0时对于那些输入可以完全放入内存的reduce是有效的。这个参数只是影响了shuffle阶段内存中进行文件合并的频率。

mapreduce.reduce.shuffle.input.buffer.percent

float

在shuffle阶段分配给map输出缓存的堆内存百分比——相对于通常在mapreduce.reduce.java.opts中设置的堆内存的大小。尽管还需要留一些内存给框架,通常把这个值设置的足够大还是很有用的,这样可以存储以存储更大更多的map输出,减少溢写的内容。

mapreduce.reduce.input.buffer.percent

float

在reduce阶段用于存储map输出的最大堆内存的百分比。在reduce开始时缓存中的map输出内容将会被合并到硬盘直到剩下的内容小于这个值。默认情况下,所有的map输出都会被合并到硬盘,以为reduce留出足够大的内存空间。对于内存需求比较小的reduce,应当适当的增加这个值,以避免写入硬盘从而提升性能。

其他需要注意的内容:

如果一条map输出大于用于复制map输出的内存的25%,它将会被直接写入到硬盘。

当使用combiner的时候,关于使用高合并阈值以及大缓存的说法就不太可靠了。因为合并发生在所有的map输出被获取前,而combiner是在溢写到硬盘的时候运行。在一些案例中,用户通过将资源用于对map输出的combin而不是简单增加缓存的大 小可以优化reduce的执行时间。使用combier有助于减小溢写,实现溢写和获取的并行处理。

在合并内存中的map输出到硬盘并启动reduce时,如果有碎片要溢写而硬盘上至少有mapreduce.task.io.sort.factor个临时文件,此时需要进行一次中间合并。仍在内存中的map输出也会成为中间合并的一部分。

配置参数

下面的属性是每个task执行时使用的本地作业参数:

名称

类型

描述

mapreduce.job.id

String

作业ID

mapreduce.job.jar

String

在job目录中job.jar的位置

mapreduce.job.local.dir

String

job指定的共享存储空间

mapreduce.task.id

String

task id

mapreduce.task.attempt.id

String

task尝试ID

mapreduce.task.is.map

boolean

是否是map task

mapreduce.task.partition

int

task在job中使用的id

mapreduce.map.input.file

String

map读取的文件名

mapreduce.map.input.start

long

map输入的数据块的起始位置偏移量

mapreduce.map.input.length

long

map输入的数据块的字节数

mapreduce.task.output.dir

String

task的临时输出目录

注意:在使用streaming执行作业时,mapreduce的参数名称会被变形。参数名称中的点“.”会被下划线“_”替换。比如,mapreduce.job.id会被写为“mapreduce_job_id”,而mapreduce.job.jar会被写为“mapreduce_job_jar”。要获取一个streaming作业中的Mapper或Reducer的值需要使用下划线的参数名称。

任务日志

NodeManager会读取标准输出(stdout)和错误(stderr)流,以及任务的系统日志并记录到日志文件夹${HADOOP_LOG_DIR}/userlogs下

lib的分配

DistributedCache也可以用于map或reduce任务中用到的jar包和本地库的分配。子JVM通常会把它现在工作目录添加到java.library.path和LD_LIBRARY_PATH。因此缓存的库也可以被System.loadLibrary或System.load方法调用。要了解更多的关于怎样通过分布式缓存调用共享库的细节参看《Native Libraries》文档。

提交和监控

Job 是用户提交的作业与ResourceManager交互的主要接口。

Job提供了提交作业,追踪作业进程,查看组件任务报表和日志,获取MapReduce集群状态信息等功能。

提交

作业提交过程包括:

  • 检查作业输入输出配置;
  • 为作业计算InputSplit的值;
  • 如果需要的话,为作业的DistributedCache建立必要的统计信息;
  • 复制作业的jar包和配置文件到FileSystem上的MapReduce系统目录下;
  • 提交作业到ResourceManager并视需求监控其状态。

作业的历史文件也会记录到用户指定目录mapreduce.jobhistory.intermediate-done-dir和mapreduce.jobhistory.done-dir,默认是作业输出目录。

用户使用下面的命令可以查看指定目录下的历史日志摘要:

这条指令可以打印作业的细节,以及失败的和被杀死的任务的细节。要查看作业更多的细节,比如成功的任务、每个任务尝试的次数(task attempt)等可以使用下面这条命令:

通常,用户使用Job创建应用程序,配置作业属性,提交作业并监控其运行。

控制

有时候用一个单独的MapReduce作业不能完成复杂的任务,此时用户需要将多个MapReduce作业连接在一起。这很简单,因为作业通常是输出到分布式文件系统中,这个作业的输出反过来也可以作为下一个作业的输入。

然而,这也意味着,保证作业完成(成功或失败)的责任就直接落到了客户身上。在这种情况下,可以使用的作业控制选项有:

  • Job.submit():提交作业给集群,并立即返回;
  • Job.waitForCompletion(boolean):提交作业到集群,并等待其完成后再返回。

#######


发表评论

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据