服务器 发布日期:2024/11/2 浏览次数:1
Hadoop streaming
Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口。所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写。
streamming是天然适用于文字处理的(text processing),当然,也仅适用纯文本的处理,对于需要对象和序列化的场景,hadoop streaming无能为力。它力图使我们能够快捷的通过各种脚本语言,快速的处理大量的文本文件。以下是steaming的一些特点:
常用的Streaming编程语言:
Ruby
下面是一个Ruby编写的MapReduce程序的示例:
map
max_temperature_map.rb:
ruby #!/usr/bin/env ruby STDIN.each_line do |line| val = line year, temp, q = val[15,4], val[87,5], val[92,1] puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/) end
reduce
max_temperature_reduce.rb:
ruby #!/usr/bin/env ruby last_key, max_val = nil, -1000000 STDIN.each_line do |line| key, val = line.split("\t") if last_key && last_key != key puts "#{last_key}\t#{max_val}" last_key, max_val = key, val.to_i else last_key, max_val = key, [max_val, val.to_i].max end end puts "#{last_key}\t#{max_val}" if last_key
运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar -input input/ncdc/sample.txt -output output -mapper ch02/src/main/ruby/max_temperature_map.rb -reducer ch02/src/main/ruby/max_temperature_reduce.rb
Python
Map
#!/usr/bin/env python import re import sys for line in sys.stdin: val = line.strip() (year, temp, q) = (val[15:19], val[87:92], val[92:93]) if (temp != "+9999" and re.match("[01459]", q)): print "%s\t%s" % (year, temp)
Reduce
#!/usr/bin/env python import sys (last_key, max_val) = (None, -sys.maxint) for line in sys.stdin: (key, val) = line.strip().split("\t") if last_key and last_key != key: print "%s\t%s" % (last_key, max_val) (last_key, max_val) = (key, int(val)) else: (last_key, max_val) = (key, max(max_val, int(val))) if last_key: print "%s\t%s" % (last_key, max_val)
运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar -input input/ncdc/sample.txt -output output -mapper ch02/src/main/ruby/max_temperature_map.py-reducer ch02/src/main/ruby/max_temperature_reduce.py
Bash shell
Map
#!/usr/bin/env bash # NLineInputFormat gives a single line: key is offset, value is S3 URI read offset s3file # Retrieve file from S3 to local disk echo "reporter:status:Retrieving $s3file" >&2 $HADOOP_INSTALL/bin/hadoop fs -get $s3file . # Un-bzip and un-tar the local file target=`basename $s3file .tar.bz2` mkdir -p $target echo "reporter:status:Un-tarring $s3file to $target" >&2 tar jxf `basename $s3file` -C $target # Un-gzip each station file and concat into one file echo "reporter:status:Un-gzipping $target" >&2 for file in $target/*/* do gunzip -c $file $target.all echo "reporter:status:Processed $file" >&2 done # Put gzipped version into HDFS echo "reporter:status:Gzipping $target and putting in HDFS" >&2 gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz
运行
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar -D mapred.reduce.tasks=0 -D mapred.map.tasks.speculative.execution=false -D mapred.task.timeout=12000000 -input ncdc_files.txt -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat -output output -mapper load_ncdc_map.sh -file load_ncdc_map.sh
Combiner
在streaming模式下,仍然可以运行Combiner,两种方法:
这里具体解释第二种方法:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar -input input/ncdc/all -output output -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort | ch02/src/main/ruby/max_temperature_reduce.rb" -reducer ch02/src/main/ruby/max_temperature_reduce.rb -file ch02/src/main/ruby/max_temperature_map.rb -file ch02/src/main/ruby/max_temperature_reduce.rb
注意看-mapper这一行,通关管道的方式,把mapper的临时输出文件(intermediate file,Map完成后的临时文件)作为输入,送到sort进行排序,然后送到reduce脚本,来完成类似于combiner的工作。这时候的输出才真正的作为shuffle的输入,被分组并在网络上发送到Reduce
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!