mlwhiz

Turning data into insights

Hadoop Mapreduce Streaming Tricks and Techniques

I have been using Hadoop a lot lately and thought I would write up some of the novel techniques a user can use to get the most out of the Hadoop ecosystem.

Using Shell Scripts to run your Programs


I am not a fan of long bash commands, the ones where you have to specify the whole path of the jar files and the like. You can organize your workflow effectively by using shell scripts. Shell scripts are not as formidable as they sound. We won't be doing programming per se with these shell scripts (though they are pretty good at that too); we will just use them to store commands that we need to run in sequence.

Below is a sample of the shell script I use to run my MapReduce code.

#!/bin/bash
# Define program variables
IP="/data/input"
OP="/data/output"
HADOOP_JAR_PATH="/opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar"
MAPPER="test_m.py"
REDUCER="test_r.py"

# Remove any previous output first (-rm -r replaces the deprecated -rmr)
hadoop fs -rm -r -skipTrash "$OP"
# Ship the mapper and reducer scripts with the job and run it
hadoop jar "$HADOOP_JAR_PATH" \
-file "$MAPPER" -mapper "python $MAPPER" \
-file "$REDUCER" -reducer "python $REDUCER" \
-input "$IP" -output "$OP"
I generally save the script as test_s.sh, and whenever I need to run it I simply type sh test_s.sh. This helps in three ways:
  • It helps me store Hadoop commands in a manageable way.
  • It is easy to run the MapReduce code using the shell script.
  • If the code fails, I do not have to manually delete the output directory.
The simplification of anything is always sensational.
Gilbert K. Chesterton

Using Distributed Cache to provide mapper with a dictionary


Often you want your Hadoop MapReduce program to be able to access some static file. This file could be a dictionary, a set of parameters for the program, or anything else. The distributed cache provides this file to all the mapper nodes, so that every mapper can read it locally. The concept is simple, but it helps you think about MapReduce in a whole new light.

Let's start with an example. Suppose you have to write a sample MapReduce program that reads a big file containing information about all the characters in Game of Thrones, stored at "/data/characters/" with one "|"-delimited record per character (shown here as a table):
Cust_ID User_Name House
1 Daenerys Targaryen Targaryen
2 Tyrion Lannister Lannister
3 Cersei Lannister Lannister
4 Robert Baratheon Baratheon
5 Robb Stark Stark

But you don't want to include the dead characters in your analysis. You want to count the number of living characters in Game of Thrones, grouped by their House. (I know it's easy!) One option is an if statement in your mapper code that checks whether the person's ID is 4 and, if so, excludes that record. The problem is that you would have to edit the code again and again for the same analysis, since characters die like flies when it comes to George R. R. Martin. (Also, where is the fun in that?) So you create a file that contains the IDs of all the dead characters at "/data/dead_characters.txt":

Died
4
5

Whenever you have to rerun the analysis, you can just add to this file and you won't have to change anything in the code. The file can also get long, and you would not want to clutter your code with IDs.

So how would we do it? Let's go through it step by step. We will create a shell script, a mapper script and a reducer script for this task.

1) Shell Script
#!/bin/bash
# Define program variables
DC="/data/dead_characters.txt"
IP="/data/characters"
OP="/data/output"
HADOOP_JAR_PATH="/opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar"
MAPPER="got_living_m.py"
REDUCER="got_living_r.py"

# -cacheFile exposes the HDFS file $DC to every task under the name "ref"
hadoop jar "$HADOOP_JAR_PATH" \
-file "$MAPPER" -mapper "python $MAPPER" \
-file "$REDUCER" -reducer "python $REDUCER" \
-cacheFile "${DC}#ref" \
-input "$IP" -output "$OP"

Note how we use the "-cacheFile" option here. The part after "#" is the name of a symlink that Hadoop creates in each task's working directory, so our scripts can open the file from the distributed cache simply as "ref".
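
From the task's point of view, the cached file is just a local file, so reading it can be tried out on any machine. Here is a minimal sketch of such a loader. The function name load_id_set and the choice to skip the "Died" header and blank lines are my assumptions for illustration, not part of the original scripts.

```python
def load_id_set(path="ref", header="Died"):
    """Read one ID per line from a local file into a set.

    `path` defaults to the symlink name given after '#' in -cacheFile.
    `header` is the (assumed) header text to skip; blank lines are skipped too.
    """
    ids = set()
    with open(path) as handle:
        for line in handle:
            value = line.strip()
            if value and value != header:
                ids.add(value)
    return ids
```

Passing a different path lets you point the same function at a local copy of the file while testing.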

Next is our Mapper Script.

2) Mapper Script
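import sys

dead_ids = set()

def read_cache():
    # 'ref' is the symlink name given after '#' in the -cacheFile option
    with open('ref') as cache:
        for line in cache:
            dead_ids.add(line.strip())

read_cache()

for line in sys.stdin:
    rec = line.strip().split("|")  # Split using delimiter "|"
    if len(rec) < 3:
        continue  # Skip blank or malformed lines
    char_id = rec[0]
    house = rec[2]
    if char_id not in dead_ids:
        print("%s\t%s" % (house, 1))  # Emit house<TAB>1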
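
It can help to check the filtering logic before submitting a job. Below is a rough sketch that pulls the mapper's filter into a plain function and runs it on the sample records from this post. The function name living_houses, the pipe-delimited layout of the sample lines and the header check on "Cust_ID" are assumptions made for the illustration.

```python
def living_houses(lines, dead_ids, delimiter="|"):
    """Yield the house of every record whose ID is not in dead_ids."""
    for line in lines:
        rec = line.strip().split(delimiter)
        if len(rec) < 3:
            continue  # skip blank or malformed lines
        char_id, house = rec[0], rec[2]
        if char_id == "Cust_ID":
            continue  # skip the header row (assumed to be present)
        if char_id not in dead_ids:
            yield house


# Sample records from the post, written out with the "|" delimiter
sample = [
    "Cust_ID|User_Name|House",
    "1|Daenerys Targaryen|Targaryen",
    "2|Tyrion Lannister|Lannister",
    "3|Cersei Lannister|Lannister",
    "4|Robert Baratheon|Baratheon",
    "5|Robb Stark|Stark",
]

for house in living_houses(sample, {"4", "5"}):
    print("%s\t%s" % (house, 1))
```

With IDs 4 and 5 marked as dead, only the Targaryen record and the two Lannister records are emitted.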

And our Reducer Script.

3) Reducer Script
import sys

current_key = None
count = 0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # Skip blank lines
    rec = line.split('\t')
    key = rec[0]
    value = int(rec[1])

    if current_key == key:
        count += value
    else:
        # Key changed: emit the total for the previous key, not the new one
        if current_key is not None:
            print("%s:%s" % (current_key, count))
        current_key = key
        count = value

# Emit the total for the last key
if current_key is not None:
    print("%s:%s" % (current_key, count))
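
The reducer depends on its input arriving sorted by key, which Hadoop guarantees between the map and reduce phases. The sketch below shows the same counting done with itertools.groupby, with sorted() standing in for Hadoop's sort step so it can run locally. The function name count_by_key and the sample lines are assumptions for illustration; this is an alternative way to write the loop, not the script used in the post.

```python
from itertools import groupby


def count_by_key(lines):
    """Sum the values of tab-separated key/value lines, one total per key."""
    pairs = (line.strip().split("\t") for line in lines if line.strip())
    # sorted() stands in for the sort Hadoop performs before the reducer runs
    for key, group in groupby(sorted(pairs), key=lambda pair: pair[0]):
        yield key, sum(int(value) for _, value in group)


# Mapper-style output for the living characters, deliberately unsorted
mapped = ["Lannister\t1", "Targaryen\t1", "Lannister\t1"]

for key, total in count_by_key(mapped):
    print("%s:%s" % (key, total))
```

In a real streaming reducer the sorted() call would be unnecessary, since the lines on sys.stdin are already grouped by key.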

This was a simple program, and the output will be just what you expected and not very exciting. But the technique itself solves a variety of common problems. You can use it to pass any big dictionary to your MapReduce program. At least, that's what I mostly use this feature for. Hope you liked it. I will try to expand this post with more tricks.

The code for this post is posted on GitHub here.

Other Great Learning Resources For Hadoop:

I also like these books a lot. They are must-haves for a Hadooper.

The first book is a guide to using Hadoop as well as Spark with Python, while the second contains a detailed overview of everything in Hadoop. It's the definitive guide.

Advertiser Disclosure: All Amazon links are affiliate links, which means I receive compensation for any purchases through them. You do not have to purchase via my links, but you support me if you do.
