关于MapReduce1 – QuickStart

概述

Hadoop Mapreduce是一个简单易用的框架。基于MapReduce写出来的程序能够运行在由上千台商用机器组成的大型集群上,以一种可靠的容错的方式并行处理T级别的海量数据。

一个MapReduce作业通常会把输入的数据集拆分成独立的块,由map任务(task)以完全并行的方式处理。框架先对map的输出结果进行排序,排好序的结果会做为reduce任务的输入。通常作业的输入和输出都存放在一个文件系统里。MapReduce框架负责任务的调度和监控,并重新执行已经失败的任务。

一般来说Hadoop的计算节点和存储节点是在相同的机器上。这是因为MapReduce和HDFS运行在一组相同的节点上。这种设计允许MapReduce在已经存储好数据的节点上高效地进行任务调度,从而更集中高效地使用整个集群的带宽。

MapReduce框架的组成包括:一个单独的master(ResourceManager),集群每个节点上都有一个slave(NodeManager),以及每个应用程序上的MRAppMaster(参考YARN架构)。

应用程序最少应该指定输入输出的位置,并通过实现合适的接口和(或)抽象类来提供map和reduce函数。有了这些设置,再加上其他作业参数,就组成了作业配置(job configuration)。

接下来,Hadoop的作业客户端(job client)就会提交作业(job, 一般是jar包或其他可执行程序)和配置信息给ResourceManager,ResourceManager负责分发软件和配置信息给slave,调度并监控任务执行,并提供状态和诊断信息给作业客户端(job-client)。

尽管Hadoop框架是使用Java实现的,但是MapReduce程序不一定要用Java来写,比如我们可以使用Hadoop Streaming和Hadoop Pipes。

Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序(例如:Shell)来做为mapper和reducer。

Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。

输入和输出

MapReduce框架基于<key, value>对执行,也就是说,MapReduce框架把作业的输入视为一组<key, value>对,并生产一组<key, value>对作为作业的输出。这两组<key, value>对可以是不同的类型。

MapReduce框架需要对key和value的实例进行序列化操作,因此这些类需要实现Writable接口。此外,key对应的类还要实现WritableComparable接口以适应MapReduce框架对排序的要求。

一个Mapreduce作业的输入输出类型如下:

注意combin的输出类型是要和map的输出类型一致的。

实例:WordCount v1.0

在深入细节之前,我们先看一个MapReduce的应用示例,以对MapReduce的工作方式有一个初步的认识。

WordCount 是一个简单的应用,可以用来统计在一组输入数据中某个单词出现的次数。

这个应用适用于单机模式,伪分布式模式或完全分布式模式三种Hadoop安装方式。

源码如下,我做了折叠,点开查看就好:

工程的maven的配置如下,也是做了折叠,点开查看就好:

执行“mvn clean package”完成打包。

执行如下命令运行MapReduce任务:

其中/adt/input是输入目录,/adt/output是输出目录,输入和输出目录都在HDFS上。输入目录下可以有多个输入文件。在/adt/input下有两个输入文件,可以看一下文件的内容:

看一下输出结果:

运行命令说明

应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递压缩文档做为参数,这些压缩文档会被解压并且在task的当前工作目录下会创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。

使用-libjars, -files and -archives运行wordcount实例:

在这里,压缩文档myarchive.zip将被解压并存放在以“myarchive.zip”命名的目录下。

用户也可以使用#标识通过-files或者-archives为文件或压缩文档指定一个别名。

举个例子:

在这个例子中,通过别名dict1和dict2分别访问文件dir1/dict.txt和dir2/dict.txt。而压缩文档mytar.tgz将会被解压并置于名为“tgzdir”的目录下。

关于Map

程序中的map方法:

Mapper的实现类通过map方法,一次处理一行由指定的FileInputFormat提供的记录。然后Mapper通过StringTokenizer将这一行记录以空格为分隔符分割成若干Tokens,而后输出格式为< <word>, 1>的key-value对。

对于提供的第一个输入样本,map的输出内容为:

第二个样本的map输出内容为:

在我们的WordCount程序里还指定了一个combiner。

每个map运行后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合。

第一个map的输出:

第二个map的输出:

关于reduce

reduce的代码:

Reducer实现类的reduce方法只是将每个key(本例中就是单词)出现的次数求和。

因此这个作业的输出就是:

关于main方法

main方法指定了作业的几个方面,比如说输入输出路径(通过命令行提供),key和value的类型,输入输出的格式等信息。在作业中,调用了job.waitForCompletion函数提交作业并监控它的执行。

############

发表评论

This site uses Akismet to reduce spam. Learn how your comment data is processed.