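Task 4 Reference Answers
MapReduce principles
Hadoop Streaming principles
Approach for each exercise: a. prepare the data; b. write the MapReduce job; c. run it and verify the answer
Write WordCount with Hadoop Streaming in Python
Python version
2.1 Prepare the data
vim wc_input.txt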
hello
word
hello
world
hello
python
java
python
hadoop fs -mkdir -p /user/dw/wc/input/
# Do not create /user/dw/wc/output in advance: Hadoop refuses to start a job whose output directory already exists.
hadoop fs -put wc_input.txt /user/dw/wc/input/
2.2 Write the mapper, the reducer, and run.sh (remember to change /path/to)
mapper.py
import sys

# Each input line holds one word; emit "word<TAB>1".
for line in sys.stdin:
    word = line.strip()
    if word:  # skip blank lines
        print(word + '\t' + '1')
reducer.py
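import sys

cur_word = None
count = 0
# Streaming sorts the mapper output by key, so equal words arrive on consecutive lines.
for line in sys.stdin:
    word, val = line.strip().split('\t')
    if cur_word is None:
        cur_word = word
    if cur_word != word:
        # The key changed: emit the finished word and start a new tally.
        print('%s\t%s' % (cur_word, count))
        cur_word = word
        count = 0
    count += int(val)
# Emit the last word (nothing is printed when the input is empty).
if cur_word is not None:
    print('%s\t%s' % (cur_word, count))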
run.sh
HADOOP_CMD="/path/to/hadoop"
STREAM_JAR_PATH="/path/to/hadoop-streaming-2.6.1.jar"
INPUT_FILE_PATH="/user/dw/wc/input/wc_input.txt"
OUTPUT_PATH="/user/dw/wc/output"
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file ./mapper.py \
-file ./reducer.py
2.3 Set the input and output paths
2.4 Run the job: sh run.sh
2.5 Check the answer
hadoop fs -cat /user/dw/wc/output/part-00000
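To know what part-00000 should contain, the job can be imitated locally. The sketch below is not part of the original answer: the function names `map_words`, `reduce_counts`, and `simulate_wordcount` are made up for illustration, and `sorted()` stands in for the shuffle/sort phase that Hadoop Streaming runs between the mapper and the reducer.

```python
from itertools import groupby


def map_words(lines):
    """Mapper logic: emit (word, 1) for every non-empty input line."""
    for line in lines:
        word = line.strip()
        if word:
            yield word, 1


def reduce_counts(sorted_pairs):
    """Reducer logic: sum the values of each run of identical keys."""
    for word, group in groupby(sorted_pairs, key=lambda kv: kv[0]):
        yield word, sum(value for _, value in group)


def simulate_wordcount(lines):
    # sorted() plays the role of the shuffle/sort phase between map and reduce.
    return dict(reduce_counts(sorted(map_words(lines))))


# The same eight lines as wc_input.txt above.
sample = ["hello", "word", "hello", "world", "hello", "python", "java", "python"]
counts = simulate_wordcount(sample)
for word in sorted(counts):
    print("%s\t%d" % (word, counts[word]))
```

The printed lines can be compared with the output of the hadoop fs -cat command; the cluster output should contain the same word and count pairs.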
1 Use MapReduce to compute each user's average rating in MovieLens.
1.1 Prepare the data
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip ml-100k.zip
cd ml-100k
mv u.data ml_input.txt
hadoop fs -mkdir -p /user/dw/avgscore/input
# Do not create /user/dw/avgscore/output in advance: the job creates it and fails if it already exists.
hadoop fs -put ml_input.txt /user/dw/avgscore/input/
1.2 Write the mapper, the reducer, and run.sh
mapper.py
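import sys

# u.data columns: user id, item id, rating, timestamp (tab-separated).
for line in sys.stdin:
    item = line.strip().split('\t')
    # item[1] is the movie id, so this job averages per movie; emit item[0] instead to average per user.
    print(item[1] + '\t' + item[2])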
reducer.py
import sys

item_score = {}  # key -> list of ratings seen for that key
for line in sys.stdin:
    line = line.strip()
    item, score = line.split('\t')
    item_score.setdefault(item, []).append(int(score))

for item in item_score:
    # Multiply by 1.0 so the division is a float division on Python 2 as well.
    ave_score = sum(item_score[item]) * 1.0 / len(item_score[item])
    print('%s\t%s' % (item, ave_score))
run.sh is omitted; adapt the run.sh above.
1.3 Check that the result is correct by writing your own Python code to verify it.
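One way to do the verification is to recompute the averages without Hadoop and compare them with the job output. This is only a sketch: the helper names `average_by_column`, `load_job_output`, and `same_averages` are invented here, and the three sample rows are made-up stand-ins for u.data lines, not real data.

```python
from collections import defaultdict


def average_by_column(lines, key_col=1, score_col=2):
    """Return {key: mean score} for tab-separated rows such as u.data."""
    totals = defaultdict(lambda: [0, 0])  # key -> [sum of scores, row count]
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) <= max(key_col, score_col):
            continue  # skip blank or malformed rows
        acc = totals[fields[key_col]]
        acc[0] += int(fields[score_col])
        acc[1] += 1
    return {key: total / float(n) for key, (total, n) in totals.items()}


def load_job_output(lines):
    """Parse 'key<TAB>average' lines as written by the reducer."""
    result = {}
    for line in lines:
        if line.strip():
            key, value = line.rstrip("\n").split("\t")
            result[key] = float(value)
    return result


def same_averages(expected, actual, tol=1e-9):
    """True when both mappings have the same keys and near-equal values."""
    return set(expected) == set(actual) and all(
        abs(expected[k] - actual[k]) <= tol for k in expected
    )


# Made-up rows in u.data layout: user id, item id, rating, timestamp.
rows = ["1\t10\t4\t0", "2\t10\t2\t0", "1\t20\t5\t0"]
expected = average_by_column(rows)            # keyed by item id (column 1)
actual = load_job_output(["10\t3.0", "20\t5.0"])
print(same_averages(expected, actual))
```

In practice the rows would come from ml_input.txt and the job output from the part files; passing `key_col=0` checks per-user averages instead of per-movie ones.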
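Implement a merge (join).
2.1 Prepare the data: u.data and u.item from ml-100k. First use a map step of your own to give both files the same delimiter, for example # everywhere.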
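The delimiter step could look like the sketch below. It assumes that u.data is tab-separated and u.item is separated by "|", as in the ml-100k distribution, and that no field contains the # character itself; the function name `unify_delimiter` and the two sample rows are made up for illustration.

```python
def unify_delimiter(line, out_sep="#"):
    """Rewrite one u.data (tab-separated) or u.item ('|'-separated) line with out_sep."""
    line = line.rstrip("\n")
    # Assumption: a line containing a tab comes from u.data, any other line from u.item.
    in_sep = "\t" if "\t" in line else "|"
    return out_sep.join(line.split(in_sep))


# Made-up rows in the layout of the two files.
data_row = "1\t10\t4\t0"
item_row = "10|Some Movie|01-Jan-1995"
print(unify_delimiter(data_row))
print(unify_delimiter(item_row))
```

Calling this function on every line of standard input and printing the result turns it into a map-only streaming step.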
2.2 Write the mapper, the reducer, and run.sh
mapper.py
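import sys

for line in sys.stdin:
    # Both files are expected to use '#' as the delimiter already (see 2.1).
    line = line.strip().split('#')
    user = "-1"
    item = "-1"
    score = "-1"
    item_name = "-1"
    item_time = "-1"
    if len(line) == 4:
        # u.data row: user id, item id, rating, timestamp
        user = line[0]
        item = line[1]
        score = line[2]
    elif len(line) >= 3:
        # u.item row: movie id, title, release date, ...
        item = line[0]
        item_name = line[1]
        item_time = line[2]
    else:
        continue  # blank or malformed line
    print('%s\t%s\t%s\t%s\t%s' % (user, item, score, item_name, item_time))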
reducer.py
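import sys

# This reducer keeps everything in memory and needs all records in one place,
# so it is only correct when the job runs with a single reducer.
item_dict = {}  # item id -> (item_name, item_time), from u.item rows
ratings = []    # (user, item, score), from u.data rows
for line in sys.stdin:
    line = line.strip()
    user, item, score, item_name, item_time = line.split('\t')
    if user == "-1":
        item_dict[item] = (item_name, item_time)
    else:
        ratings.append((user, item, score))

for user, item, score in ratings:
    # Ratings whose item is missing from u.item keep the "-1" placeholders.
    item_name, item_time = item_dict.get(item, ("-1", "-1"))
    print('%s\t%s\t%s\t%s\t%s' % (user, item, score, item_name, item_time))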
Use MapReduce to remove duplicates.
3.1 Prepare the data
1
2
3
4
5
6
1
2
3
3
The other steps are the same as above.
3.2 Write the mapper, the reducer, and run.sh
mapper.py
import sys

for line in sys.stdin:
    # Strip the trailing newline so the key, the tab, and the value stay on one line.
    print(line.strip() + '\t' + ' ')
reducer.py
import sys

last_key = None
for line in sys.stdin:
    this_key = line.split('\t')[0].strip()
    # The input arrives sorted by key, so duplicates are adjacent:
    # print a key only the first time it appears.
    if this_key != last_key:
        print(this_key)
        last_key = this_key
Use MapReduce to sort.
4.1 Data: use the average movie ratings computed above as input.
4.2 Write the mapper, the reducer, and run.sh
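mapper.py
import sys

# Identity mapper: the ordering is done by the comparator configured in run.sh.
for line in sys.stdin:
    line = line.strip()
    print('{0}'.format(line))
reducer.py
import sys

# Identity reducer: the lines arrive already sorted.
for line in sys.stdin:
    line = line.strip()
    print("{0}".format(line))
run.sh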
HADOOP_CMD="/path/to/hadoop"
STREAM_JAR_PATH="/path/to/hadoop-streaming-2.6.1.jar"
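# Input: the output directory of the average-rating job above.
INPUT_FILE_PATH="/user/dw/avgscore/output"
# Must not exist before the job runs; this directory name is only an example.
OUTPUT_PATH="/user/dw/sort/output"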
$HADOOP_CMD jar $STREAM_JAR_PATH \
-D stream.map.output.field.separator='\t' \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options=-k2,2nr \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file ./mapper.py \
-file ./reducer.py
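The option -k2,2nr asks the comparator to order records by the second field, numerically and in reverse. A local sketch of the same ordering makes it easy to check a small sample of the job output. The function names `sort_by_score_desc` and `is_descending` and the three sample lines are illustrative, not taken from the original answer.

```python
def sort_by_score_desc(lines):
    """Sort 'key<TAB>score' lines by score, highest first (like -k2,2nr)."""
    rows = [line.rstrip("\n").split("\t") for line in lines if line.strip()]
    return sorted(rows, key=lambda row: float(row[1]), reverse=True)


def is_descending(lines):
    """True when the scores in 'key<TAB>score' lines never increase."""
    scores = [float(line.rstrip("\n").split("\t")[1]) for line in lines if line.strip()]
    return all(a >= b for a, b in zip(scores, scores[1:]))


# Made-up 'movie id<TAB>average' lines in the format written by the averaging job.
sample = ["10\t3.0", "20\t5.0", "30\t4.25"]
ranked = sort_by_score_desc(sample)
for key, score in ranked:
    print("%s\t%s" % (key, score))
```

Running `is_descending` over each part file of the job output is a quick sanity check. With more than one reducer, each part file is sorted on its own, so a single global ordering needs a single reducer.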
Use MapReduce to build an inverted index.
mapper.py
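import os
import sys

# Streaming exposes job properties as environment variables with dots turned into underscores.
# Hadoop 2 typically names this one mapreduce_map_input_file; older releases use map_input_file.
docname = os.environ.get("mapreduce_map_input_file") or os.environ["map_input_file"]
for line in sys.stdin:
    line = line.strip().split(' ')
    for word in line:
        if word:  # skip empty strings produced by repeated spaces
            print('{0}\t{1}'.format(word, docname))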
reducer.py
import sys

word_doc_dict = {}  # word -> list of documents that contain it
for line in sys.stdin:
    line = line.strip()
    word, docname = line.split('\t')
    docs = word_doc_dict.setdefault(word, [])
    if docname not in docs:  # list each document only once per word
        docs.append(docname)

for word in word_doc_dict:
    print('{0}\t{1}'.format(word, ','.join(word_doc_dict[word])))
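To see what the job should produce, the map, sort, and reduce steps can be imitated in memory on two tiny documents. This is a sketch only: `build_inverted_index`, the file names, and the texts are invented for the example.

```python
from collections import defaultdict


def build_inverted_index(docs):
    """docs maps a document name to its text; returns {word: sorted document names}."""
    # Map phase: one (word, docname) pair per word occurrence.
    pairs = [(word, name) for name, text in docs.items() for word in text.split()]
    # Shuffle/sort phase, then the reduce phase collects the documents of each word.
    index = defaultdict(set)
    for word, name in sorted(pairs):
        index[word].add(name)
    return {word: sorted(names) for word, names in index.items()}


# Two made-up documents.
docs = {"a.txt": "hello hadoop", "b.txt": "hello python hello"}
index = build_inverted_index(docs)
for word in sorted(index):
    print("%s\t%s" % (word, ",".join(index[word])))
```

Each output line has the same shape as the reducer output above: a word, a tab, and a comma-separated list of documents.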
Use MapReduce to compute Jaccard similarity (see the Spark implementation for reference).
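No answer is given for this exercise, and the Spark implementation it refers to is not reproduced here. As a starting point, the sketch below shows one possible decomposition, not the referenced one. The Jaccard similarity of two sets A and B is |A ∩ B| / |A ∪ B|. Grouping by item yields the user pairs that share it (the intersection counts), and the union follows from |A| + |B| - |A ∩ B|. The function name and the sample pairs are made up.

```python
from collections import defaultdict
from itertools import combinations


def jaccard_by_user_pair(ratings):
    """ratings: iterable of (user, item). Returns {(u1, u2): similarity} for users sharing an item."""
    items_per_user = defaultdict(set)
    users_per_item = defaultdict(set)
    # First pass (a map plus group-by-key in MapReduce terms).
    for user, item in ratings:
        items_per_user[user].add(item)
        users_per_item[item].add(user)

    # Second pass: every item contributes 1 to the intersection of each user pair that has it.
    intersection = defaultdict(int)
    for users in users_per_item.values():
        for pair in combinations(sorted(users), 2):
            intersection[pair] += 1

    # |A union B| = |A| + |B| - |A intersect B|
    result = {}
    for (u1, u2), inter in intersection.items():
        union = len(items_per_user[u1]) + len(items_per_user[u2]) - inter
        result[(u1, u2)] = inter / float(union)
    return result


# Made-up (user, item) pairs.
ratings = [("u1", "a"), ("u1", "b"), ("u2", "b"), ("u2", "c"), ("u3", "c")]
similarity = jaccard_by_user_pair(ratings)
for pair in sorted(similarity):
    print("%s,%s\t%.4f" % (pair[0], pair[1], similarity[pair]))
```

On a cluster, the two passes would become two chained streaming jobs: the first keyed by item, the second keyed by user pair.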
Use MapReduce to implement PageRank (optional; the answer is omitted, look it up yourself).