Hadoop MapReduce Streaming Tricks and Techniques
I have been using Hadoop a lot lately and thought I would write about some of the novel techniques a user can apply to get the most out of the Hadoop ecosystem.
Using Shell Scripts to Run Your Programs
I am not a fan of long bash commands, the kind where you have to spell out the full path of the jar files and so on. You can organize your workflow effectively by using shell scripts. Shell scripts are not as formidable as they sound. We won't be doing programming per se with these shell scripts (though they are pretty good at that too); we will just use them to store the commands that we need to run in sequence.
Below is a sample of the shell script I use to run my MapReduce code:
#!/bin/bash
#Defining program variables
IP="/data/input"
OP="/data/output"
HADOOP_JAR_PATH="/opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar"
MAPPER="test_m.py"
REDUCER="test_r.py"
# Remove the previous output directory; Hadoop refuses to run if it already exists
hadoop fs -rmr -skipTrash "$OP"
hadoop jar "$HADOOP_JAR_PATH" \
  -file "$MAPPER" -mapper "python $MAPPER" \
  -file "$REDUCER" -reducer "python $REDUCER" \
  -input "$IP" -output "$OP"
I generally save it as test_s.sh, and whenever I need to run the job I simply type sh test_s.sh. This helps in three ways:
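It keeps my Hadoop commands stored in a manageable way.
It makes running the MapReduce code a single short command.
If a run fails, I do not have to delete the output directory manually before rerunning: Hadoop refuses to start a job whose output directory already exists, and the script removes it first.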
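If you prefer Python to bash, the same wrapper idea can be sketched there too. The function below only assembles the argument list for the streaming jar from the options used in the script above (-file, -mapper, -reducer, -input, -output, and optionally -cacheFile); the names build_streaming_cmd and run_job, and the placeholder jar name, are hypothetical and not part of any Hadoop API.

```python
import subprocess


def build_streaming_cmd(jar, mapper, reducer, inp, out, cache_files=()):
    """Assemble a `hadoop jar` command line for a Python streaming job.

    Hypothetical helper: it mirrors the options from the shell script and
    does not contact Hadoop itself.
    """
    cmd = ["hadoop", "jar", jar,
           "-file", mapper, "-mapper", "python " + mapper,
           "-file", reducer, "-reducer", "python " + reducer]
    for cache_file in cache_files:
        # Each entry looks like "path#linkname", e.g. "/data/dead_characters.txt#ref"
        cmd += ["-cacheFile", cache_file]
    cmd += ["-input", inp, "-output", out]
    return cmd


def run_job(cmd, out):
    # Remove the old output directory first, as the shell script does.
    # check=False because the directory may not exist on the first run.
    subprocess.run(["hadoop", "fs", "-rmr", "-skipTrash", out], check=False)
    subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # "hadoop-streaming.jar" is a placeholder; use your distribution's jar path.
    cmd = build_streaming_cmd("hadoop-streaming.jar", "test_m.py", "test_r.py",
                              "/data/input", "/data/output")
    print(" ".join(cmd))
```

Keeping command assembly separate from execution means the command can be printed and checked before anything is submitted to the cluster.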
The simplification of anything is always sensational.
Gilbert K. Chesterton
Using the Distributed Cache to Provide Mappers with a Dictionary
Often you want your Hadoop MapReduce program to be able to read some static file. This file could be a dictionary, a set of parameters for the program, or anything else. The distributed cache copies the file to every node that runs a task for your job, so all of your mappers can read it locally and use it however they need. Although simple, this concept will help you think about MapReduce in a whole new light. Let's start with an example. Suppose you have to write a MapReduce program that reads a big file, stored under "/data/characters/", containing information about all the characters in Game of Thrones:
Cust_ID | User_Name          | House
1       | Daenerys Targaryen | Targaryen
2       | Tyrion Lannister   | Lannister
3       | Cersei Lannister   | Lannister
4       | Robert Baratheon   | Baratheon
5       | Robb Stark         | Stark
But you don't want to include the dead characters in your analysis: you want to count the number of living characters in Game of Thrones, grouped by house. (I know, it's easy!) One thing you could do is put an if statement in your mapper that checks whether the person's ID is 4 and, if so, skips the record. The problem is that you would have to edit the code again and again for the same analysis, because characters die like flies when it comes to George R. R. Martin (also, where is the fun in that?). So instead you create a file containing the IDs of all the dead characters at "/data/dead_characters.txt":
Died
4
5
Whenever you need to rerun the analysis, you can just add IDs to this file without changing anything in the code. This file may also grow long, and you would not want to clutter your code with IDs.
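Before running anything on the cluster, it helps to know what the answer should be on a small sample. The sketch below is plain Python with no Hadoop involved: it performs the same computation as the job on the sample rows and dead IDs shown above. It assumes the character file is pipe-delimited ("ID|Name|House"), which is what the mapper splits on; the function name count_living_by_house is hypothetical.

```python
from collections import Counter

# Sample rows in the assumed "ID|Name|House" layout
characters = [
    "1|Daenerys Targaryen|Targaryen",
    "2|Tyrion Lannister|Lannister",
    "3|Cersei Lannister|Lannister",
    "4|Robert Baratheon|Baratheon",
    "5|Robb Stark|Stark",
]
dead_ids = {"4", "5"}


def count_living_by_house(rows, dead):
    """Count rows whose ID is not in `dead`, grouped by the house column."""
    counts = Counter()
    for row in rows:
        char_id, _name, house = row.split("|")
        if char_id not in dead:
            counts[house] += 1
    return counts


print(dict(count_living_by_house(characters, dead_ids)))
```

On this sample the living characters are Daenerys, Tyrion and Cersei, so the counts are Targaryen 1 and Lannister 2. Baratheon and Stark do not appear at all, because a house whose listed members are all dead never produces a record, and the MapReduce job behaves the same way.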
So how would we do it? Let's go through it step by step. We will create a shell script, a mapper script and a reducer script for this task.
1) Shell Script
#!/bin/bash
#Defining program variables
DC="/data/dead_characters.txt"
IP="/data/characters"
OP="/data/output"
HADOOP_JAR_PATH="/opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar"
MAPPER="got_living_m.py"
REDUCER="got_living_r.py"
# Remove the previous output directory so the job can be rerun
hadoop fs -rmr -skipTrash "$OP"
hadoop jar "$HADOOP_JAR_PATH" \
  -file "$MAPPER" -mapper "python $MAPPER" \
  -file "$REDUCER" -reducer "python $REDUCER" \
  -cacheFile "$DC#ref" \
  -input "$IP" -output "$OP"
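Note the "-cacheFile" option here. The part after # is the name under which the cached file appears in each task's working directory: Hadoop creates a link called ref there, pointing to the local copy of /data/dead_characters.txt, so the mapper can simply open 'ref'.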
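Reading the cached file in a task therefore needs nothing more than a relative path. The sketch below is a slightly more forgiving variant of the read_cache function used in the mapper below: it takes the path as a parameter (defaulting to ref), closes the file, and skips blank lines and non-numeric lines such as a "Died" header. Treating non-numeric lines as headers is an assumption about the file's layout, and read_dead_ids is a hypothetical name.

```python
import os
import tempfile


def read_dead_ids(path="ref"):
    """Load one character ID per line from the distributed-cache file.

    Assumes IDs are numeric: blank lines and non-numeric lines
    (for example a "Died" header) are skipped.
    """
    ids = set()
    with open(path) as handle:
        for line in handle:
            value = line.strip()
            if value.isdigit():
                ids.add(value)
    return ids


if __name__ == "__main__":
    # Local stand-in for the task's working directory: to the mapper,
    # the cache link is just a file called "ref".
    workdir = tempfile.mkdtemp()
    ref_path = os.path.join(workdir, "ref")
    with open(ref_path, "w") as handle:
        handle.write("Died\n4\n5\n\n")
    print(sorted(read_dead_ids(ref_path)))
```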
Next is our Mapper Script.
2) Mapper Script
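import sys

# IDs of dead characters, filled from the distributed-cache file
dead_ids = set()


def read_cache():
    # "ref" is the link name given after '#' in the -cacheFile option
    with open('ref') as cache:
        for line in cache:
            dead_ids.add(line.strip())


read_cache()
for line in sys.stdin:
    rec = line.strip().split("|")  # split using the delimiter "|"
    char_id = rec[0]
    house = rec[2]
    if char_id not in dead_ids:
        print("%s\t%s" % (house, 1))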
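Real input often contains blank or malformed lines, and in a streaming job an uncaught IndexError makes the mapper exit with an error, which typically fails the task attempt. One way to guard against that, sketched below under the assumption that such lines should simply be skipped, is to move the per-line logic of the mapper above into a function that returns None for anything it cannot use. The name map_line is hypothetical, and skipping non-numeric IDs (such as a header row) assumes real IDs are numeric.

```python
def map_line(line, dead_ids, delimiter="|"):
    """Return a "house<TAB>1" record for a living character, or None.

    Lines with fewer than three fields are skipped instead of raising
    IndexError, which would otherwise crash the streaming task.
    """
    fields = line.strip().split(delimiter)
    if len(fields) < 3:
        return None
    char_id, house = fields[0].strip(), fields[2].strip()
    # Assumption: real IDs are numeric, so a header such as "Cust_ID|..." is skipped
    if not char_id.isdigit() or not house:
        return None
    if char_id in dead_ids:
        return None
    return "%s\t%s" % (house, 1)
```

The main loop of the mapper would then print map_line(line, dead_ids) for each line of sys.stdin whenever the result is not None.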
And our Reducer Script.
3) Reducer Script
import sys

current_key = None
count = 0
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    key, value = line.split('\t', 1)
    value = int(value)
    if current_key == key:
        count += value
    else:
        # A new key has started: emit the total for the previous key
        if current_key is not None:
            print("%s:%s" % (current_key, count))
        current_key = key
        count = value

# Emit the total for the last key
if current_key is not None:
    print("%s:%s" % (current_key, count))
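Hadoop sorts the mapper output by key before it reaches the reducer, and by default streaming treats everything up to the first tab as the key, so all lines for one house arrive next to each other. That makes itertools.groupby a compact alternative to tracking current_key by hand. The sketch below assumes the "house<TAB>1" lines emitted by the mapper and yields the same per-house totals; parse and reduce_counts are hypothetical names.

```python
from itertools import groupby


def parse(lines):
    """Yield (key, int value) pairs from tab-separated "key<TAB>value" lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        key, value = line.split("\t", 1)
        yield key, int(value)


def reduce_counts(lines):
    """Sum the values for each key; assumes input is already sorted by key."""
    for key, pairs in groupby(parse(lines), key=lambda pair: pair[0]):
        yield key, sum(value for _, value in pairs)
```

In the reducer script, the main loop would become: for key, total in reduce_counts(sys.stdin), print "%s:%s" % (key, total). Note that groupby only merges adjacent equal keys, so this relies on the sort that Hadoop performs between map and reduce.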
This was a simple program, and the output is just what you would expect and not very exciting. But the technique itself solves a variety of common problems: you can use it to pass any big dictionary to your MapReduce program. At least, that's what I mostly use this feature for. I hope you liked it; I will try to expand this post with more tricks.
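As an illustration of the "big dictionary" case, the same -cacheFile path#name mechanism can ship a key/value lookup file to every mapper, which then enriches each input record in memory (a pattern often called a map-side join). The sketch below assumes a hypothetical tab-separated file of "house<TAB>value" pairs cached under the link name houses; the file, its columns, and the names load_lookup and enrich are made up for illustration.

```python
def load_lookup(path, delimiter="\t"):
    """Read "key<TAB>value" lines from a cached file into a dict."""
    lookup = {}
    with open(path) as handle:
        for line in handle:
            parts = line.rstrip("\n").split(delimiter, 1)
            if len(parts) == 2:
                lookup[parts[0]] = parts[1]
    return lookup


def enrich(line, lookup, delimiter="|", default="UNKNOWN"):
    """Append the looked-up value for the house column (index 2) to a record."""
    fields = line.strip().split(delimiter)
    if len(fields) < 3:
        return None
    return delimiter.join(fields + [lookup.get(fields[2], default)])
```

In a mapper, load_lookup('houses') would run once before the loop over sys.stdin, just like read_cache() in the mapper above. Because every map task holds the whole dictionary in memory, this works only while the file fits comfortably in the memory of each task's Python process.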
The code for this post is posted on GitHub here.
Other Great Learning Resources For Hadoop: