Hadoop流是Hadoop发行版随附的实用程序。该实用程序允许您使用任何可执行文件或脚本作为映射器和/或化简器来创建和运行Map / Reduce作业。例如:
映射流 输入myInputDirs \ -输出myOutputDir \ -mapper / bin / cat \ -reducer / usr / bin / wc
在上面的示例中,映射器和化简器都是可执行文件,它们从stdin(逐行)读取输入,并将输出发送到stdout。该实用程序将创建一个Map / Reduce作业,将该作业提交到适当的群集,并监视该作业的进度,直到完成为止。
为映射器指定可执行文件后,初始化映射器时,每个映射器任务都会将可执行文件作为单独的进程启动。当mapper任务运行时,它将其输入转换为行,并将行输入到流程的标准输入中。同时,映射器从流程的标准输出中收集面向行的输出,并将每行转换为键/值对,并将其作为映射器的输出进行收集。默认情况下,第一个制表符前一行的前缀是键,其余行(不包括制表符)将是value。如果该行中没有制表符,则将整行视为键,并且该值为null。但是,可以通过设置-inputformat自定义 命令选项,如稍后讨论。
当为reducer指定可执行文件时,每个reducer任务将以单独的进程启动可执行文件,然后初始化reducer。reducer任务运行时,它将其输入键/值对转换为行,并将这些行馈送到流程的标准输入。同时,Reducer从流程的stdout收集面向行的输出,将每条线转换为键/值对,将其作为reducer的输出收集。默认情况下,直到第一个制表符的行的前缀是键,其余的行(不包括制表符)是值。但是,这可以通过设置-outputformat命令选项来自定义,如稍后所述。
这是Map / Reduce框架与流式Mapper / Reducer之间的通信协议的基础。
用户可以将stream.non.zero.exit.is.failure指定为true或false,以使以非零状态退出的流任务分别为Failure或Success。默认情况下,以非零状态退出的流任务被视为失败任务。
流支持流命令选项以及通用命令选项。常规命令行语法如下所示。
注意:确保将通用选项放在流选项之前,否则命令将失败。有关示例,请参见使档案可用于任务。
映射的流[genericOptions] [streamingOptions]
Hadoop流命令选项在此处列出:
参数 | 可选/必需 | 描述 |
---|---|---|
-输入目录名或文件名 | 需要 | 映射器的输入位置 |
-输出目录名 | 需要 | 减速机的输出位置 |
-mapper可执行文件或JavaClassName | 可选的 | 映射器可执行文件。如果未指定,则将IdentityMapper用作默认值 |
-reducer可执行文件或JavaClassName | 可选的 | Reducer可执行文件。如果未指定,则将IdentityReducer用作默认值 |
-文件名 | 可选的 | 使映射器,reducer或combiner可执行文件在计算节点上本地可用 |
-inputformat JavaClassName | 可选的 | 您提供的类应返回Text类的键/值对。如果未指定,则将TextInputFormat用作默认值 |
-outputformat JavaClassName | 可选的 | 您提供的类应采用Text类的键/值对。如果未指定,则将TextOutputformat用作默认值 |
分区JavaClassName | 可选的 | 确定哪个还原键发送到的类 |
-combiner StreamingCommand或JavaClassName | 可选的 | 组合器可执行文件,用于地图输出 |
-cmdenv名称=值 | 可选的 | 将环境变量传递给流命令 |
-inputreader | 可选的 | 为了向后兼容:指定记录读取器类(而不是输入格式类) |
-冗长 | 可选的 | 详细输出 |
-lazyOutput | 可选的 | 延迟创建输出。例如,如果输出格式基于FileOutputFormat,则仅在第一次调用Context.write时创建输出文件。 |
-numReduceTasks | 可选的 | 指定减速机数量 |
-mapdebug | 可选的 | 地图任务失败时调用的脚本 |
-减少调试 | 可选的 | 减少任务失败时调用的脚本 |
您可以提供Java类作为映射器和/或reducer。
映射流 输入myInputDirs \ -输出myOutputDir \ -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper \ -reducer / usr / bin / wc
您可以将stream.non.zero.exit.is.failure指定为true或false,以使以非零状态退出的流任务分别为Failure或Success。默认情况下,以非零状态退出的流任务被视为失败任务。
您可以将任何可执行文件指定为映射器和/或化简器。可执行文件不需要预先存在于群集中的机器上;但是,如果没有,您将需要使用“ -file”选项来告诉框架将可执行文件打包为作业提交的一部分。例如:
映射流 输入myInputDirs \ -输出myOutputDir \ -mapper myPythonScript.py \ -reducer / usr / bin / wc \ -文件myPythonScript.py
上面的示例将用户定义的Python可执行文件指定为映射器。选项“ -file myPythonScript.py”将python可执行文件作为作业提交的一部分发送到群集计算机。
除了可执行文件之外,您还可以打包映射器和/或化简器可能使用的其他辅助文件(例如字典,配置文件等)。例如:
映射流 输入myInputDirs \ -输出myOutputDir \ -mapper myPythonScript.py \ -reducer / usr / bin / wc \ -文件myPythonScript.py \ -文件myDictionary.txt
与正常的Map / Reduce作业一样,您可以为流作业指定其他插件:
-inputformat JavaClassName -outputformat JavaClassName 分区JavaClassName -combiner StreamingCommand或JavaClassName
您为输入格式提供的类应返回Text类的键/值对。如果未指定输入格式类,则将TextInputFormat用作默认格式。由于TextInputFormat返回LongWritable类的键,而这些键实际上不是输入数据的一部分,因此将丢弃这些键;只有值将通过管道传输到流式映射器。
您为输出格式提供的类应该采用Text类的键/值对。如果未指定输出格式类,则将TextOutputFormat用作默认格式。
流支持流命令选项以及通用命令选项。常规命令行语法如下所示。
注意:确保将通用选项放在流选项之前,否则命令将失败。有关示例,请参见使档案可用于任务。
hadoop命令[genericOptions] [streamingOptions]
此处列出了可与流一起使用的Hadoop通用命令选项:
参数 | 可选/必需 | 描述 |
---|---|---|
-conf配置文件 | 可选的 | 指定应用程序配置文件 |
-D属性=值 | 可选的 | 给定属性的使用价值 |
-fs host:端口或本地 | 可选的 | 指定一个名称节点 |
文件 | 可选的 | 指定将逗号分隔的文件复制到Map / Reduce集群 |
-libjars | 可选的 | 指定逗号分隔的jar文件以包含在类路径中 |
-档案 | 可选的 | 指定以逗号分隔的存档,以在计算机上取消存档 |
您可以使用“ -D <属性> = <值>”来指定其他配置变量。
要更改本地临时目录,请使用:
-D dfs.data.dir = / tmp
要指定其他本地临时目录,请使用:
-D mapred.local.dir = / tmp /本地 -D mapred.system.dir = / tmp /系统 -D mapred.temp.dir = / tmp / temp
注意:有关作业配置参数的更多详细信息,请参见:mapred-default.xml
通常,您可能只想使用地图功能处理输入数据。为此,只需将mapreduce.job.reduces设置为零即可。Map / Reduce框架不会创建任何化简任务。而是,映射器任务的输出将是作业的最终输出。
-D mapreduce.job.reduces = 0
为了向后兼容,Hadoop Streaming还支持“ -reducer NONE”选项,该选项等效于“ -D mapreduce.job.reduces = 0”。
要指定减速器的数量(例如两个),请使用:
映射流 -D mapreduce.job.reduces = 2 \ 输入myInputDirs \ -输出myOutputDir \ -mapper / bin / cat \ -reducer / usr / bin / wc
如前所述,当Map / Reduce框架从映射器的stdout读取一行时,它将该行拆分为一个键/值对。默认情况下,第一个制表符之前的行的前缀是键,其余的行(不包括制表符)是值。
但是,您可以自定义此默认值。您可以指定除制表符(默认值)以外的字段分隔符,并且可以指定第n个(n> = 1)字符而不是一行中的第一个字符(默认值)作为键和值之间的分隔符。例如:
映射流 -D stream.map.output.field.separator =。\ -D stream.num.map.output.key.fields = 4 \ 输入myInputDirs \ -输出myOutputDir \ -mapper / bin / cat \ -reducer / bin / cat
在上面的示例中,“-D stream.map.output.field.separator =。” 指定“。” 作为地图输出的字段分隔符,并且前缀最多为第四个“。” 一行中的行将是键,其余行(不包括第四个“。”)将是值。如果一行少于四个“。”,则整行将是键,并且值将是一个空的Text对象(例如由new Text(“”)创建的对象)。
类似地,您可以使用“ -D stream.reduce.output.field.separator = SEP”和“ -D stream.num.reduce.output.fields = NUM”将reduce输出行中的第n个字段分隔符指定为键和值之间的分隔符。
同样,您可以将“ stream.map.input.field.separator”和“ stream.reduce.input.field.separator”指定为Map / Reduce输入的输入分隔符。默认情况下,分隔符是制表符。
使用-files和-archives选项,可以使文件和归档可用于任务。该参数是您已经上传到HDFS的文件或档案的URI。这些文件和存档在各个作业之间进行缓存。您可以从fs.default.name配置变量中检索host和fs_port值。
注意: -files和-archives选项是通用选项。确保将通用选项放在命令选项之前,否则命令将失败。
-files选项在任务的当前工作目录中创建指向该文件本地副本的符号链接。
在此示例中,Hadoop在任务的当前工作目录中自动创建一个名为testfile.txt的符号链接。此符号链接指向testfile.txt的本地副本。
-文件hdfs:// host:fs_port / user / testfile.txt
用户可以使用#为-files指定其他符号链接名称。
-文件hdfs:// host:fs_port / user / testfile.txt#testfile
可以这样指定多个条目:
-文件hdfs:// host:fs_port / user / testfile1.txt,hdfs:// host:fs_port / user / testfile2.txt
使用-archives选项,您可以将jar本地复制到任务的当前工作目录,并自动解压缩文件。
在此示例中,Hadoop在当前任务的工作目录中自动创建一个名为testfile.jar的符号链接。此符号链接指向存储上载jar文件的未jarned内容的目录。
-存档hdfs:// host:fs_port / user / testfile.jar
用户可以使用#为-archives指定其他符号链接名称。
-存档hdfs:// host:fs_port / user / testfile.tgz#tgzdir
在此示例中,input.txt文件有两行指定两个文件的名称:cachedir.jar / cache.txt和cachedir.jar / cache2.txt。“ cachedir.jar”是指向归档目录的符号链接,该目录具有文件“ cache.txt”和“ cache2.txt”。
映射流 -archives'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar'\ -D mapreduce.job.maps = 1 \ -D mapreduce.job.reduces = 1 \ -D mapreduce.job.name =“实验” \ -输入“ /user/me/samples/cachefile/input.txt” \ -输出“ / user / me / samples / cachefile / out” \ -mapper“ xargs cat” \ 减速器“猫” $ ls test_jar / cache.txt cache2.txt $ jar cvf cachedir.jar -C test_jar /。 添加清单 添加:cache.txt(in = 30)(out = 29)(缩小3%) 添加:cache2.txt(输入= 37)(输出= 35)(缩小5%) $ hdfs dfs -put cachedir.jar样本/缓存文件 $ hdfs dfs -cat /user/me/samples/cachefile/input.txt cachedir.jar / cache.txt cachedir.jar / cache2.txt $ cat test_jar / cache.txt 这只是缓存字符串 $ cat test_jar / cache2.txt 这只是第二个缓存字符串 $ hdfs dfs -ls / user / me / samples / cachefile / out 找到2项 -rw-r--r- * 1个我超级组0 2013-11-14 17:00 / user / me / samples / cachefile / out / _SUCCESS -rw-r--r- * 1 me超级组69 2013-11-14 17:00 / user / me / samples / cachefile / out / part-00000 $ hdfs dfs -cat / user / me / samples / cachefile / out / part-00000 这只是缓存字符串 这只是第二个缓存字符串
Hadoop具有一个库类KeyFieldBasedPartitioner,该类对许多应用程序很有用。此类允许Map / Reduce框架根据某些键字段(而不是整个键)对地图输出进行分区。例如:
映射流 -D stream.map.output.field.separator =。\ -D stream.num.map.output.key.fields = 4 \ -D map.output.key.field.separator =。\ -D mapreduce.partition.keypartitioner.options = -k1,2 \ -D mapreduce.job.reduces = 12 \ 输入myInputDirs \ -输出myOutputDir \ -mapper / bin / cat \ -reducer / bin / cat \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
在这里,-D stream.map.output.field.separator =。和-D stream.num.map.output.key.fields = 4如前面的示例中所述。流使用这两个变量来标识映射器的键/值对。
上面的Map / Reduce作业的map输出键通常有四个用“。”分隔的字段。但是,Map / Reduce框架将使用-D mapred.text.key.partitioner.options = -k1,2选项按键的前两个字段对地图输出进行分区。在此,-D map.output.key.field.separator =。指定分区的分隔符。这样可以确保将键中前两个字段相同的所有键/值对都划分为相同的reducer。
这实际上等效于将前两个字段指定为主键,然后将后两个字段指定为主键。主键用于分区,主键和辅助键的组合用于排序。一个简单的例子如下所示:
地图输出(按键)
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2
划分为3个reducer(前2个字段用作分区的键)
11.11.4.1 ----------- 11.12.1.2 11.12.1.1 ----------- 11.14.2.3 11.14.2.2
在reducer的每个分区内排序(用于排序的所有4个字段)
11.11.4.1 ----------- 11.12.1.1 11.12.1.2 ----------- 11.14.2.2 11.14.2.3
Hadoop有一个库类KeyFieldBasedComparator,对许多应用程序都非常有用。此类提供了Unix / GNU Sort提供的功能的子集。例如:
映射流 -D mapreduce.job.output.key.comparator.class = org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \ -D stream.map.output.field.separator =。\ -D stream.num.map.output.key.fields = 4 \ -D mapreduce.map.output.key.field.separator =。\ -D mapreduce.partition.keycomparator.options = -k2,2nr \ -D mapreduce.job.reduces = 1 \ 输入myInputDirs \ -输出myOutputDir \ -mapper / bin / cat \ -reducer / bin / cat
上面的Map / Reduce作业的map输出键通常有四个用“。”分隔的字段。但是,Map / Reduce框架将使用-D mapreduce.partition.keycomparator.options = -k2,2nr选项按键的第二个字段对输出进行排序。在这里,-n指定排序是数字排序,而-r指定应该反转结果。一个简单的图示如下所示:
地图输出(按键)
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2
归约器的排序输出(其中第二个字段用于排序)
11.14.2.3 11.14.2.2 11.12.1.2 11.12.1.1 11.11.4.1
Hadoop有一个名为Aggregate的库包。Aggregate提供一个特殊的reducer类和一个特殊的Combiner类,以及一系列简单的聚合器,这些聚合器对一系列值执行聚合,例如“ sum”,“ max”,“ min”等。通过Aggregate,您可以定义一个映射器插件类,该类将为映射器的每个输入键/值对生成“可聚合项”。合并器/归约器将通过调用适当的聚合器来聚合那些可聚合项。
要使用聚合,只需指定“ -reducer聚合”:
映射流 输入myInputDirs \ -输出myOutputDir \ -mapper myAggregatorForKeyCount.py \ -减速器合计\ -文件myAggregatorForKeyCount.py \
python程序myAggregatorForKeyCount.py看起来像:
#!/ usr / bin / python 进口系统; def generateLongCountToken(id): 返回“ LongValueSum:” + id +“ \ t” +“ 1” def main(argv): 行= sys.stdin.readline(); 尝试: 而线: line = line&#91;:-1]; 字段= line.split(“ \ t”); 打印generateLongCountToken(fields&#91; 0]); 行= sys.stdin.readline(); “文件末尾”除外: 不返回 如果__name__ ==“ __main__”: 主要(sys.argv)
Hadoop具有一个库类FieldSelectionMapReduce,该类有效地允许您处理文本数据,例如unix“ cut”实用程序。该类中定义的map函数将每个输入键/值对视为字段列表。您可以指定字段分隔符(默认为制表符)。您可以选择任意字段列表作为地图输出键,也可以选择任意字段列表作为地图输出值。类似地,在类中定义的reduce函数将每个输入键/值对视为字段列表。您可以选择任意字段列表作为reduce输出键,并选择任意字段列表作为reduce输出值。例如:
映射流 -D mapreduce.map.output.key.field.separator =。\ -D mapreduce.partition.keypartitioner.options = -k1,2 \ -D mapreduce.fieldsel.data.field.separator =。\ -D mapreduce.fieldsel.map.output.key.value.fields.spec = 6,5,1-3:0- \ -D mapreduce.fieldsel.reduce.output.key.value.fields.spec = 0-2:5- \ -D mapreduce.map.output.key.class = org.apache.hadoop.io.Text \ -D mapreduce.job.reduces = 12 \ 输入myInputDirs \ -输出myOutputDir \ -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \ -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
选项“ -D mapreduce.fieldsel.map.output.key.value.fields.spec = 6,5,1-3:0-”为地图输出指定键/值选择。键选择规范和值选择规范以“:”分隔。在这种情况下,地图输出键将由字段6、5、1、2和3组成。地图输出值将由所有字段组成(0-表示字段0和所有后续字段)。
选项“ -D mapreduce.fieldsel.reduce.output.key.value.fields.spec = 0-2:5-”指定缩减输出的键/值选择。在这种情况下,reduce输出关键字将包含字段0、1、2(对应于原始字段6、5、1)。减少输出值将包含从字段5开始的所有字段(对应于所有原始字段)。
通常,您不需要Map Reduce的全部功能,而只需要运行同一程序的多个实例-可以在数据的不同部分或在相同的数据上使用不同的参数。您可以使用Hadoop Streaming做到这一点。
例如,考虑跨Hadoop集群压缩(压缩)一组文件的问题。您可以通过使用Hadoop Streaming和自定义映射器脚本来实现:
生成一个包含输入文件的完整HDFS路径的文件。每个映射任务将获得一个文件名作为输入。
创建一个映射器脚本,该脚本具有给定的文件名,可以将文件获取到本地磁盘,对文件进行gzip压缩,然后将其放回所需的输出目录中。
有关详细信息,请参见MapReduce教程:Reducer
例如,说我这样做:别名c1 ='cut -f1'。-mapper“ c1”可以工作吗?
使用别名将不起作用,但是如以下示例所示,允许变量替换:
$ hdfs dfs -cat / user / me / samples / student_marks 爱丽丝50 布鲁斯70 查理80 丹75 $ c2 ='cut -f2'; 映射流 -D mapreduce.job.name ='实验'\ -输入/ user / me / samples / student_marks \ 输出/ user / me / samples / student_out \ -mapper“ $ c2” -reducer'cat' $ hdfs dfs -cat / user / me / samples / student_out / part-00000 50 70 75 80
例如,将-mapper“切割-f1 | sed s / foo / bar / g”起作用?
当前,这不起作用,并给出“ java.io.IOException:管道中断”错误。这可能是需要调查的错误。
例如,当我通过-file选项通过分发大型可执行文件(例如3.6G)来运行流作业时,出现“设备上没有剩余空间”错误。
jar打包发生在配置变量stream.tmpdir指向的目录中。stream.tmpdir的默认值为/ tmp。将值设置为具有更多空间的目录:
-D stream.tmpdir = / export / bigspace / ...
您可以使用多个“ -input”选项指定多个输入目录:
映射流 -输入'/ user / foo / dir1'-输入'/ user / foo / dir2'\ (命令的其余部分)
除了纯文本文件,您还可以生成gzip文件作为生成的输出。将'-D mapreduce.output.fileoutputformat.compress = true -D mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.GzipCodec'作为选项传递给流作业。
您可以使用记录读取器StreamXmlRecordReader来处理XML文档。
映射流 -inputreader“ StreamXmlRecord,begin = BEGIN_STRING,end = END_STRING” \ (命令的其余部分)
在BEGIN_STRING和END_STRING之间找到的任何内容都将被视为地图任务的一条记录。
StreamXmlRecordReader理解的名称-值属性是:
请参阅配置的参数。在执行流作业期间,将转换“ mapred”参数的名称。点(。)变成下划线(_)。例如,mapreduce.job.id变为mapreduce_job_id,而mapreduce.job.jar变为mapreduce_job_jar。在您的代码中,将参数名称与下划线一起使用。