This post is part of my preparation series for the Cloudera CCA175 exam, “Certified Spark and Hadoop Developer”. It is intentionally concise, to serve me as a cheat sheet.

In this post, I’ll briefly summarize the core Spark functions necessary for the CCA175 exam. I also have a longer article on Spark available that goes into more detail and spans a few more topics.

The Spark context (often named sc) has methods for creating RDDs and is responsible for making RDDs resilient and distributed.

Reading data

This is how you would use Spark and Python to create RDDs from different sources:

from pyspark import SparkConf, SparkContext
import sys

conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)

# Create a hardcoded RDD
numbers = sc.parallelize([1, 2, 3, 4])

# Load plain text files (e.g. CSVs) from different sources
hdfs_lines = sc.textFile("hdfs:///user/cloudera/ml-100k/u.data", minPartitions=1)
local_lines = sc.textFile("file:///home/alex/ml-100k/u.data")
s3_lines = sc.textFile("s3n://bucket/ml-100k/u.data")
CLI_arg_lines = sc.textFile(sys.argv[1])

# Query an existing Hive table (returns a DataFrame; use .rdd to get an RDD)
from pyspark.sql import HiveContext
hive_ctx = HiveContext(sc)
hive_lines = hive_ctx.sql("SELECT name, age FROM users WHERE age > 18")

Note that you cannot run this with your standard Python interpreter. Instead, use spark-submit to submit it as a batch job, or start the interactive pyspark shell and enter the commands there.

Other file sources include JSON, sequence files, and object files, which I won’t cover here.
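The RDDs returned by textFile contain one raw string per line, so the next step is usually a map that parses each line. Here is a minimal sketch for the MovieLens u.data file loaded above, which is tab-separated with the fields user id, item id, rating, and timestamp. The names Rating and parse_rating are my own, chosen for illustration only.

```python
from collections import namedtuple

# Hypothetical record type; the field order follows the MovieLens 100k README.
Rating = namedtuple("Rating", ["user_id", "movie_id", "rating", "timestamp"])

def parse_rating(line):
    """Turn one tab-separated line of u.data into a Rating of integers."""
    user_id, movie_id, rating, timestamp = line.split("\t")
    return Rating(int(user_id), int(movie_id), int(rating), int(timestamp))

# With a Spark context you would write: ratings = hdfs_lines.map(parse_rating)
# The function itself is plain Python, so it can be tried without a cluster:
sample = parse_rating("196\t242\t3\t881250949")
```

Keeping the parsing logic in a plain function like this makes it easy to check on a single line before you run it over the whole file.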

Writing data

The RDD class has a saveAsTextFile method. However, this saves a string representation of each element. In Python, your resulting text file will contain lines such as (1949, 111).

If you want to save your data in CSV or TSV format, you can either use Python’s StringIO and csv modules (described in chapter 5 of the book “Learning Spark”), or, for simple data sets, just map each element (e.g. a tuple) into a single string, like this:

res.saveAsTextFile("hdfs:///user/cloudera/res_raw.txt")  # bad format

res.map(lambda row: str(row[0]) + "\t" + str(row[1])) \
   .saveAsTextFile("hdfs:///user/cloudera/res_tsv.txt")  # good format
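For data that may contain the delimiter itself, the StringIO and csv approach mentioned above is safer than string concatenation, because the csv module quotes such fields. The following is a minimal sketch, assuming Python 3; the helper name to_csv_line is hypothetical and not part of Spark.

```python
import csv
import io

def to_csv_line(row, delimiter=","):
    """Format one RDD element (a tuple or list) as a single delimited line."""
    buf = io.StringIO()
    csv.writer(buf, delimiter=delimiter).writerow(row)
    # csv.writer appends a line terminator; saveAsTextFile adds its own newline.
    return buf.getvalue().rstrip("\r\n")

# With Spark you would write:
#   res.map(to_csv_line).saveAsTextFile("hdfs:///user/cloudera/res_csv")
raw_line = str((1949, 111))        # what saveAsTextFile writes without a map
csv_line = to_csv_line((1949, 111))
tsv_line = to_csv_line((1949, 111), delimiter="\t")
```

Note that saveAsTextFile treats its argument as a directory and writes one part file per partition into it.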

Use metastore tables as input and output

Metastore tables store meta information about your stored data, such as the HDFS path to a table, column names and types. The HiveContext inherits from SQLContext and can find tables in the Metastore:

# First, create empty tables in Hive:
sqlContext.sql("CREATE TABLE IF NOT EXISTS myTab (key INT, value STRING)")
# Or, with custom field delimiters:
sqlContext.sql("CREATE TABLE IF NOT EXISTS myTab (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY \",\"")

# Import file from local file system into Hive (the target table must already exist):
sqlContext.sql("LOAD DATA LOCAL INPATH '/home/cloudera/Downloads/kv1.txt' OVERWRITE INTO TABLE myTab")
# Without 'LOCAL', import file from HDFS:
sqlContext.sql("LOAD DATA INPATH '/user/cloudera/kv1.txt' OVERWRITE INTO TABLE myTab")

To write data from Spark into Hive, do this:

df.registerTempTable("temptable") 
sqlContext.sql("CREATE TABLE IF NOT EXISTS mytable AS SELECT * FROM temptable")

# or, if the table already exists:

sqlContext.sql("INSERT INTO TABLE mytable SELECT * FROM temptable")

These HiveQL commands of course work from the Hive shell, as well.

You can then load data from Hive into Spark with commands like

myDF = sqlContext.sql("SELECT * FROM myTab WHERE key > 1000")

To write data from Spark into Hive, you can also transform it into a DataFrame and use this class’s write method:

from pyspark.sql import HiveContext, Row
sqlContext = HiveContext(sc)

rdd = sc.parallelize([1,2,3,4])

# write as JSON files to the local file system:
rdd.map(lambda x: Row(var1 = x)).toDF().write.format('json').\
    save('file:///home/cloudera/temp')

# write to Hive:
sqlContext.sql("CREATE TABLE doot (var1 INT)")
rdd.map(lambda x: Row(var1 = x)).toDF().write.insertInto("doot")
# NOTE: This command gave me an error message in the pyspark shell, but was executed
#  nonetheless. Check the contents of your Hive table before issuing the command again!

# Alternatively, to append data:
rdd.map(lambda x: Row(var1 = x)).toDF().write.mode("append").saveAsTable("doot")

Hive tables, by default, are stored in the warehouse at /user/hive/warehouse. This directory contains one folder per table, which in turn stores a table as a collection of text files.
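Because the table data consists of plain text files, you can also read a table folder directly with textFile. Here is a minimal sketch, assuming the table was created with a plain CREATE TABLE and therefore uses Hive's default field delimiter, the control character \x01 (Ctrl-A). Tables written with saveAsTable may use a different storage format such as Parquet, in which case this does not apply. The helper name parse_hive_line is hypothetical.

```python
# Hive's default field delimiter for text tables is Ctrl-A (\x01).
HIVE_DEFAULT_DELIMITER = "\x01"

def parse_hive_line(line, delimiter=HIVE_DEFAULT_DELIMITER):
    """Split one line of a Hive text table into its column values (as strings)."""
    return line.split(delimiter)

# With Spark you would write:
#   rows = sc.textFile("hdfs:///user/hive/warehouse/doot").map(parse_hive_line)
default_row = parse_hive_line("1\x01foo")
comma_row = parse_hive_line("1,foo", delimiter=",")
```

For a table created with FIELDS TERMINATED BY ",", pass that delimiter instead of the default.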

Interacting with the Hive Metastore

This requirement for the CCA175 exam is a fancy way of saying “create and modify Hive tables”. The metastore holds meta information about your tables, i.e. column names, numbers, and types.

So the requirement here is to get familiar with the CREATE TABLE and DROP TABLE commands from SQL. (ALTER TABLE does not work from within Spark and should be done from beeline.)

sqlContext.sql("CREATE TABLE boop (name VARCHAR(255), age INT)")
sqlContext.sql("DROP TABLE boop")
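When you rerun a script while practicing, a plain CREATE TABLE fails if the table is left over from the previous run. One way around this is to issue DROP TABLE IF EXISTS first. The sketch below wraps both statements in a hypothetical helper, recreate_table, which takes the function that executes SQL as an argument (in the pyspark shell, that would be sqlContext.sql). This is only an illustration of the two commands above, not an established pattern from the exam material.

```python
def recreate_table(run_sql, name, columns):
    """Drop `name` if it exists, then create it from (column, type) pairs."""
    column_spec = ", ".join("{} {}".format(col, typ) for col, typ in columns)
    statements = [
        "DROP TABLE IF EXISTS {}".format(name),
        "CREATE TABLE {} ({})".format(name, column_spec),
    ]
    for statement in statements:
        run_sql(statement)
    return statements

# In the pyspark shell you would pass sqlContext.sql as run_sql.
# Here a plain list collects the statements, so the sketch runs without a cluster.
issued = []
recreate_table(issued.append, "boop", [("name", "VARCHAR(255)"), ("age", "INT")])
```

Be aware that dropping a managed Hive table also deletes its data in the warehouse directory, so only use this on tables you can rebuild.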