Filter, aggregate, join, rank, and sort datasets (Spark/Python)
This post is part of my preparation series for the Cloudera CCA175 exam, “Certified Spark and Hadoop Developer”. It is intentionally concise, to serve me as a cheat sheet.
There are two categories of operations on RDDs: Transformations modify an RDD (e.g. filter out some lines) and return an RDD, and actions modify an RDD and return a Python object.
I’ll show examples with two RDDs: one consists of only values (think “one column”, the other of key/value pairs). These are the example RDDs:
A key/value RDD just contains a two element tuple, where the first item is the key and the second item is the value (it can be a list of values, too). The best idea is probably to open a
pyspark shell and experiment and type along. You can always “print out” an RDD with its
In the following, I’ll go through a quick explanation and an example for the most common methods. A comprehensive list is available here.
- map: Transform your data row-wise and 1:1 with a function
- flatMap: Similar but “flattens” the results, i.e. loses one dimension. Think
unlist()in R. The resulting value
yin the following example now has five elements after
flatMap(), instead of two elements after
- reduceByKey: Reduces an RDD but keeps it as an RDD (unlike ‘reduce’)
- groupByKey: Summarizes the RDD into unique keys and an Iterable of all values. This can be passed to
mapValuesthen, for example. The following line is one of many ways to count the number of elements per key:
mapValues, flatMapValues: More efficient than map and flatMap because Spark can maintain the partitioning. Try to use these functions instead where possible.
sortByKey: Sorts the keys in ascending order. You can use sortBy to define a custom sorting function (e.g.
lambda x: xfor the second “column”)
filter: Select only interesting entries from your RDD
distinct: Keep only unique elements
join, rightOuterJoin, leftOuterJoin, fullOuterJoin: Performs joins on the keys of two RDDs, and returns the keys along with the pairs of values.
- union, intersection, subtract, cartesian: These set operations take two RDDs and combine them.
- collect: Dump all elements, i.e. converts the RDD to a Python list
- count: Returns the number of elements in an RDD
- countByValue: Outputs a dictionary of
(key, n), i.e. a count, by unique value. This is similar to doing
map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y).collect(). Note that you need the
.items()method on your dictionary to convert it into a list of key/value-tuples.
- take, top: Sample a few values
- reduce: Aggregate all values for a given key value
Detailed explanations are available here.
Note that transformations return RDDs, but actions return “normal” Python objects. After an action, you can use standard Python on these objects again.
Your RDDs can include single values per element (e.g. one string representing an entire line from a text file), or key-value pairs. With keys, you can work on RDDs much like on NoSQL databases. You create key-value RDDs by having a map output two values for each input, e.g. with
z = y.map(lambda x: (x, 1)). Here, the value will be 1. You can also use lists as values.
Some advanced calls
Using this data set (year, temperature, quality code):
In Python, the following piece of code selects all values where the year is not 9999 (a NA value), and the quality score is one of 0, 1, 4, 5, and 9. Then, it selects the year (as key) and temperature (as value), and outputs a text file with the two lines
(1949, 111) and
reduceByKey can take an anonymous function that is associative and commutative and combines two arguments. For example:
reduceByKey(lambda x,y: x+y) would just add up all values by key.
Custom map functions to parse lines
Suppose we have this sample data:
For each age, we want to get the average number of friends.
You can write your own mapper function like this:
To get the average, we first get the sum total and the number of entries per age.
We transform each value, e.g. 255 friends, into a pair (255, 1). We can then sum up both elements of the value pair and divide the total sum by the total count to get the average.
- You can use
distData = sc.broadcast(someData)to distribute your object
someDataso that the cluster can access it quickly. Use
distData.valueto access it.
- Accumulators allow all task executors to increment a shared variable. Create one with
myCounter = sc.accumulator(0). Increase it with
myCounter.add(1)and access the value with
- Spark discards RDDs after you’ve called an action on them. If you want to keep them for further evaluation, use
DataFrames and Spark SQL
These two concepts extend the RDD concept to a “DataFrame” object that contains structured data. DataFrames contain Row objects, which allows you to issue SQL queries. The fact that the data has a schema allows Spark to run some optimization on storage and querying. You can also easier read and write to JSON, Hive, or Parquet, and also communicate with JDBC/ODBC or even Tableau.
Some examples using the following data again:
The following script loads this data and creates a DataFrame. Note that with Spark 2.0, this will be a bit easier. This is the Spark 1.6 solution.
Technicalities: In Spark 1.6, DataFrames appeared. In Spark 2.0, DataFrames became DataSets of Row objects. In Spark 2.0 you should use DataSets where possible. They are more general and can contain elements of other classes as well. The CCA175 currently only comes with Spark 1.6 though.
Joins with DataFrames or SparkSQL
Let’s create a second DataFrame with some users’ hobbies, and join the user name to it:
The exact same result can be obtained via “actual” SQL as follows:
Ranking data with DataFrames or SparkSQL
We’ll rank the heights of these six people, first globally, and then grouped per gender. I’ll also show it first using DataFrames, and then via Spark SQL.