关于MapReduce4 – 作业的输入输出

作业的输入

InputFormat

InputFormat描述了MapReduce作业的输入规范。

MapReduce框架根据MapReduce作业的InputFormat来做这些事情:

  • 校验作业的输入配置;
  • 把输入文件切分成多个逻辑上的InputSplit实例,并把每个实例分发给一个Mapper;
  • 提供RecordReader实现类,从逻辑InputSplit中收集输入记录,并将这些记录交给Mapper处理。

基于文件的InputFormat的实现类(通常是FileInputFormat的子类)的默认行为是按照输入文件的字节(byte)大小,将输入切分成逻辑InputSplit实例。然而,输入文件的FileSystem的块的大小是InputSplit分块大小的上限。InputSplit分块大小的下限可以通过mapreduce.input.fileinputformat.split.min来设置。

考虑到边界情况,对于很多应用程序来说,按照输入文件大小进行切分是不能满足需求的。在这种情况下,应用程序需要实现一个RecordReader类来处理记录的边界并为每个任务提供一个逻辑分块的面向记录的视图。

TextInputFormat是默认的InputFormat。

如果一个作业的InputFormat是TextInputFormat,并且框架检测到输入文件的扩展名是.gz就会使用对应的CompressionCodec自动解压这些文件。但是需要注意,带上述扩展名的压缩文件不会被分割,并且整个压缩文件会分给一个mapper来处理。

InputSplit

InputSplit是每个mapper要处理的数据块。

通常InputSplit是字节样式的输入,然后由RecordReader处理并转化成记录样式。

FileSplit是默认的InputSplit。它把mapreduce.map.input.file设置为输入文件的路径,输入文件是逻辑分块文件。

RecordReader

RecordReader从InputSplit读入 <key, value>对。

通常RecordReader将InputSplit提供的字节样式的输入转化为由mapper处理的记录样式的文件。因此RecordReader负责处理记录的边界情况,以及把数据表示成<key, value>对形式。

作业的输出

OutputFormat

OutputFormat描述了作业的输出样式。

MapReduce框架依赖OutputFormat来做这些事情:

  • 检查作业的输出,例如检查输出路径是否已存在;
  • 提供RecordWriter实现类以输出作业结果。输出文件保存在FileSystem上。

TextOutputFormat是默认的OutputFormat。

OutputCommitter

OutputCommitter描述了MapReduce作业输出任务的提交。

MapReduce框架依赖OutputCommitter来做这些事情:

在初始化时对作业进行设置。比如在作业初始化的时候创建作业的临时输出目录。在作业初始化任务之后,作业的设置由另一个不同的的任务完成,此时作业的状态为PREP。当作业设置任务执行完成时,作业的状态将会改变为RUNNING。

在作业完成后执行清理工作。比如在作业执行完成后删除临时输出目录。作业清理工作在作业执行的末期由一个独立的任务完成。在作业清理任务完成后,作业的状态是:SUCCEDED/FAILED/KILLED。

设置任务临时输出。在任务初始化阶段,任务的设置是相同任务的一部分工作。

检查任务是否需要提交。这样做是为了避免任务在不需要提交时就被提交了。

提交任务的输出。在任务执行完成时,如果有需要,任务会提交它的输出。

放弃任务的提交。如果任务已经失败或者被杀死,输出内容将会被清理。如果任务不能自行完成清理工作(比如被异常阻塞)。另一个有相同尝试ID的任务将会被调用去完成清理工作。

FileOutputCommitter是默认的OutputCommitter。作业的设置和清理工作占用了Map或者Reduce的容器(哪个在NodeManager上可用就用哪个)。作业清理任务(JobCleanup)、任务清理任务(TaskCleanup)和作业设置任务(JobSetup)拥有最高的优先级,这三者之间的优先级同提到的顺序。

任务的Side-Effect Files

在一些应用程序中,子任务需要产生一些side-file,这些文件与作业实际输出结果文件不同。

在这种情况下,同一个Mapper或Reducer的两个实例(比如预防性任务)同时打开或者写入文件系统上的同一个文件就会产生冲突。因此应用程序在写文件的时候需要为每次任务尝试(而不只是每次任务)选取一个独一无二的文件名(使用attemptid,比如attempt_200709221812_0001_m_000000_0)。

为了避免这些冲突,在OutputCommitter是FileOutputCommitter时,MapReduce框架为每次尝试任务都建立维护了一个特殊的子目录${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid},这个目录位于尝试任务输出结果存放的FileSystem上,可以通过${mapreduce.task.output.dir}来访问这个目录。对于成功完成的尝试任务,只有${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}下的文件会被移动到${mapreduce.output.fileoutputformat.outputdir}。当然了,框架会丢弃那些失败的尝试任务的子目录。对于应用程序来说,这个过程是完全透明的。

在任务执行期间,应用程序在写文件时可以利用这个特性,比如通过FileOutputFormat.getWorkOutputPath(Conext)在${mapreduce.task.output.dir}获得${mapreduce.task.output.dir}目录,并在其下创建任意需要的side-file。框架在任务尝试成功时会马上移动这些文件,因此不需要在程序中为每次任务尝试取一个独一无二的名字。

注意:在每次任务尝试执行期间,${mapreduce.task.output.dir}的值实际上是${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid},这个值是由MapReduce框架设定的。所以使用这个特性的方法是在MapReduce的FileOutputFormat.getWorkOutputPath(Conext)方法返回的路径中创建side-file即可。

对于只使用map而不使用reduce的作业,这个结论也成立。这种情况下,map的输出结果直接生成到HDFS上。

RecordWriter

RecordWriter将作业输出的kye-value对写入到输出文件。

RecordWriter的实现类将作业的输出写入到文件系统。

#############

发表评论

This site uses Akismet to reduce spam. Learn how your comment data is processed.