Spark SQL with Python
This post is part of my preparation series for the Cloudera CCA175 exam, “Certified Spark and Hadoop Developer”. It is intentionally concise, to serve me as a cheat sheet.
In this post, I’ll very briefly summarize the Spark SQL functions necessary for the CCA175 exam. I also have a longer article on Spark available that goes into more detail and spans a few more topics.
- RDDs can contain any kind of unstructured data
- Spark 1.3 introduced DataFrames, and Spark 1.6 added Datasets; a DataFrame is a Dataset of Row objects. These Row objects contain structured data (i.e. they have a schema: names and types)
- Big advantages:
- You can run SQL queries on DataFrames
- You can read from and write to JSON, Hive, and Parquet
- You can communicate with JDBC/ODBC
- Note: The all-encompassing SparkSession entry point only appeared in Spark 2.0. Currently, the CCA175 exam provides Spark 1.6, where you still work with a SparkContext and an SQLContext (or HiveContext)

Creating DataFrames and DataSets
Creating DataFrames and DataSets
Spark has a .toDF method that converts an RDD to a Dataset or DataFrame. Note that you need to map the lines into Row objects first:
You can also use an SQLContext to load data directly as a DataFrame:
Two ways to play
Consider the following table:

    # ID, name, age, gender, height
    0, "Anna", 32, "female", 172
    1, "Bert", 21, "male", 182
    2, "Curt", 42, "male", 170
    3, "Doris", 43, "female", 164
    4, "Edna", 22, "female", 171
    5, "Felix", 19, "male", 160
There are two ways of programming Spark SQL queries. One is to issue actual SQL statements, like so:
The other way is to chain the respective methods together, in this example, in Python:
This is similar to the intuitive piping style that R packages such as magrittr use. What's more, this way you don't have to register a temporary table first.
A basic Spark/Python script
Coming back to the mini analysis in this post, let’s re-implement it in Spark SQL with a DataFrame:
Much faster and more elegant, right?
Reading and writing data
Spark < 2.0 needs the external spark-csv package to read CSVs; it must be supplied when calling pyspark from the command line. But importing CSVs as an RDD and mapping to DataFrames works, too.
A DataFrame's write method can only write to a directory of partitioned part-files. If you want to write a single text file, use the RDD's saveAsTextFile method on an RDD coalesced to one partition.
Use metastore tables as an input source or output sink
On the Cloudera sandbox, you connect to the metastore database by issuing mysql -u root -pcloudera (there is no space between -p and the password). From there, you can list all stored databases via SELECT * FROM DBS;. By default, you have one toy database stored in the DBS table, which is at hdfs:///user/hive/warehouse. The TBLS table shows which tables exist in which DB. And lastly, you get the columns with their column types in the COLUMNS_V2 table.
This is how you would access and modify the metastore, but I don’t see the advantage of diving in there. It seems both easier and safer to just modify the tables from within, say, pyspark.
Filter, aggregate, join (between disparate sources), rank, and sort datasets
Consider the following two DataFrame objects, users and hobbies:

    # users
    # ID, name
    0, "Hans"
    1, "Peter"

    # hobbies
    # userID, hobby, frequency
    0, "gym", "daily"
    0, "drawing", "weekly"
    1, "reading", "daily"
    1, "guitar", "weekly"
    1, "gaming", "daily"
    1, "movies", "daily"
To use all objectives (filter, join, aggregate, rank) in one task, we are now going to do the following: Rank the users by number of daily hobbies. That is:
- Filter and keep only the daily hobbies.
- Join users to (daily) hobbies
- Aggregate and count the daily hobbies per user
- Rank the resulting table by number of daily hobbies
First, we do it using pyspark methods:
    # result:
    # name, n
    Hans, 1
    Peter, 3
Ranking is an inefficient operation, and still “a bit” tedious to write in pyspark:
    # name, n, rankN
    Peter, 3, 1
    Hans, 1, 2
Next, we do the same thing using Spark SQL:
    users.registerTempTable("users")
    hobbies.registerTempTable("hobbies")

    sqlContext.sql("""
        SELECT doot.name, doot.n,
               RANK() OVER (ORDER BY doot.n DESC) AS rankN
        FROM (
            SELECT u.name, COUNT(u.name) AS n
            FROM hobbies h
            LEFT JOIN users u ON u.ID = h.userID
            WHERE h.frequency = 'daily'
            GROUP BY u.name
        ) doot
    """).show()