周海汉/文
2013.2.1
http://abloz.com
tom white的《hadoop the_definitive_guide 3nd edition》附录C里面讲到用streaming方式来处理气象的原始数据。由于气象原始数据是小文件压缩成的gz2文件,需要将文件解压,合并成一个大文件,再按年份压缩存放。这样便于后续的mapreduce处理。
white用的是streaming执行bash脚本 load_ncdc_map.sh。该脚本将NLineInputFormat格式的文本文件作为输入,内容是一个个存放在S3中的气象数据。解压,合并,上传到hadoop。
#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
gunzip -c $file >> $target.all
echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz
执行方式:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar
-D mapred.reduce.tasks=0
-D mapred.map.tasks.speculative.execution=false
-D mapred.task.timeout=12000000
-input ncdc_files.txt
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat
-output output
-mapper load_ncdc_map.sh
-file load_ncdc_map.sh
其中ncdc_files.txt格式:
s3n://hadoopbook/ncdc/raw/isd-1901.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1902.tar.bz2
...
可见streaming方式的mapreduce十分强大,可以执行脚本和第三方可执行程序,读取输入文件后标准输入作为可执行文件输入,可执行文件的输出作为标准输出。
仿其原理,我们可以做一个bash脚本来统计或排序大文件。
要统计排序的文本:README.txt
[zhouhh@Hadoop48 ~]$ ls -l README.txt
-rw-r--r-- 1 zhouhh zhouhh 1399 Feb 1 10:53 README.txt
[zhouhh@Hadoop48 ~]$ wc README.txt
34 182 1399 README.txt
[zhouhh@Hadoop48 ~]$ hadoop fs -ls
Found 3 items
-rw-r--r-- 2 zhouhh supergroup 9358 2013-01-10 17:52 /user/zhouhh/fsimage
drwxr-xr-x - zhouhh supergroup 0 2013-02-01 10:30 /user/zhouhh/gz
-rw-r--r-- 2 zhouhh supergroup 65 2012-09-26 14:10 /user/zhouhh/test中文.txt
[zhouhh@Hadoop48 ~]$ hadoop fs -put README.txt .
[zhouhh@Hadoop48 ~]$ hadoop fs -ls
Found 4 items
-rw-r--r-- 2 zhouhh supergroup 1399 2013-02-01 10:56 /user/zhouhh/README.txt
-rw-r--r-- 2 zhouhh supergroup 9358 2013-01-10 17:52 /user/zhouhh/fsimage
drwxr-xr-x - zhouhh supergroup 0 2013-02-01 10:30 /user/zhouhh/gz
-rw-r--r-- 2 zhouhh supergroup 65 2012-09-26 14:10 /user/zhouhh/test中文.txt
[zhouhh@Hadoop48 ~]$ hadoop fs -ls README.txt
Found 1 items
-rw-r--r-- 2 zhouhh supergroup 1399 2013-02-01 10:56 /user/zhouhh/README.txt
排序:
[zhouhh@Hadoop48 ~]$ hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input README.txt -output wordcount1 -mapper /bin/cat -reducer /bin/sort
[zhouhh@Hadoop48 ~]$ hadoop fs -ls wordcount/part*
Found 1 items
-rw-r--r-- 2 zhouhh supergroup 1433 2013-02-01 11:20 /user/zhouhh/wordcount/part-00000
统计行数,字数
[zhouhh@Hadoop48 ~]$ hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input README.txt -output wordcount1 -mapper /bin/cat -reducer /usr/bin/wc
[zhouhh@Hadoop48 ~]$ hadoop fs -cat wordcount1/p*
34 182 1433
这里可以发现一个问题,streaming会给输入文件的换行0a增加09字节(tab),变成090a,所以统计文件的字节数会是原文件字节数+行数,并且文件内容也改变了。 该问题导致sort后文件改变。用diff比较每行都多了一个09(tab)字节。
原理: mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程。
如果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。 mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。 如果没有tab,整行作为key值,value值为null。这可以定制。
如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。 Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。这可以定制。
对于需要复制到集群其他机器的文件,用-file filename 指定。
这是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。
我们可以看到,原文件和streaming后的文件不同:
part of the two file of hex code: sort README.txt :
源文件:
0000000: 0a0a 0a0a 0a0a 0a61 6c67 6f72 6974 686d .......algorithm
0000010: 732e 2020 5468 6520 666f 726d 2061 6e64 s. The form and
0000020: 206d 616e 6e65 7220 6f66 2074 6869 7320 manner of this
0000030: 4170 6163 6865 2053 6f66 7477 6172 6520 Apache Software
0000040: 466f 756e 6461 7469 6f6e 0a61 6e64 206f Foundation.and o
0000050: 7572 2077 696b 692c 2061 743a 0a62 7920 ur wiki, at:.by
0000060: 6d6f 7274 6261 792e 6f72 672e 0a63 6865 mortbay.org..che
0000070: 636b 2079 6f75 7220 636f 756e 7472 7927 ck your country'
streaming README.txt and reduce sort:
0000000: 090a 090a 090a 090a 090a 090a 090a 616c ..............al
0000010: 676f 7269 7468 6d73 2e20 2054 6865 2066 gorithms. The f
0000020: 6f72 6d20 616e 6420 6d61 6e6e 6572 206f orm and manner o
0000030: 6620 7468 6973 2041 7061 6368 6520 536f f this Apache So
0000040: 6674 7761 7265 2046 6f75 6e64 6174 696f ftware Foundatio
0000050: 6e09 0a61 6e64 206f 7572 2077 696b 692c n..and our wiki,
这是因为streaming将每一行当成key+tab+value,而value为空。 参考: http://hadoop.apache.org/docs/r0.19.2/cn/streaming.html
如非注明转载, 均为原创. 本站遵循知识共享CC协议,转载请注明来源