Learning Spark using Python: Basics and Applications
I regularly have a use case for Hadoop in my daily job. It has made my life easier in the sense that I can get results I was never able to get with SQL queries alone. But I still find it painfully slow. The programs I write are procedural: merge these two datasets, then filter, then merge another dataset, then filter on some condition, and yada-yada. You get the gist. And in Hadoop it is painstakingly tedious to do this. You often end up writing three or more MapReduce jobs, and each job reads the data line by line and writes its output back to disk.
There is a lot of data movement in between, which hurts the speed even more. Another thing I hate is that there is no straightforward way to pass files to mappers and reducers, which generally adds yet another MapReduce job to the whole sequence.
And that is just the procedural tasks. Implementing an iterative algorithm, even after you have worked out the whole logic of parallelization, is yet another challenge. It takes a lot of MapReduce tasks, a shell-based driver program, and a lot of creative thinking to bring everything together. And the running times are crazy. Though sometimes that has its benefits.
That makes me think about the way Hadoop was designed. When Hadoop appeared, RAM was expensive. That is no longer the case: we already have 64GB machines in our Hadoop cluster. So is it really a good idea to avoid using a larger chunk of memory and instead read line by line? Can we have something that lets us keep a particular piece of data in memory, so that the next time our program needs it, it doesn't have to read it again and waste time? Wouldn't it be better if we had some variable that keeps track of the state our iterative algorithm is in?
The Solution?
And this is where Spark comes to the rescue. Working with Spark is quite different from Hadoop, but once you start using it you will find that it makes things so much easier. You still have to think in a MapReduce sort of way, but how the map and reduce steps are carried out is a little different.
So let's first get Spark onto our system. (Keep in mind that to run Spark in a production environment you will need a whole cluster set up, a luxury you may or may not have at present.)
The best way I have found to install Spark is to follow the Apache Spark installation guidelines along with the Apache Spark edX course. It gets Spark onto your system and lets you work with it through IPython notebooks, which I prefer a lot and find the best way to code in Python.
The installation instructions can be found HERE. You may have to log in to an edX account to follow them, but it is worth it.
Once you have gone through all the steps mentioned there and installed Spark using those instructions, you should see something like this in your browser.
Ahh! so you have got Spark up and running now. That’s actually like half the process. I like to learn by examples so let’s get done with the “Hello World” of Distributed computing: The WordCount Program.
lines = sc.textFile("shakespeare.txt")            # Distribute the data - Create a RDD
counts = (lines.flatMap(lambda x: x.split(' '))   # Create a list with all words
               .map(lambda x: (x, 1))             # Create tuple (word,1)
               .reduceByKey(lambda x,y : x + y))  # reduce by key i.e. the word
output = counts.take(10)                          # get the output on local
for (word, count) in output:                      # print output
    print("%s: %i" % (word, count))
So that is a small example, and pretty compact code when you compare it with Hadoop. Most of the work gets done in the second command. Don't worry if you are not able to follow it yet, as I still need to tell you about the things that make Spark work.
But before we get into the Spark basics, let us refresh some of our Python basics. Understanding Spark becomes a lot easier if you have used lambda functions in Python.
For those of you who haven't used them, below is a brief intro.
Lambda Functions in Python
Map
Map is used to apply a function to every element of an array or a list. You could do this with a simple for loop, but map lets you do it in a single line of Python.
my_list = [1,2,3,4,5,6,7,8,9,10]
# Lets say I want to square each term in my_list.
squared_list = map(lambda x:x**2,my_list)
print squared_list
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
In the above example you can think of map as a function that takes two arguments: a function and a list. It applies the function to every element of the list. What lambda allows you to do is write an inline function. Here the part "lambda x:x**2" defines a function that takes x as input and returns x^2.
You could also have provided a proper function in place of the lambda. For example:
def squared(x):
    return x**2

my_list = [1,2,3,4,5,6,7,8,9,10]
# Lets say I want to square each term in my_list.
squared_list = map(squared,my_list)
print squared_list
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
The same result, but the lambda expression makes the code compact and a lot more readable.
Filter
The other function that is used extensively is the filter function. It takes two arguments: a condition and the list to filter. If you want to keep only the elements of a list that satisfy some condition, you use filter.
my_list = [1,2,3,4,5,6,7,8,9,10]
# Lets say I want only the even numbers in my list.
filtered_list = filter(lambda x:x%2==0,my_list)
print filtered_list
[2, 4, 6, 8, 10]
Reduce
The next function is the reduce function, which will be a real workhorse in Spark. It takes two arguments: a reducing function that itself takes two arguments and combines them into one, and a list over which the reduction is applied.
my_list = [1,2,3,4,5]
# Lets say I want to sum all elements in my list.
sum_list = reduce(lambda x,y:x+y,my_list)
print sum_list
15
Here the lambda function takes two values, x and y, and returns their sum. Intuitively you can think of the reduce function as working like this:
Reduce function first sends 1,2 ; the lambda function returns 3
Reduce function then sends 3,3 ; the lambda function returns 6
Reduce function then sends 6,4 ; the lambda function returns 10
Reduce function finally sends 10,5 ; the lambda function returns 15
A condition on the lambda function we use in reduce is that it must be commutative, that is, a + b = b + a, and associative, that is, (a + b) + c = a + (b + c). In the above case we used addition, which is both commutative and associative. Other functions we could have used are max, min, multiplication, etc.
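To make that concrete, here is a tiny sketch in the same plain-Python style, using max and multiplication, which also satisfy both properties:
my_list = [1,2,3,4,5]
# Maximum of the list: the pairwise max is commutative and associative
max_value = reduce(lambda x,y: x if x > y else y, my_list)
print max_value
5
# Product of the list: multiplication is commutative and associative
product = reduce(lambda x,y: x*y, my_list)
print product
120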
Moving Again to Spark
Now that we have the fundamentals of Python functional programming out of the way, let's head back to Spark.
But first, let's delve a little into how Spark works. Spark consists of two things: a driver and workers. The workers normally do all the work, and the driver makes them do it.
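In the notebook setup described earlier, the driver's entry point to Spark, the SparkContext, is already created for you as the variable sc. If you were writing a standalone script instead, you would create it yourself, roughly like this (a minimal sketch; the master URL and app name are placeholders for illustration):
from pyspark import SparkContext

# "local[4]" runs Spark locally with 4 worker threads; on a real cluster
# you would point master at the cluster instead.
sc = SparkContext(master="local[4]", appName="MySparkApp")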
An RDD (Resilient Distributed Dataset) is defined as a parallelized data structure that gets distributed across the worker nodes. In our wordcount example, in the first line
lines = sc.textFile("data/cs100/lab1/shakespeare.txt")
We took a text file and distributed it across worker nodes so that they can work on it in parallel. We could also parallelize lists using the function
sc.parallelize
For example:
data = [1,2,3,4,5,6,7,8,9,10]
new_rdd = sc.parallelize(data,4)
new_rdd
ParallelCollectionRDD[15] at parallelize at PythonRDD.scala:392
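The second argument to sc.parallelize is the number of partitions, i.e. how many chunks the data is split into across the workers. You can check it on the RDD:
new_rdd.getNumPartitions()
4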
In Spark we classify the operations into two basic types, Transformations and Actions (a small example of the difference follows these definitions):
Transformations : Create new datasets from existing RDDs
Actions : Mechanism to get results out of Spark
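One important detail: transformations are lazy. Calling a transformation only records what should be done; nothing is computed on the workers until an action asks for a result. A small sketch to illustrate:
data = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(data,4)
doubled_rdd = rdd.map(lambda x: x*2)   # returns immediately, nothing is computed yet
doubled_rdd.collect()                  # the action triggers the actual computation
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]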
Understanding Transformations
So let's say you have got your data in the form of an RDD. To restate: your data is now accessible to all the worker machines. You want to do some transformations on it now; you may want to filter it, apply some function to it, and so on. In Spark this is done using transformation functions. Spark provides many of them; you can see a comprehensive list here. Some of the main ones that I use frequently are:
1. Map:
Applies a given function to an RDD. Note that the syntax is a little different from plain Python, but it does essentially the same thing. Don't worry about collect yet; for now just think of it as a function that collects the data in squared_rdd back into a list.
data = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(data,4)
squared_rdd = rdd.map(lambda x:x**2)
squared_rdd.collect()
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
2. Filter:
Again no surprises here. Takes as input a condition and keeps only those elements that fulfill that condition.
data = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(data,4)
filtered_rdd = rdd.filter(lambda x:x%2==0)
filtered_rdd.collect()
[2, 4, 6, 8, 10]
3. Distinct:
Returns only distinct elements in an RDD
data = [1,2,2,2,2,3,3,3,3,4,5,6,7,7,7,8,8,8,9,10]
rdd = sc.parallelize(data,4)
distinct_rdd = rdd.distinct()
distinct_rdd.collect()
[8, 4, 1, 5, 9, 2, 10, 6, 3, 7]
4. Flatmap:
Similar to map, but each input item can be mapped to 0 or more output items
data = [1,2,3,4]
rdd = sc.parallelize(data,4)
flat_rdd = rdd.flatMap(lambda x:[x,x**3])
flat_rdd.collect()
[1, 1, 2, 8, 3, 27, 4, 64]
5. Reduce By Key:
The analogue of the reduce step in Hadoop MapReduce. Spark would not provide much value if it only worked with flat lists of values, so it also has the concept of pair RDDs, which makes it a lot more flexible. Let's assume we have a dataset with a product, its category, and its selling price. We can still parallelize the data.
data = [('Apple','Fruit',200),('Banana','Fruit',24),('Tomato','Fruit',56),('Potato','Vegetable',103),('Carrot','Vegetable',34)]
rdd = sc.parallelize(data,4)
Right now our RDD rdd holds tuples. We want to find the total revenue for each category. To do that we have to transform rdd into a pair RDD so that it only contains key-value pairs/tuples.
category_price_rdd = rdd.map(lambda x: (x[1],x[2]))
category_price_rdd.collect()
[('Fruit', 200), ('Fruit', 24), ('Fruit', 56), ('Vegetable', 103), ('Vegetable', 34)]
Here we used the map function to get the data into the format we wanted. When you read a text file, the RDD that gets formed is just a collection of strings, and map is what you use to convert it into whatever format you need.
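For example, if you had a hypothetical text file sales.txt whose lines look like Fruit,200, you could turn it into a pair RDD the same way (a sketch; the file name and format are made up for illustration):
# Each line is assumed to look like "Fruit,200"
raw_rdd = sc.textFile("sales.txt")
pair_rdd = raw_rdd.map(lambda line: (line.split(",")[0], int(line.split(",")[1])))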
So now our category_price_rdd contains the product category and the price at which the product sold. Now we want to reduce on the key (the category) and sum the prices. We can do this with:
category_total_price_rdd = category_price_rdd.reduceByKey(lambda x,y:x+y)
category_total_price_rdd.collect()
[('Vegetable', 137), ('Fruit', 280)]
6. Group By Key:
Similar to reduceByKey, except that it does not reduce; it just puts all the elements for a key into an iterator. For example, if we wanted the category as the key and a list of all the products in it as the value, we would use this function.
data = [('Apple','Fruit',200),('Banana','Fruit',24),('Tomato','Fruit',56),('Potato','Vegetable',103),('Carrot','Vegetable',34)]
rdd = sc.parallelize(data,4)
category_product_rdd = rdd.map(lambda x: (x[1],x[0]))
category_product_rdd.collect()
[('Fruit','Apple'),('Fruit','Banana'),('Fruit','Tomato'),('Vegetable','Potato'),('Vegetable','Carrot')]
grouped_products_by_category_rdd = category_product_rdd.groupByKey()
findata = grouped_products_by_category_rdd.collect()
for data in findata:
    print data[0],list(data[1])
Vegetable ['Potato', 'Carrot']
Fruit ['Apple', 'Banana', 'Tomato']
Here groupByKey did its job and returned the category together with the list of products in that category.
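Note that the values come back as an iterable rather than a plain list, which is why the loop above wraps them in list(). If you would rather have lists inside the RDD itself, a mapValues transformation works too:
grouped_products_by_category_rdd.mapValues(list).collect()
[('Vegetable', ['Potato', 'Carrot']), ('Fruit', ['Apple', 'Banana', 'Tomato'])]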
Understanding Actions
Now you have filtered your data and mapped some functions over it. You have done your computation. Next you want to get the data onto your local machine, or save it to a file; for that you need actions. A comprehensive list of actions is provided HERE.
Some of the most common actions that I tend to use are:
1. Collect:
We have already used this action many times. It takes the whole RDD and brings it back to the driver program.
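Just be careful with it on big RDDs, because every element gets shipped back to the driver:
rdd = sc.parallelize([1,2,3,4,5])
rdd.collect()    # fine for small data; on a huge RDD prefer take or saveAsTextFile
[1, 2, 3, 4, 5]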
2. Reduce:
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
rdd = sc.parallelize([1,2,3,4,5])
rdd.reduce(lambda x,y : x+y)
15
3. Take:
Return a list with the first n elements of the dataset.
rdd = sc.parallelize([1,2,3,4,5])
rdd.take(3)
[1, 2, 3]
4. takeOrdered:
Return the first n elements of the RDD using either their natural order or a custom comparator.
rdd = sc.parallelize([5,3,12,23])
rdd.takeOrdered(3,lambda s:-1*s) # descending order
[23, 12, 5]
rdd = sc.parallelize([(5,23),(3,34),(12,344),(23,29)])
rdd.takeOrdered(3,lambda s:-1*s[1]) # descending order
[(12, 344), (3, 34), (23, 29)]
So now let's take a look at the wordcount program again.
Understanding The WordCount Example
Now that we sort of understand the transformations and actions Spark provides, it should not be difficult to understand the wordcount program. Let's go through it line by line.
The first line creates an RDD and distributes it to the workers.
lines = sc.textFile("data/cs100/lab1/shakespeare.txt")
This RDD, lines, contains a list of strings that are actually the lines in the file. It is of the form:
['word1 word2 word3','word4 word3 word2']
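You can peek at the RDD to confirm this, for example:
lines.take(2)   # returns the first two lines of the file as plain strings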
This next line is actually the workhorse function in the whole script.
counts = (lines.flatMap(lambda x: x.split(' '))
               .map(lambda x: (x, 1))
               .reduceByKey(lambda x,y : x + y))
It contains a series of transformations that we apply to the lines RDD. First we do a flatMap transformation, which takes the lines as input and gives words as output. So after the flatMap transformation the RDD is of the form:
['word1','word2','word3','word4','word3','word2']
Next we do a map transformation on the flatMap output, which converts the RDD to:
[('word1',1),('word2',1),('word3',1),('word4',1),('word3',1),('word2',1)]
Finally we do a reduceByKey transformation, which counts the number of times each word appears. After this the RDD reaches its final, desired form:
[('word1',1),('word2',2),('word3',2),('word4',1)]
This next line is an action that brings the first 10 elements of the resulting RDD back to the driver.
output = counts.take(10)
These lines just print the output:
for (word, count) in output:
    print("%s: %i" % (word, count))
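Note that take(10) just grabs ten elements in whatever order they arrive; it does not return the ten most frequent words. If you wanted the actual top ten by count, a small variation using takeOrdered would do it:
top10 = counts.takeOrdered(10, key=lambda x: -x[1])   # sort by count, descending
for (word, count) in top10:
    print("%s: %i" % (word, count))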
Getting Serious
So far we have talked about the wordcount example and the basic transformations and actions you can use in Spark. But we don't do wordcount in real life; we have to work on bigger problems that are much more complex. Worry not! Whatever we have learned so far will let us do that and more.
Let's work with a concrete example: I will redo an analysis in which Greg Reda worked on the MovieLens data with Pandas (by the way, a great resource for learning Pandas). This example covers just about every sort of transformation you might want to do with this data.
So let's first talk about the dataset. The MovieLens dataset contains a lot of files, but we are only going to work with 3 of them:
Users: This file is named "u.user". Its columns are:
['user_id', 'age', 'sex', 'occupation', 'zip_code']
Ratings: This file is named "u.data". Its columns are:
['user_id', 'movie_id', 'rating', 'unix_timestamp']
Movies: This file is named "u.item". Its columns are:
['movie_id', 'title', 'release_date', 'video_release_date', 'imdb_url', and 18 more columns...]
What are the 25 most rated movies?
First of all, let's load the data into different RDDs and see what it contains.
userRDD = sc.textFile("/vagrant/ml-100k/u.user")
ratingRDD = sc.textFile("/vagrant/ml-100k/u.data")
movieRDD = sc.textFile("/vagrant/ml-100k/u.item")
print "userRDD:",userRDD.take(1)
print "ratingRDD:",ratingRDD.take(1)
print "movieRDD:",movieRDD.take(1)
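Those raw rows are just strings. If you wanted the users split out into the columns listed earlier, a quick sketch (assuming u.user is pipe-delimited, which is how the ml-100k files are laid out) would be:
# Split each "user_id|age|sex|occupation|zip_code" line into its fields
userRDD.map(lambda line: line.split("|")).take(1)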
Looking at the data, we note that to answer this question we need to use the ratingRDD. But the ratingRDD does not contain the movie name, so we have to merge movieRDD and ratingRDD. Let's see how to do that in Spark, first step by step. Read the comments in the code.
# Create a RDD from RatingRDD that only contains the two columns of interest i.e. movie_id,rating.
RDD_movid_rating = ratingRDD.map(lambda x : (x.split("\t")[1],x.split("\t")[2]))
print "RDD_movid_rating:",RDD_movid_rating.take(4)
# Create a RDD from MovieRDD that only contains the two columns of interest i.e. movie_id,title.
RDD_movid_title = movieRDD.map(lambda x : (x.split("|")[0],x.split("|")[1]))
print "RDD_movid_title:",RDD_movid_title.take(2)
# merge these two pair RDDs based on movie_id. For this we will use the transformation leftOuterJoin()
rdd_movid_title_rating = RDD_movid_rating.leftOuterJoin(RDD_movid_title)
print "rdd_movid_title_rating:",rdd_movid_title_rating.take(1)
# use the RDD in previous step to create (movie,1) tuple pair RDD
rdd_title_rating = rdd_movid_title_rating.map(lambda x: (x[1][1],1 ))
print "rdd_title_rating:",rdd_title_rating.take(2)
# Use the reduceByKey transformation to reduce on the basis of movie_title
rdd_title_ratingcnt = rdd_title_rating.reduceByKey(lambda x,y: x+y)
print "rdd_title_ratingcnt:",rdd_title_ratingcnt.take(2)
# Get the final answer by using the takeOrdered action
print "#####################################"
print "25 most rated movies:",rdd_title_ratingcnt.takeOrdered(25,lambda x:-x[1])
print "#####################################"
We could have done all of this in a single command, as shown below, though the code gets a little messier. I show it to demonstrate that you can chain things sequentially in Spark and bypass the intermediate variable creation entirely.
print (((ratingRDD.map(lambda x : (x.split("\t")[1],x.split("\t")[2]))).
        leftOuterJoin(movieRDD.map(lambda x : (x.split("|")[0],x.split("|")[1])))).
        map(lambda x: (x[1][1],1)).
        reduceByKey(lambda x,y: x+y).
        takeOrdered(25,lambda x:-x[1]))
![25 most rated movies](/images/result_rating_cnt_25_2.png)
Which movies are most highly rated?
Now we want to find the 25 most highly rated movies in the same dataset, and we only want movies that have been rated at least 100 times. Let's do this in Spark:
# We already have the RDD rdd_movid_title_rating: [(u'429', (u'5', u'Day the Earth Stood Still, The (1951)'))]
# We create an RDD that contains sum of all the ratings for a particular movie
rdd_title_ratingsum = (rdd_movid_title_rating.
                       map(lambda x: (x[1][1],int(x[1][0]))).
                       reduceByKey(lambda x,y:x+y))
print "rdd_title_ratingsum:",rdd_title_ratingsum.take(2)
# Merge this data with the RDD rdd_title_ratingcnt we created in the last step
# And use Map function to divide ratingsum by rating count.
rdd_title_ratingmean_rating_count = (rdd_title_ratingsum.
                                     leftOuterJoin(rdd_title_ratingcnt).
                                     map(lambda x:(x[0],(float(x[1][0])/x[1][1],x[1][1]))))
print "rdd_title_ratingmean_rating_count:",rdd_title_ratingmean_rating_count.take(1)
# We could use takeOrdered right away, but we only want movies that have been
# rated at least 100 times, so let's filter the RDD first.
rdd_title_rating_rating_count_gt_100 = (rdd_title_ratingmean_rating_count.
                                        filter(lambda x: x[1][1]>=100))
print "rdd_title_rating_rating_count_gt_100:",rdd_title_rating_rating_count_gt_100.take(1)
# Get the final answer by using the takeOrdered action
print "#####################################"
print "25 highly rated movies:",
print rdd_title_rating_rating_count_gt_100.takeOrdered(25,lambda x:-x[1][0])
print "#####################################"
Conclusion
So Spark provides an interface where we can apply transformations sequentially far more easily than in Hadoop. And it is fast. While sequential pipelines are a pain to build in Hadoop, the infrastructure Spark provides seems to fit naturally into the analytics use case.
Hopefully I’ve covered the basics well enough to pique your interest and help you get started with Spark. If I’ve missed something critical, feel free to let me know on Twitter or in the comments - I’d love constructive feedback.
You can find the Jupyter notebook HERE
One of the newest and best resources you can keep an eye on is the Introduction to Big Data course in the Big Data Specialization from UC San Diego.