Introduction to Spark
Let’s learn how to use Spark with Python via the pyspark library! Make sure to view the video lecture explaining Spark and RDDs before continuing with this code.
This notebook will serve as reference code for the Big Data section of the course involving Amazon Web Services. The video will provide fuller explanations for what the code is doing.
Creating a SparkContext
First we need to create a SparkContext. We will import this from pyspark:
from pyspark import SparkContext
Now create the SparkContext. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs and broadcast variables on that cluster.
Note! You can only have one SparkContext at a time with the way we are running things here.
sc = SparkContext()
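Since only one SparkContext can be active at a time, here is a minimal sketch of stopping the current context so a fresh one can be created later (for example, before re-running cells in this notebook):
# Stop the current SparkContext to release the cluster connection,
# then a new SparkContext() can be created without an error.
sc.stop()
sc = SparkContext()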
Basic Operations
We’re going to start with a ‘hello world’ example, which is just reading a text file. First let’s create that text file.
We’ll write an example text file to read using a special Jupyter notebook magic command, but feel free to use any .txt file:
%%writefile example.txt
first line
second line
third line
fourth line
Overwriting example.txt
Creating the RDD
Now we can read in the text file using the textFile method of the SparkContext we created. This method will read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of strings.
textFile = sc.textFile('example.txt')
Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.
Actions
We have just created an RDD using the textFile method and can perform operations on this object, such as counting the rows.
RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Let’s start with a few actions:
textFile.count()
4
textFile.first()
'first line'
Transformations
Now we can use transformations. For example, the filter transformation will return a new RDD with a subset of the items in the file. Let’s create a sample transformation using the filter() method. This method (just like Python’s own filter function) will only return elements that satisfy the condition. Let’s try looking for lines that contain the word ‘second’; in that case, there should only be one matching line.
secfind = textFile.filter(lambda line: 'second' in line)
# RDD
secfind
PythonRDD[7] at RDD at PythonRDD.scala:43
# Perform action on transformation
secfind.collect()
['second line']
# Perform action on transformation
secfind.count()
1
Notice how the transformations won’t display an output and won’t be run until an action is called. In the next lecture, Advanced Spark and Python, we will see many more examples of this transformation and action relationship!
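As a small sketch of this lazy behavior (reusing the textFile RDD and example.txt from above), defining transformations returns immediately and nothing is actually read or computed until an action such as collect() runs:
# Building the plan: no data is read at this point
upper_lines = textFile.filter(lambda line: 'line' in line)\
                      .map(lambda line: line.upper())
# The action triggers execution of the whole plan
upper_lines.collect()
# ['FIRST LINE', 'SECOND LINE', 'THIRD LINE', 'FOURTH LINE']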
Important Terms
Let’s quickly go over some important terms:
Term | Definition |
---|---|
RDD | Resilient Distributed Dataset |
Transformation | Spark operation that produces an RDD |
Action | Spark operation that produces a local object |
Spark Job | Sequence of transformations on data with a final action |
Creating an RDD
There are two common ways to create an RDD:
Method | Result |
---|---|
sc.parallelize(array) | Create RDD of elements of array (or list) |
sc.textFile(path/to/file) | Create RDD of lines from file |
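As a quick sketch of both creation methods (assuming the sc and example.txt defined earlier):
# From a Python list
nums = sc.parallelize([1, 2, 3, 4, 5])
# From a text file (one element per line)
lines = sc.textFile('example.txt')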
RDD Transformations
We can use transformations to create a set of instructions we want to perform on the RDD (before we call an action and actually execute them).
Transformation Example | Result |
---|---|
filter(lambda x: x % 2 == 0) | Discard non-even elements |
map(lambda x: x * 2) | Multiply each RDD element by 2 |
map(lambda x: x.split()) | Split each string into words |
flatMap(lambda x: x.split()) | Split each string into words and flatten sequence |
sample(withReplacement=True, fraction=0.25) | Create sample of 25% of elements with replacement |
union(rdd) | Append rdd to existing RDD |
distinct() | Remove duplicates in RDD |
sortBy(lambda x: x, ascending=False) | Sort elements in descending order |
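Here is a short sketch chaining a few of these transformations on a small parallelized list (illustrative only; nothing runs until the final collect() action):
nums = sc.parallelize([1, 2, 2, 3, 4, 5, 6])
# Keep even numbers, double them, drop duplicates, sort descending
nums.filter(lambda x: x % 2 == 0)\
    .map(lambda x: x * 2)\
    .distinct()\
    .sortBy(lambda x: x, ascending=False)\
    .collect()
# [12, 8, 4]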
RDD Actions
Once you have your ‘recipe’ of transformations ready, the next step is to execute them by calling an action. Here are some common actions:
Action | Result |
---|---|
collect() | Convert RDD to in-memory list |
take(3) | First 3 elements of RDD |
top(3) | Top 3 elements of RDD |
takeSample(withReplacement=True, num=3) | Create sample of 3 elements with replacement |
sum() | Find element sum (assumes numeric elements) |
mean() | Find element mean (assumes numeric elements) |
stdev() | Find element standard deviation (assumes numeric elements) |
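And a sketch of a few of these actions on a small numeric RDD (assuming the same sc):
nums = sc.parallelize([1, 2, 3, 4, 5])
nums.take(3)   # [1, 2, 3]
nums.top(3)    # [5, 4, 3]
nums.sum()     # 15
nums.mean()    # 3.0
nums.stdev()   # ~1.41 (population standard deviation)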
Examples
Now the best way to show all of this is by going through examples! We’ll first review a bit by creating and working with a simple text file, then we will move on to more realistic data, such as customers and sales data.
Creating an RDD from a text file:
**Creating the text file**
%%writefile example2.txt
first
second line
the third line
then a fourth line
Writing example2.txt
Now let’s perform some transformations and actions on this text file:
from pyspark import SparkContext
sc = SparkContext()
# Show RDD
sc.textFile('example2.txt')
MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
# Save a reference to this RDD
text_rdd = sc.textFile('example2.txt')
# Map a function (or lambda expression) to each line
# Then collect the results.
text_rdd.map(lambda line: line.split()).collect()
[['first'],
['second', 'line'],
['the', 'third', 'line'],
['then', 'a', 'fourth', 'line']]
Map vs flatMap
Note that map keeps one output element per input line (a list of words for each line), while flatMap flattens those results into a single sequence of words.
# Collect everything as a single flat sequence
text_rdd.flatMap(lambda line: line.split()).collect()
['first',
'second',
'line',
'the',
'third',
'line',
'then',
'a',
'fourth',
'line']
RDDs and Key Value Pairs
Now that we’ve seen how to work with RDDs and aggregate values with them, we can begin to look into working with key-value pairs. In order to do this, let’s create some fake data as a new text file.
This data represents some services sold to customers of a SaaS business.
%%writefile services.txt
#EventId Timestamp Customer State ServiceID Amount
201 10/13/2017 100 NY 131 100.00
204 10/18/2017 700 TX 129 450.00
202 10/15/2017 203 CA 121 200.00
206 10/19/2017 202 CA 131 500.00
203 10/17/2017 101 NY 173 750.00
205 10/19/2017 202 TX 121 200.00
Writing services.txt
services = sc.textFile('services.txt')
services.take(2)
['#EventId Timestamp Customer State ServiceID Amount',
'201 10/13/2017 100 NY 131 100.00']
services.map(lambda x: x.split())
PythonRDD[10] at RDD at PythonRDD.scala:43
services.map(lambda x: x.split()).take(3)
[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
['201', '10/13/2017', '100', 'NY', '131', '100.00'],
['204', '10/18/2017', '700', 'TX', '129', '450.00']]
Let’s remove that leading hash symbol from the header line!
services.map(lambda x: x[1:] if x[0]=='#' else x).collect()
['EventId Timestamp Customer State ServiceID Amount',
'201 10/13/2017 100 NY 131 100.00',
'204 10/18/2017 700 TX 129 450.00',
'202 10/15/2017 203 CA 121 200.00',
'206 10/19/2017 202 CA 131 500.00',
'203 10/17/2017 101 NY 173 750.00',
'205 10/19/2017 202 TX 121 200.00']
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()
[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
['201', '10/13/2017', '100', 'NY', '131', '100.00'],
['204', '10/18/2017', '700', 'TX', '129', '450.00'],
['202', '10/15/2017', '203', 'CA', '121', '200.00'],
['206', '10/19/2017', '202', 'CA', '131', '500.00'],
['203', '10/17/2017', '101', 'NY', '173', '750.00'],
['205', '10/19/2017', '202', 'TX', '121', '200.00']]
Using Key Value Pairs for Operations
Let us now begin to use lambda expressions in combination with the ByKey methods. These ByKey methods assume that your data is in key-value pair form.
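For instance, a minimal sketch of reduceByKey on a toy key-value RDD (illustrative only), before we apply the same idea to the services data:
pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
# reduceByKey combines the values of each key with the given function
pairs.reduceByKey(lambda v1, v2: v1 + v2).collect()
# [('a', 4), ('b', 2)]   (order of keys may vary)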
For example let’s find out the total sales per state:
# From Previous
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())
cleanServ.collect()
[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
['201', '10/13/2017', '100', 'NY', '131', '100.00'],
['204', '10/18/2017', '700', 'TX', '129', '450.00'],
['202', '10/15/2017', '203', 'CA', '121', '200.00'],
['206', '10/19/2017', '202', 'CA', '131', '500.00'],
['203', '10/17/2017', '101', 'NY', '173', '750.00'],
['205', '10/19/2017', '202', 'TX', '121', '200.00']]
# Let's start by practicing grabbing fields
cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()
[('State', 'Amount'),
('NY', '100.00'),
('TX', '450.00'),
('CA', '200.00'),
('CA', '500.00'),
('NY', '750.00'),
('TX', '200.00')]
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : amt1+amt2)\
.collect()
[('State', 'Amount'),
('NY', '100.00750.00'),
('TX', '450.00200.00'),
('CA', '200.00500.00')]
Uh oh! Looks like we forgot that the amounts are still strings! Let’s fix that:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.collect()
[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
We can continue our analysis by sorting this output:
# Grab state and amounts
# Add them
# Get rid of ('State','Amount')
# Sort them by the amount value
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: not x[0]=='State')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()
[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
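An alternative sketch (same result, assuming the cleanServ RDD from above) is to drop the header row and convert the amounts to floats up front, so reduceByKey only ever sees numbers:
cleanServ.filter(lambda lst: lst[3] != 'State')\
         .map(lambda lst: (lst[3], float(lst[-1])))\
         .reduceByKey(lambda amt1, amt2: amt1 + amt2)\
         .sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
         .collect()
# [('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]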
**Remember to try to use unpacking for readability. For example:**
x = ['ID','State','Amount']

def func1(lst):
    # Grab the last element by index
    return lst[-1]

def func2(id_st_amt):
    # Unpack the values into named variables for readability
    (Id, st, amt) = id_st_amt
    return amt
func1(x)
'Amount'
func2(x)
'Amount'
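As a sketch, the same unpacking idea can make the earlier state/amount extraction more readable (assuming the cleanServ RDD from above); note that Python 3 lambdas cannot unpack tuples, so a named function is used instead:
def state_and_amount(record):
    # Unpack the six fields of each row into named variables
    (event_id, timestamp, customer, state, service_id, amount) = record
    return (state, amount)

# Same (state, amount) pairs as the lst[3], lst[-1] version above
cleanServ.map(state_and_amount).collect()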