Hadoop MapReduce是一个软件框架,可以轻松地编写应用程序,以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集)。
MapReduce 作业通常将输入数据集拆分为独立的块,这些任务由地图任务以完全并行的方式进行处理。框架对地图的输出进行排序,然后将其输入到reduce任务。通常,作业的输入和输出都存储在文件系统中。该框架负责安排任务,监视任务并重新执行失败的任务。
通常,计算节点和存储节点是相同的,也就是说,MapReduce框架和Hadoop分布式文件系统(请参阅HDFS体系结构指南)在同一组节点上运行。此配置使框架可以在已经存在数据的节点上有效地调度任务,从而在整个群集中产生很高的聚合带宽。
MapReduce框架由一个主资源管理器,一个群集节点一个工作器NodeManager和每个应用程序MRAppMaster组成(请参阅YARN体系结构指南)。
最少地,应用程序通过适当的接口和/或抽象类的实现来指定输入/输出位置和供应图,并减少功能。这些以及其他作业参数构成作业配置。
然后,Hadoop 作业客户端将作业(jar /可执行文件等)和配置提交给ResourceManager,然后由ResourceManager负责将软件/配置分发给工作人员,安排任务并对其进行监视,为工作提供状态和诊断信息,客户。
尽管Hadoop框架是用Java™实现的,但MapReduce应用程序不必用Java编写。
Hadoop Streaming是一个实用程序,它允许用户使用任何可执行程序(例如Shell实用程序)作为映射器和/或reducer创建和运行作业。
Hadoop Pipes是SWIG兼容的C ++ API,用于实现MapReduce应用程序(不基于JNI™)。
MapReduce框架仅在<key,value>对上操作,也就是说,该框架将作业的输入视为一组<key,value>对,并生成一组<key,value>对作为其输出。工作,可能是不同类型的。
该键和值类必须由框架序列化,因此需要实现可写接口。此外,关键类必须实现WritableComparable接口,以利于框架进行排序。
MapReduce作业的输入和输出类型:
(输入)<K1,V1> - > 映射 - > <K2,V2> - > 结合 - > <K2,V2> - > 减少 - > <K3,V3>(输出)
在进入细节之前,让我们来看一个MapReduce应用程序示例,以了解其工作方式。
WordCount是一个简单的应用程序,可以计算给定输入集中每个单词的出现次数。
这适用于本地独立的,伪分布式或全分布式Hadoop安装(“ 单节点安装”)。
导入java.io.IOException; 导入java.util.StringTokenizer; 导入org.apache.hadoop.conf.Configuration; 导入org.apache.hadoop.fs.Path; 导入org.apache.hadoop.io.IntWritable; 导入org.apache.hadoop.io.Text; 导入org.apache.hadoop.mapreduce.Job; 导入org.apache.hadoop.mapreduce.Mapper; 导入org.apache.hadoop.mapreduce.Reducer; 导入org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 导入org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 公共类WordCount { 公共静态类TokenizerMapper 扩展Mapper <Object,Text,Text,IntWritable> { 私有最终静态IntWritable一个= new IntWritable(1); 私人文字= new Text(); 公共无效图(对象键,文本值,上下文上下文 )引发IOException,InterruptedException { StringTokenizer itr =新的StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,一个); } } } 公共静态类IntSumReducer 扩展Reducer <Text,IntWritable,Text,IntWritable> { 私有IntWritable结果= new IntWritable(); public void reduce(文本键,Iterable <IntWritable>值, 上下文上下文 )引发IOException,InterruptedException { int sum = 0; for(IntWritable val:values){ sum + = val.get(); } result.set(sum); context.write(key,result); } } 公共静态void main(String [] args)引发异常{ 配置conf = new Configuration(); Job job = Job.getInstance(conf,“ word count”); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(args [0])); FileOutputFormat.setOutputPath(job,new Path(args [1])); System.exit(job.waitForCompletion(true)?0:1); } }
假设环境变量设置如下:
导出JAVA_HOME = / usr / java / default 导出PATH = $ {JAVA_HOME} / bin:$ {PATH} 导出HADOOP_CLASSPATH = $ {JAVA_HOME} /lib/tools.jar
编译WordCount.java并创建一个jar:
$ bin / hadoop com.sun.tools.javac.Main WordCount.java $ jar cf wc.jar WordCount * .class
假如说:
样本文本文件作为输入:
$ bin / hadoop fs -ls / user / joe / wordcount / input / / user / joe / wordcount / input / file01 / user / joe / wordcount / input / file02 $ bin / hadoop fs -cat / user / joe / wordcount / input / file01 你好世界再见世界 $ bin / hadoop fs -cat / user / joe / wordcount / input / file02 您好Hadoop再见Hadoop
运行应用程序:
$ bin / hadoop jar wc.jar WordCount / user / joe / wordcount / input / user / joe / wordcount / output
输出:
$ bin / hadoop fs -cat / user / joe / wordcount / output / part-r-00000 再见1 再见1 Hadoop 2 你好2 世界2
应用程序可以使用-files选项指定以逗号分隔的路径列表,这些路径将出现在任务的当前工作目录中。该-libjars选项允许应用程序jar添加到地图的类路径和减少。选项-archives允许他们将逗号分隔的存档列表作为参数传递。这些归档文件是未归档的,并且在当前任务工作目录中创建了带有归档文件名称的链接。有关命令行选项的更多详细信息,请参见《命令指南》。
运行单词计数与例如-libjars,-files和-archives:
bin / hadoop jar hadoop-mapreduce-examples- <ver> .jar wordcount-文件cachefile.txt -libjars mylib.jar -archives myarchive.zip输入输出
在这里,myarchive.zip将被放置并解压缩到名为“ myarchive.zip”的目录中。
用户可以对文件和穿过档案指定不同的符号名-files和-archives选项,使用#。
例如,
bin / hadoop jar hadoop-mapreduce-examples- <ver> .jar wordcount -files dir1 / dict.txt#dict1,dir2 / dict.txt#dict2-归档mytar.tgz#tgzdir输入输出
在这里,任务分别使用符号名称dict1和dict2可以访问文件dir1 / dict.txt和dir2 / dict.txt。归档文件mytar.tgz将被放置并取消归档,其名称为“ tgzdir”。
应用程序可以通过在命令行上使用-Dmapreduce.map.env,-Dmapreduce.reduce.env和-Dyarn.app.mapreduce.am.env选项在命令行上指定映射器,化简器和应用程序主任务的环境变量,分别。
例如,以下为映射器和化简器设置环境变量FOO_VAR = bar和LIST_VAR = a,b,c,
bin / hadoop jar hadoop-mapreduce-examples- <ver> .jar字数-Dmapreduce.map.env.FOO_VAR = bar -Dmapreduce.map.env.LIST_VAR = a,b,c -Dmapreduce.reduce.env.FOO_VAR = bar -Dmapreduce.reduce.env.LIST_VAR = a,b,c输入输出
该字计数的应用是相当直接的。
公共无效图(对象键,文本值,上下文上下文 )引发IOException,InterruptedException { StringTokenizer itr =新的StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,一个); } }
所述映射器执行,通过地图的方法,一次处理一条线,由指定的提供的TextInputFormat。然后,它通过StringTokenizer将行拆分为用空格分隔的标记,并发出键值对<<word>,1>。
对于给定的样本输入,第一个图发出:
<你好,1> <世界,1> <再见1> <世界,1>
第二张地图发出:
<你好,1> <Hadoop,1> <再见1 <Hadoop,1>
在本教程的稍后部分,我们将详细了解为给定任务生成的地图数量,以及如何以细粒度方式控制它们。
job.setCombinerClass(IntSumReducer.class);
WordCount还指定一个组合器。因此,在对键 s 进行排序之后,每个映射的输出都将通过本地组合器(根据作业配置与Reducer相同)进行本地聚合。
第一张地图的输出:
<再见1> <你好,1> <世界,2>
第二张地图的输出:
<再见1 <Hadoop,2> <你好,1>
public void reduce(文本键,Iterable <IntWritable>值, 上下文上下文 )引发IOException,InterruptedException { int sum = 0; for(IntWritable val:values){ sum + = val.get(); } result.set(sum); context.write(key,result); }
该减速器执行,通过减少方法只是总结了值,它是对于每个键的出现计数(在这个例子即字)。
因此,作业的输出为:
<再见1> <再见1 <Hadoop,2> <你好,2> <世界,2>
所述主要方法指定作业的各个方面,诸如输入/输出路径(通过命令行通过),键/值的类型,输入/输出格式等,在作业。然后,它调用job.waitForCompletion提交作业并监视其进度。
在本教程的稍后部分,我们将学习有关Job,InputFormat,OutputFormat和其他接口和类的更多信息。
本节提供有关MapReduce框架每个面向用户方面的合理数量的详细信息。这应该可以帮助用户以细粒度的方式实现,配置和调整作业。但是,请注意,每个类/接口的javadoc仍然是最全面的文档。这仅是一个教程。
让我们首先使用Mapper和Reducer接口。应用程序通常实现它们以提供映射和归约方法。
然后,我们将讨论其他核心接口,包括Job,Partitioner,InputFormat,OutputFormat等。
最后,我们将讨论框架的一些有用功能,例如DistributedCache,IsolationRunner等,作为总结。
应用程序通常实现Mapper和Reducer接口以提供映射和reduce方法。这些构成了工作的核心。
映射器将输入键/值对映射到一组中间键/值对。
映射是将输入记录转换为中间记录的单个任务。转换后的中间记录不必与输入记录具有相同的类型。给定的输入对可以映射为零或许多输出对。
Hadoop MapReduce框架为作业的InputFormat生成的每个InputSplit生成一个映射任务。
总体而言,映射器实现是通过Job.setMapperClass(Class)方法传递给作业的。然后,框架针对该任务的InputSplit中的每个键/值对调用map(WritableComparable,Writable,Context)。然后,应用程序可以覆盖cleanup(Context)方法以执行任何必需的清理。
输出对不必与输入对具有相同的类型。给定的输入对可以映射为零或许多输出对。通过对context.write(WritableComparable,Writable)的调用来收集输出对。
应用程序可以使用计数器报告其统计信息。
随后,与给定输出键关联的所有中间值都由框架进行分组,并传递给Reducer,以确定最终输出。用户可以通过Job.setGroupingComparatorClass(Class)指定一个Comparator来控制分组。
对Mapper的输出进行排序,然后按Reducer进行分区。分区总数与作业的缩减任务总数相同。用户可以通过实现自定义Partitioner来控制将哪些键(并由此记录)转到哪个Reducer。
用户可以选择通过Job.setCombinerClass(Class)指定一个组合器,以执行中间输出的本地聚合,这有助于减少从Mapper传递给Reducer的数据量。
排序的中间输出始终以简单的格式(key-len,key,value-len,value)存储。应用程序可以通过Configuration控制是否以及如何压缩中间输出以及使用CompressionCodec。
Reducer减少一组中间值,这些中间值共享一个较小值集的密钥。
用户通过Job.setNumReduceTasks(int)设置作业的减少数量。
总体而言,Reducer实现是通过Job.setReducerClass(Class)方法传递作业的Job的,并且可以重写它来初始化自己。然后,框架为分组输入中的每个<key,(值列表)>对调用reduce(WritableComparable,Iterable <Writable>,Context)方法。然后,应用程序可以覆盖cleanup(Context)方法以执行任何必需的清理。
减速器分为三个主要阶段:随机播放,排序和缩小。
如果在归约之前要求用于分组中间密钥的等效规则与用于分组密钥的等效规则不同,则可以通过Job.setSortComparatorClass(Class)指定一个Comparator。由于Job.setGroupingComparatorClass(Class)可用于控制中间键的分组方式,因此可以结合使用这些键来模拟对值的二次排序。
在此阶段,将对分组输入中的每个<key,(值列表)>对调用reduce(WritableComparable,Iterable <Writable>,Context)方法。
的减少任务的输出典型地写入到文件系统经由Context.write(WritableComparable,可写)。
应用程序可以使用计数器报告其统计信息。
在输出减速是不排序。
减少的正确数量似乎是0.95或1.75乘以(<< 节点数 > * < 每个节点的最大容器数 >)。
使用0.95时,所有reduce都可以立即启动,并在地图完成时开始传输地图输出。随着1.75的快节点将完成其第一轮的减少和发射的减少做负载均衡的一个更好的工作第二波。
增加减少的数量会增加框架开销,但会增加负载平衡并降低故障成本。
上面的缩放因子略小于整数,以在框架中为推测性任务和失败任务保留一些减少的时间。
如果不需要减少,则将减少任务的数量设置为零是合法的。
在这种情况下,映射任务的输出直接进入FileSystem,进入FileOutputFormat.setOutputPath(Job,Path)设置的输出路径。框架不会在将映射输出写到FileSystem之前对其进行排序。
分区分区密钥空间。
分区程序控制中间映射输出的键的分区。密钥(或密钥的子集)通常用于通过散列函数来得出分区。分区总数与作业的缩减任务总数相同。因此,这控制了将中间键(以及记录)发送到m个还原任务中的哪个还原任务以进行还原。
HashPartitioner是默认的Partitioner。
作业代表MapReduce作业配置。
Job是用户向Hadoop框架描述MapReduce作业以执行的主要界面。该框架尝试如Job所述忠实地执行作业,但是:
虽然一些作业参数可以直接设置(例如Job.setNumReduceTasks(int)),但其他参数与框架的其余部分和/或作业配置巧妙地交互,并且设置起来更复杂(例如Configuration.set(JobContext.NUM_MAPS,int))。
Job通常用于指定Mapper,Combiner(如果有),Partitioner,Reducer,InputFormat和OutputFormat实现。FileInputFormat指示输入文件集(FileInputFormat.setInputPaths(Job,Path…) / FileInputFormat.addInputPath(Job,Path))和(FileInputFormat.setInputPaths(Job,String…) / FileInputFormat.addInputPaths(Job,String))以及其中的位置输出文件应被写入(FileOutputFormat.setOutputPath(Path))。
可选地,Job用于指定作业的其他高级方面,例如要使用的Comparator,要放置在DistributedCache中的文件,是否要压缩中间和/或作业输出(以及如何压缩),是否可以执行作业任务以推测方式执行(setMapSpeculativeExecution(boolean) / setReduceSpeculativeExecution(boolean)),每个任务的最大尝试次数(setMaxMapAttempts(int) / setMaxReduceAttempts(int))等。
当然,用户可以使用Configuration.set(String,String) / Configuration.get(String)设置/获取应用程序所需的任意参数。但是,对大量(只读)数据使用DistributedCache。
该MRAppMaster执行映射 / 减速 任务作为一个单独的JVM子进程。
子任务继承了父MRAppMaster的环境。用户可以指定附加的选项,以经由子-JVM 的MapReduce {地图|降低}。.java.opts在和配置参数工作如用于运行时链接经由搜索共享库非标准路径-Djava。 library.path = <>等。如果mapreduce。{map | reduce} .java.opts参数包含符号@ taskid @,则会使用MapReduce任务的taskid值进行内插。
这是一个包含多个参数和替换项的示例,显示了jvm GC日志记录以及启动了无密码JVM JMX代理,以便它可以与jconsole等连接以监视子内存,线程并获取线程转储。它还设置了映射的最大堆大小,并将子jvm分别减小到512MB和1024MB。它还向child-jvm 的java.library.path添加了一条附加路径。
<属性> <name> mapreduce.map.java.opts </ name> <值> -Xmx512M -Djava.library.path = / home / mycompany / lib -verbose:gc -Xloggc:/tmp/@taskid@.gc -Dcom.sun.management.jmxremote.authenticate = false -Dcom.sun.management.jmxremote.ssl = false </ value> </ property> <属性> <name> mapreduce.reduce.java.opts </ name> <值> -Xmx1024M -Djava.library.path = / home / mycompany / lib -verbose:gc -Xloggc:/tmp/@taskid@.gc -Dcom.sun.management.jmxremote.authenticate = false -Dcom.sun.management.jmxremote.ssl = false </ value> </ property>
用户/管理员还可以使用mapreduce。{map | reduce} .memory.mb来指定已启动子任务的最大虚拟内存,以及该子任务以递归方式启动的任何子进程。请注意,此处设置的值是每个进程的限制。为值映射缩减{地图|降低}。.memory.mb应当在兆字节(MB)指定。并且该值必须大于或等于传递给JavaVM的-Xmx,否则VM可能无法启动。
注意:mapreduce。{map | reduce} .java.opts仅用于配置从MRAppMaster启动的子任务。配置Hadoop守护程序的环境中介绍了配置守护程序的内存选项。
框架某些部分可用的内存也是可配置的。在映射和归约任务中,性能可能会受到调整影响操作并发性和数据撞击磁盘频率的参数的影响。监视文件系统计数器的工作-特别是相对于映射中的字节数并减少到减少-对于调整这些参数而言是无价的。
从映射发出的记录将被序列化到缓冲区中,而元数据将被存储到记帐缓冲区中。如以下选项中所述,当序列化缓冲区或元数据超过阈值时,缓冲区的内容将被排序并在后台写入磁盘,而映射继续输出记录。如果在溢出过程中任一个缓冲区已完全填满,则映射线程将阻塞。映射完成后,所有剩余的记录都将写入磁盘,并且所有磁盘上的段都将合并到一个文件中。减少溢出到磁盘的数量可以减少映射时间,但是较大的缓冲区也会减少映射器可用的内存。
名称 | 类型 | 描述 |
---|---|---|
mapreduce.task.io.sort.mb | 整型 | 存储从映射发出的记录的序列化和记帐缓冲区的累积大小(以兆字节为单位)。 |
mapreduce.map.sort.spill.percent | 浮动 | 序列化缓冲区中的软限制。一旦到达,线程将开始将内容溢出到后台的磁盘中。 |
其他注意事项
如果在进行泄漏的过程中超过了任一泄漏阈值,收集将继续进行直到泄漏完成为止。例如,如果将mapreduce.map.sort.spill.percent设置为0.33,并且在溢出运行时填充了缓冲区的其余部分,则下一个溢出将包括所有收集的记录或缓冲区的0.66,并且不会产生额外的泄漏。换句话说,阈值是定义触发器,而不是阻塞。
大于序列化缓冲区的记录将首先触发溢出,然后溢出到单独的文件中。尚不确定此记录是否首先通过组合器。
如前所述,每个reduce都会将分区程序通过HTTP分配给它的输出提取到内存中,并定期将这些输出合并到磁盘上。如果打开了地图输出的中间压缩,则将每个输出解压缩到内存中。以下选项会影响在还原之前这些合并到磁盘的频率以及在还原期间分配给映射输出的内存。
名称 | 类型 | 描述 |
---|---|---|
mapreduce.task.io.soft.factor | 整型 | 指定磁盘上要同时合并的段数。它限制了合并期间打开的文件和压缩编解码器的数量。如果文件数超过此限制,则合并将分几次进行。尽管此限制也适用于地图,但应配置大多数作业,以使在那里不太可能达到此限制。 |
mapreduce.reduce.merge.inmem.thresholds | 整型 | 合并到磁盘之前已提取到内存中的已排序映射输出的数量。像前面的注释中的溢出阈值一样,这不是定义分区的单位,而是触发器。实际上,通常将其设置为很高(1000)或禁用(0),因为合并内存段通常比从磁盘合并便宜(请参见此表后面的注释)。此阈值仅影响随机播放期间内存中合并的频率。 |
mapreduce.reduce.shuffle.merge.percent | 浮动 | 在内存中合并开始之前获取的映射输出的内存阈值,表示为分配用于在内存中存储映射输出的内存百分比。由于无法容纳在内存中的映射输出可能会暂停,因此将其设置为高值可能会降低获取和合并之间的并行度。相反,高达1.0的值可有效减少其输入完全适合内存的情况。此参数仅影响随机播放期间内存中合并的频率。 |
mapreduce.reduce.shuffle.input.buffer.percent | 浮动 | 相对于mapreduce.reduce.java.opts中通常指定的最大堆大小的内存百分比,可以在随机播放期间分配给存储映射输出。尽管应该为框架留出一些内存,但是通常最好将其设置得足够高以存储大量的大量地图输出。 |
mapreduce.reduce.input.buffer.percent | 浮动 | 相对于最大堆大小的内存百分比,在减少过程中可以保留映射输出。当减少开始时,映射输出将合并到磁盘,直到剩余的输出低于此定义的资源限制。默认情况下,在reduce开始最大化可用于reduce的内存之前,所有映射输出都将合并到磁盘。对于较少的内存密集型减少,应该增加它以避免跳到磁盘。 |
其他注意事项
如果映射输出大于分配给复制映射输出的内存的25%,则将其直接写入磁盘,而无需先经过内存。
当使用组合器运行时,关于高合并阈值和大缓冲区的推理可能不成立。对于在获取所有映射输出之前开始的合并,合并器将在溢出到磁盘的同时运行。在某些情况下,可以通过花费大量资源来组合地图输出(使磁盘溢出量较小,并使溢出和获取并行化),而不是大幅度增加缓冲区大小,从而缩短时间。
当将内存中的映射输出合并到磁盘以开始精简时,如果因为有要溢出的段并且至少已经在磁盘上存在mapreduce.task.io.sort.factor分段而需要进行中间合并,则内存中的映射输出将成为中间合并的一部分。
以下属性在作业配置中本地化,以执行每个任务:
名称 | 类型 | 描述 |
---|---|---|
mapreduce.job.id | 串 | 工作编号 |
mapreduce.job.jar | 串 | 作业目录中的job.jar位置 |
mapreduce.job.local.dir | 串 | 作业特定的共享暂存空间 |
mapreduce.task.id | 串 | 任务ID |
mapreduce.task.attempt.id | 串 | 任务尝试ID |
mapreduce.task.is.map | 布尔值 | 这是地图任务吗 |
mapreduce.task.partition | 整型 | 作业中任务的ID |
mapreduce.map.input.file | 串 | 地图正在读取的文件名 |
mapreduce.map.input.start | 长 | 地图输入分割开始的偏移量 |
mapreduce.map.input.length | 长 | 映射输入拆分中的字节数 |
mapreduce.task.output.dir | 串 | 任务的临时输出目录 |
注意:在执行流作业期间,将转换“ mapreduce”参数的名称。点(。)变成下划线(_)。例如,mapreduce.job.id变为mapreduce_job_id,而mapreduce.job.jar变为mapreduce_job_jar。要在流作业的映射器/缩减器中获取值,请在参数名称下加上下划线。
该DistributedCache也可用于使用分配两个罐子和机库中的地图和/或减少任务。child-jvm始终将其当前工作目录添加到java.library.path和LD_LIBRARY_PATH中。因此,可以通过System.loadLibrary或System.load加载缓存的库。Native Libraries中记录了有关如何通过分布式缓存加载共享库的更多详细信息。
Job是用户作业与ResourceManager交互的主要界面。
Job提供了提交作业,跟踪其进度,访问组件任务的报告和日志,获取MapReduce集群的状态信息等工具。
作业提交过程涉及:
检查作业的输入和输出规格。
计算作业的InputSplit值。
如有必要,为作业的DistributedCache设置必要的记帐信息。
将作业的jar和配置复制到FileSystem上的MapReduce系统目录。
将作业提交到ResourceManager并可以选择监视其状态。
作业历史记录文件也记录到用户指定的目录mapreduce.jobhistory.intermediate-done-dir和mapreduce.jobhistory.done-dir,该目录默认为作业输出目录。
用户可以使用以下命令查看指定目录中的历史日志摘要:$ mapred job -history output.jhist此命令将打印作业详细信息,失败和终止的提示详细信息。可以使用以下命令查看有关作业的更多详细信息,例如成功的任务和为每个任务进行的任务尝试,如下所示:$ mapred job -history all output.jhist
通常,用户使用Job来创建应用程序,描述作业的各个方面,提交作业并监视其进度。
用户可能需要链接MapReduce作业以完成无法通过单个MapReduce作业完成的复杂任务。这是相当容易的,因为作业的输出通常转到分布式文件系统,并且该输出又可以用作下一个作业的输入。
但是,这也意味着确保工作完成(成功/失败)的责任完全落在客户身上。在这种情况下,各种作业控制选项是:
Job.submit():将作业提交到集群并立即返回。
Job.waitForCompletion(boolean):将作业提交到集群并等待其完成。
InputFormat描述了MapReduce作业的输入规范。
MapReduce框架依靠作业的InputFormat来:
验证作业的输入规范。
将输入文件拆分为逻辑InputSplit实例,然后将每个实例分配给一个单独的Mapper。
提供RecordReader实现,该实现用于从逻辑InputSplit中收集输入记录,以供Mapper处理。
基于文件的InputFormat实现的默认行为(通常是FileInputFormat的子类)是根据输入文件的总大小(以字节为单位)将输入拆分为逻辑 InputSplit实例。但是,输入文件的FileSystem块大小被视为输入拆分的上限。可以通过mapreduce.input.fileinputformat.split.minsize设置拆分大小的下限。
显然,对于许多应用程序而言,基于输入大小的逻辑拆分是不够的,因为必须遵守记录边界。在这种情况下,应用程序应实现RecordReader,后者负责遵守记录边界,并向单个任务提供逻辑InputSplit的面向记录的视图。
TextInputFormat是默认的InputFormat。
如果TextInputFormat是给定作业的InputFormat,则框架将检测带有.gz扩展名的输入文件,并使用适当的CompressionCodec自动解压缩它们。但是,必须注意,具有上述扩展名的压缩文件无法拆分,并且每个压缩文件均由单个映射器完整处理。
InputSplit表示要由单个Mapper处理的数据。
通常,InputSplit呈现输入的面向字节的视图,RecordReader负责处理和呈现面向记录的视图。
FileSplit是默认的InputSplit。它将mapreduce.map.input.file设置为逻辑拆分的输入文件的路径。
RecordReader从InputSplit读取<key,value>对。
通常,RecordReader会转换InputSplit提供的输入的面向字节的视图,并将面向记录的呈现给Mapper实现以进行处理。因此,RecordReader承担处理记录边界的责任,并为任务提供键和值。
OutputFormat描述MapReduce作业的输出规范。
MapReduce框架依靠作业的OutputFormat来:
验证作业的输出规格;例如,检查输出目录是否不存在。
提供用于写入作业输出文件的RecordWriter实现。输出文件存储在FileSystem中。
TextOutputFormat是默认的OutputFormat。
OutputCommitter描述了MapReduce作业的任务输出的提交。
MapReduce框架依赖于作业的OutputCommitter来:
在初始化期间设置作业。例如,在作业初始化期间为该作业创建临时输出目录。当作业处于PREP状态且初始化任务后,作业设置由单独的任务完成。设置任务完成后,作业将移至“正在运行”状态。
作业完成后清理作业。例如,在作业完成后删除临时输出目录。作业清理由作业结束时的单独任务完成。清理任务完成后,作业被声明为SUCCEDED / FAILED / KILLED。
设置任务临时输出。在任务初始化期间,任务设置是同一任务的一部分。
检查任务是否需要提交。如果任务不需要提交,这将避免提交过程。
提交任务输出。任务完成后,如果需要,任务将提交其输出。
放弃任务提交。如果任务失败/被杀死,输出将被清除。如果任务无法清除(在异常块中),将使用相同的try-id启动单独的任务以进行清除。
FileOutputCommitter是默认的OutputCommitter。作业设置/清除任务会占用map或reduce容器,无论NodeManager可用哪个。并且JobCleanup任务,TaskCleanup任务和JobSetup任务具有最高优先级,并且顺序最高。
在某些应用程序中,组件任务需要创建和/或写入附带文件,这些文件与实际的作业输出文件不同。
在这种情况下,试图同时打开和/或写入FileSystem上的同一文件(路径)的同一Mapper或Reducer的两个实例同时运行(例如,推测性任务)可能会出现问题。因此,应用程序编写者将不得不为每个尝试执行任务的用户选择唯一的名称(使用尝试ID,例如try_200709221812_0001_m_000000_0),而不仅仅是每个任务。
为避免这些问题,当OutputCommitter为FileOutputCommitter时,MapReduce框架维护一个特殊的$ {mapreduce.output.fileoutputformat.outputdir} / _ temporary / _ $ {taskid}子目录,可通过$ {mapreduce.task.output.dir}访问对每个任务尝试文件系统任务的地方,尝试的输出被存储。成功完成任务尝试后,$ {mapreduce.output.fileoutputformat.outputdir} / _ temporary / _ $ {taskid}(仅)中的文件将升级为$ {mapreduce.output.fileoutputformat.outputdir}。当然,该框架会丢弃尝试失败的子目录。此过程对应用程序完全透明。
应用程序编写者可以通过执行FileOutputFormat.getWorkOutputPath(Conext)在执行任务期间在$ {mapreduce.task.output.dir}中创建所需的任何辅助文件来利用此功能,并且框架将同样对其进行成功升级任务尝试,因此无需为每个任务尝试选择唯一的路径。
注意:执行特定任务尝试期间的$ {mapreduce.task.output.dir}值实际上是$ {mapreduce.output.fileoutputformat.outputdir} / _ temporary / _ {$ taskid},此值由MapReduce框架。因此,只需在MapReduce任务的FileOutputFormat.getWorkOutputPath(Conext)返回的路径中创建任何辅助文件,即可利用此功能。
整个讨论都适用于具有reducer = NONE(即0减少)的作业图,因为在这种情况下,图的输出直接进入HDFS。
用户将作业提交到队列。队列作为作业的集合,允许系统提供特定的功能。例如,队列使用ACL来控制哪些用户可以向其提交作业。队列预计主要由Hadoop Scheduler使用。
Hadoop配置有一个强制性队列,称为“默认”。队列名称在Hadoop站点配置的mapreduce.job.queuename属性中定义。一些作业调度程序(例如Capacity Scheduler)支持多个队列。
作业定义了需要通过mapreduce.job.queuename属性或通过Configuration.set(MRJobConfig.QUEUE_NAME,String)API 提交到的队列。设置队列名称是可选的。如果提交的作业没有关联的队列名称,则将其提交到“默认”队列。
计数器代表由MapReduce框架或应用程序定义的全局计数器。每个Counter可以是任何Enum类型。特定Enum的计数器归为Counters.Group类型的组。
应用程序可以定义任意计数器(类型为Enum),并通过map和/或reduce方法中的Counters.incrCounter(Enum,long)或Counters.incrCounter(String,String,long)更新它们。然后,这些计数器由框架全局汇总。
DistributedCache有效地分发特定于应用程序的大型只读文件。
DistributedCache是MapReduce框架提供的一种工具,用于缓存应用程序所需的文件(文本,档案,jars等)。
应用程序通过Job中的URL(hdfs://)指定要缓存的文件。该DistributedCache假定通过指定的文件HDFS:// URL是已经存在的文件系统。
在作业的任何任务在该节点上执行之前,该框架会将必需的文件复制到该工作器节点。其效率源于以下事实:每个作业仅复制一次文件,以及缓存未存档在工作人员上的档案的能力。
DistributedCache跟踪缓存文件的修改时间戳。显然,在执行作业时,不应由应用程序或在外部修改缓存文件。
DistributedCache可用于分发简单的只读数据/文本文件以及更复杂的类型,例如存档和jar。归档文件(zip,tar,tgz和tar.gz文件)在工作程序节点上未归档。文件具有执行权限设置。
可以通过设置属性mapreduce.job.cache。{files | archives}来分发文件/归档。如果必须分发多个文件/归档,则可以将它们添加为逗号分隔的路径。也可以通过API Job.addCacheFile(URI) / Job.addCacheArchive(URI)和Job.setCacheFiles(URI []) / Job.setCacheArchives(URI [])设置属性,其中URI的格式为hdfs:// host :port / absolute-path#link-name。在流式传输中,可以通过命令行选项-cacheFile / -cacheArchive分发文件。
该DistributedCache也可以用作地图一个基本的软件分发机制使用和/或减少任务。它可用于分发jar和本机库。该Job.addArchiveToClassPath(路径)或Job.addFileToClassPath(路径) API可用于文件缓存/瓶,并将它们添加到classpath中儿童的JVM。通过设置配置属性mapreduce.job.classpath。{files | archives}可以完成相同的操作。类似地,链接到任务的工作目录中的缓存文件可用于分发本机库并加载它们。
DistributedCache文件可以是私有的也可以是公共的,这决定了如何在辅助节点上共享它们。
“私有” DistributedCache文件被缓存在作业需要这些文件的用户专用的本地目录中。这些文件仅由特定用户的所有任务和作业共享,并且其他用户在工作器上的作业无法访问这些文件。由于DistributedCache文件具有在其上载文件的文件系统(通常是HDFS)上的权限,因此它变得私有。如果文件没有世界可读的访问权限,或者导致文件的目录路径没有世界上的可执行文件访问权限,则文件将变为私有。
“公共” DistributedCache文件被缓存在全局目录中,并且文件访问权限已设置为对所有用户公开可见。这些文件可以由工作人员上所有用户的任务和作业共享。由于DistributedCache文件在上传文件的文件系统(通常是HDFS)上的权限而成为公共文件。如果文件具有全局可读访问权限,并且如果导致文件的目录路径具有全局可执行访问权限以进行查找,则文件将变为公用。换句话说,如果用户打算使文件对所有用户公开可用,则必须将文件权限设置为全局可读,并且指向该文件的路径上的目录权限必须是全局可执行的。
分析是一种实用程序,用于获取代表性的(2或3个)内置Java探查器示例,以获取地图和reduce的示例。
用户可以通过设置配置属性mapreduce.task.profile来指定系统是否应收集作业中某些任务的分析器信息。可以使用api Configuration.set(MRJobConfig.TASK_PROFILE,boolean)设置该值。如果将该值设置为true,那么将启用任务分析。探查器信息存储在用户日志目录中。默认情况下,不为作业启用概要分析。
一旦用户配置了需要分析的用户,就可以使用配置属性mapreduce.task.profile。{maps | reduces}设置要映射的MapReduce任务的范围。可以使用api Configuration.set(MRJobConfig.NUM_ {MAP | REDUCE} _PROFILES,String)设置该值。默认情况下,指定范围是0-2。
用户还可以通过设置配置属性mapreduce.task.profile.params来指定探查器配置参数。可以使用api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS,String)指定该值。如果字符串包含%s,则在任务运行时,它将替换为分析输出文件的名称。这些参数通过命令行传递到任务子JVM。分析参数的默认值为-agentlib:hprof = cpu = samples,heap = sites,force = n,thread = y,verbose = n,file =%s。
MapReduce框架提供了一种运行用户提供的脚本进行调试的功能。当MapReduce任务失败时,用户可以运行调试脚本,以处理例如任务日志。该脚本可以访问任务的stdout和stderr输出,syslog和jobconf。调试脚本的stdout和stderr的输出显示在控制台诊断中,也作为作业UI的一部分显示。
在以下各节中,我们讨论如何通过作业提交调试脚本。脚本文件需要分发并提交给框架。
用户需要使用DistributedCache来分发和符号链接到脚本文件。
提交调试脚本的一种快速方法是为mapreduce.map.debug.script和mapreduce.reduce.debug.script属性设置值,分别用于调试map和reduce任务。也可以使用API Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT,String)和Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT,String)来设置这些属性。在流模式,调试脚本可以使用命令行选项提交-mapdebug和-reducedebug,用于调试和地图分别减少任务。
脚本的参数是任务的stdout,stderr,syslog和jobconf文件。在MapReduce任务失败的节点上运行的debug命令是:
$ script $ stdout $ stderr $ syslog $ jobconf
管道程序将c ++程序名称作为命令的第五个参数。因此,对于管道程序,命令为
$ script $ stdout $ stderr $ syslog $ jobconf $ program
Hadoop MapReduce为应用程序编写器提供了为中间映射输出和作业输出(即reduce的输出)指定压缩的工具。它还与zlib压缩算法的CompressionCodec实现捆绑在一起。该gzip的,bzip2的,活泼的,和LZ4文件格式也支持。
出于性能(zlib)和Java库不可用的原因,Hadoop还提供了上述压缩编解码器的本机实现。有关其用法和可用性的更多详细信息,请参见此处。
应用程序可以通过Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS,boolean)api和通过Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,Class)api 使用的CompressionCodec来控制中间映射输出的压缩。
应用程序可以通过FileOutputFormat.setCompressOutput(Job,boolean) api 控制作业输出的压缩,可以通过FileOutputFormat.setOutputCompressorClass(Job,Class)api指定要使用的CompressionCodec。
如果作业输出要存储在SequenceFileOutputFormat中,则可以通过SequenceFileOutputFormat.setOutputCompressionType(Job,SequenceFile.CompressionType)api指定所需的SequenceFile.CompressionType(即RECORD / BLOCK-默认为RECORD)。
Hadoop提供了一个选项,可以在处理地图输入时跳过某些不良输入记录集。应用程序可以通过SkipBadRecords类控制此功能。
当地图任务在某些输入上确定性崩溃时,可以使用此功能。这通常是由于map函数中的错误引起的。通常,用户必须修复这些错误。但是,有时这是不可能的。该错误可能在第三方库中,例如,该第三方库的源代码不可用。在这种情况下,即使多次尝试,任务也永远无法成功完成,并且作业也会失败。使用此功能,仅丢失了坏记录周围的一小部分数据,这对于某些应用程序(例如那些对非常大的数据执行统计分析的应用程序)可以接受。
默认情况下,此功能处于禁用状态。要启用它,请参阅SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)。
启用此功能后,框架会在一定数量的地图失败后进入“跳过模式”。有关更多详细信息,请参见SkipBadRecords.setAttemptsToStartSkipping(Configuration,int)。在“跳过模式”下,地图任务会维护要处理的记录范围。为此,框架依赖于已处理的记录计数器。请参阅SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS和SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS。该计数器使框架能够知道已成功处理了多少条记录,因此知道什么记录范围导致任务崩溃。在进一步尝试时,将跳过此记录范围。
跳过的记录数取决于应用程序增加处理的记录计数器的频率。建议在处理每条记录后将该计数器递增。在某些通常分批处理的应用程序中,这可能是不可能的。在这种情况下,框架可能会跳过不良记录周围的其他记录。用户可以通过SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)控制跳过的记录数。该框架尝试使用类似于二进制搜索的方法来缩小跳过记录的范围。跳过的范围分为两半,只有一半被执行。在后续失败时,框架会找出其中一半包含不良记录。将重新执行任务,直到达到可接受的跳过值或用尽所有任务尝试为止。要增加任务尝试次数,请使用Job.setMaxMapAttempts(int)和Job.setMaxReduceAttempts(int)
跳过的记录以序列文件格式写入HDFS,以供以后分析。可以通过SkipBadRecords.setSkipOutputPath(JobConf,Path)更改位置。
这是一个更完整的WordCount,它使用了到目前为止我们讨论的MapReduce框架提供的许多功能。
这需要HDFS能够启动并运行,尤其是对于与DistributedCache相关的功能。因此,它仅适用于伪分布式或全分布式 Hadoop安装。
导入java.io.BufferedReader; 导入java.io.FileReader; 导入java.io.IOException; 导入java.net.URI; 导入java.util.ArrayList; 导入java.util.HashSet; 导入java.util.List; 导入java.util.Set; 导入java.util.StringTokenizer; 导入org.apache.hadoop.conf.Configuration; 导入org.apache.hadoop.fs.Path; 导入org.apache.hadoop.io.IntWritable; 导入org.apache.hadoop.io.Text; 导入org.apache.hadoop.mapreduce.Job; 导入org.apache.hadoop.mapreduce.Mapper; 导入org.apache.hadoop.mapreduce.Reducer; 导入org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 导入org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 导入org.apache.hadoop.mapreduce.Counter; 导入org.apache.hadoop.util.GenericOptionsParser; 导入org.apache.hadoop.util.StringUtils; 公共类WordCount2 { 公共静态类TokenizerMapper 扩展Mapper <Object,Text,Text,IntWritable> { 静态枚举CountersEnum {INPUT_WORDS} 私有最终静态IntWritable一个= new IntWritable(1); 私人文字= new Text(); 私有布尔caseSensitive; 私人Set <String> patternsToSkip = new HashSet <String>(); 私人配置会议; 专用BufferedReader fis; @Override 公共无效设置(上下文上下文)抛出IOException, InterruptedException { conf = context.getConfiguration(); caseSensitive = conf.getBoolean(“ wordcount.case.sensitive”,true); 如果(conf.getBoolean(“ wordcount.skip.patterns”,false)){ URI []模式URIs = Job.getInstance(conf).getCacheFiles(); 用于(URI样式URI:样式URI){ 路径patternsPath =新路径(patternsURI.getPath()); 字符串patternsFileName = patternsPath.getName()。toString(); parseSkipFile(patternsFileName); } } } 私人无效parseSkipFile(String fileName){ 尝试{ fis = new BufferedReader(new FileReader(fileName)); 字符串模式= null; while((pattern = fis.readLine())!= null){ patternToSkip.add(pattern); } } catch(IOException ioe){ System.err.println(“解析缓存文件时捕获异常'” + StringUtils.stringifyException(ioe)); } } @Override 公共无效图(对象键,文本值,上下文上下文 )引发IOException,InterruptedException { 字符串行=(caseSensitive)? value.toString():value.toString()。toLowerCase(); 对于(字符串模式:patternsToSkip){ line = line.replaceAll(pattern,“”); } StringTokenizer itr =新的StringTokenizer(line); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,一个); 计数器counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString()); counter.increment(1); } } } 公共静态类IntSumReducer 扩展Reducer <Text,IntWritable,Text,IntWritable> { 私有IntWritable结果= new IntWritable(); public void reduce(文本键,Iterable <IntWritable>值, 上下文上下文 )引发IOException,InterruptedException { int sum = 0; for(IntWritable val:values){ sum + = val.get(); } result.set(sum); context.write(key,result); } } 公共静态void main(String [] args)引发异常{ 配置conf = new Configuration(); GenericOptionsParser optionParser = new GenericOptionsParser(conf,args); String []剩余Args = optionParser.getRemainingArgs(); 如果((remainingArgs.length!= 2)&&(remainingArgs.length!= 4)){ System.err.println(“用法:wordcount <输入> <输出> [-跳过skipPatternFile]”); System.exit(2); } Job job = Job.getInstance(conf,“ word count”); job.setJarByClass(WordCount2.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); List <String> otherArgs = new ArrayList <String>(); for(int i = 0; i <剩余Args.length; ++ i){ if(“ -skip” .equals(remainingArgs [i])){ job.addCacheFile(new Path(remainingArgs [++ i])。toUri()); job.getConfiguration()。setBoolean(“ wordcount.skip.patterns”,true); }其他{ otherArgs.add(remainingArgs [i]); } } FileInputFormat.addInputPath(job,new Path(otherArgs.get(0))); FileOutputFormat.setOutputPath(job,new Path(otherArgs.get(1))); System.exit(job.waitForCompletion(true)?0:1); } }
样本文本文件作为输入:
$ bin / hadoop fs -ls / user / joe / wordcount / input / / user / joe / wordcount / input / file01 / user / joe / wordcount / input / file02 $ bin / hadoop fs -cat / user / joe / wordcount / input / file01 世界,你好,再见! $ bin / hadoop fs -cat / user / joe / wordcount / input / file02 您好Hadoop,再见了hadoop。
运行应用程序:
$ bin / hadoop jar wc.jar WordCount2 / user / joe / wordcount / input / user / joe / wordcount / output
输出:
$ bin / hadoop fs -cat / user / joe / wordcount / output / part-r-00000 再见1 再见1 Hadoop,1 你好2 世界!1个 世界1 Hadoop。1个 到1
请注意,输入内容与我们查看的第一个版本不同,并且它们如何影响输出。
现在,让我们插入一个模式文件,该文件通过DistributedCache列出要忽略的单词模式。
$ bin / hadoop fs -cat /user/joe/wordcount/patterns.txt \。 \, \! 至
再次运行它,这次有更多选择:
$ bin / hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive = true / user / joe / wordcount / input / user / joe / wordcount / output -skip /user/joe/wordcount/patterns.txt
如预期的那样,输出:
$ bin / hadoop fs -cat / user / joe / wordcount / output / part-r-00000 再见1 再见1 Hadoop 1 你好2 世界2 Hadoop 1
再次运行它,这一次将区分大小写:
$ bin / hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive = false / user / joe / wordcount / input / user / joe / wordcount / output -skip /user/joe/wordcount/patterns.txt
果然,输出:
$ bin / hadoop fs -cat / user / joe / wordcount / output / part-r-00000 再见1 再见1 Hadoop 2 你好2 霍尔德2
WordCount的第二个版本通过使用MapReduce框架提供的一些功能对前一个版本进行了改进:
演示应用程序如何在Mapper(和Reducer)实现的设置方法中访问配置参数。
演示如何使用DistributedCache分发作业所需的只读数据。在这里,它允许用户指定在计数时要跳过的单词模式。
演示GenericOptionsParser的实用程序来处理通用的Hadoop命令行选项。
Demonstrates how applications can use Counters and how they can set application-specific status information passed to the map (and reduce) method.
Java and JNI are trademarks or registered trademarks of Oracle America, Inc. in the United States and other countries.