This post is part of my preparation series for the Cloudera CCA175 exam, “Certified Spark and Hadoop Developer”. It is intentionally concise, to serve me as a cheat sheet.

Refer to my Scala cheatsheet for an overview of Scala syntax; in this post, I will just use and assemble the pieces from it.

This post should be seen as an extension of the relevant posts for Spark with Python. I won’t go into the Spark basics again here, but just highlight how to do things in Scala. The Spark API is consistent between Scala and Python, though, so the differences really come down to Scala itself.

Examples in Scala

// The basics
val sc = new SparkContext("local[*]", "TestContext")

val lines = sc.textFile("../ml-100k/u.data")
// Split each line by tab and extract the third field (the rating):
val ratings = lines.map(x => x.split("\t")(2))
val results = ratings.countByValue()
val sortedResults = results.toSeq.sortBy(_._1)

sortedResults.foreach(println)
// saveAsTextFile only exists on RDDs, so parallelize first; the output is written as multiple part files
sc.parallelize(sortedResults).saveAsTextFile("resultFile")
// Modify a column as you extract it:
people.select(people("name"), people("age") + 10).show()

// count how many times each movieID appears in a data frame:
movies.groupBy("movieID").count().orderBy(desc("count")).cache()
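
The people and movies DataFrames used above are assumed to already exist. As a minimal sketch (assuming Spark 1.x with a sqlContext in scope, the ml-100k u.data file from above, and illustrative field names), such a DataFrame could be built from an RDD via a case class:

// desc() used above comes from the SQL functions package; toDF() needs the sqlContext implicits
import org.apache.spark.sql.functions.desc
import sqlContext.implicits._

case class MovieRating(userID: Int, movieID: Int, rating: Int)

val movies = sc.textFile("../ml-100k/u.data").
    map(_.split("\t")).
    map(f => MovieRating(f(0).toInt, f(1).toInt, f(2).toInt)).
    toDF()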

RDD to DataFrame, joining, and aggregating data

val myRDD = sc.textFile("/user/cloudera/problem1/orders")

case class Order(order_id: Int, order_date: String, order_customer_id: Int, order_status: String)

// toDF() requires the sqlContext implicits:
import sqlContext.implicits._

val ordersDF = myRDD.map(l => l.split(",")).
    map(x => Order(x(0).toInt, x(1), x(2).toInt, x(3))).
    toDF()

val joinedDF = ordersDF.join(orderItemDF, ordersDF("order_id") === orderItemDF("order_item_order_id"))

// col, sum, countDistinct and desc come from the SQL functions package
import org.apache.spark.sql.functions._

joinedDF.
    groupBy(col("order_date").alias("date"), col("order_status").alias("status")).
    agg(sum("order_item_subtotal").alias("total_amount"),
        countDistinct("order_id").alias("n")).
    orderBy(col("n").desc)
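
The same aggregation can also be expressed in Spark SQL; a sketch assuming the joinedDF from above and a Spark 1.x sqlContext (the temp table name is arbitrary):

joinedDF.registerTempTable("joined")
sqlContext.sql("""
    SELECT order_date AS date, order_status AS status,
           SUM(order_item_subtotal) AS total_amount,
           COUNT(DISTINCT order_id) AS n
    FROM joined
    GROUP BY order_date, order_status
    ORDER BY n DESC""")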

Random bits and pieces

// Ratings counter example

// Text file format: "userID movieID rating timestamp", tab delimited
lines.map(x => x.split("\t")(2)).countByValue().toSeq.sortBy(_._2)
// .sortBy(x => x._2) is equivalent

// countByValue returns a Map, and toSeq converts it into a Seq.
// reduceByKey example
rdd.reduceByKey( (x,y) => x+y )
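// For instance, a word count sketch over a hypothetical input.txt (file name is a placeholder):
sc.textFile("input.txt").flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)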
// rename DataFrame columns
df.select($"_1".alias("x1"))
// Compute average weight by age, given this data set:
// ID,name,age,weight
// 0,Bert,30,90
// 1,Curt,28,81
// 2,Derp,43,79
// 3,Emil,30,85

def parseLine(line: String) = {
    val fields = line.split(",")
    val age = fields(2).toInt
    val weight = fields(3).toInt
    (age, weight)
}
val lines = sc.textFile("file:///home/alex/people.csv")
val rdd = lines.map(parseLine)

// rdd:
// (30, 90)
// (28, 81)
// (43, 79)
// (30, 85)

val totalsByAge = rdd.mapValues(x => (x, 1)).
       reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))

// totalsByAge:
// (30, (175, 2))
// (28, (81, 1))
// (43, (79, 1))

// use toDouble so the division is not integer division
val averagesByAge = totalsByAge.mapValues(x => x._1.toDouble / x._2)

// so here, the tuple (30, (175, 2)) becomes (30, 87.5)

val results = averagesByAge.collect()  // collect() returns an Array of (age, average) tuples

results.sorted.foreach(println)  // sort and output
// Filtering data with function literals
// Only keep tuples where the second field equals "TMIN"
val minTemps = parsedLines.filter(x => x._2 == "TMIN")
// DataFrame to CSV (RDD)

val myCSV = myDF.map(x => x(0) + "," + x(1) + "," + x(2))
myCSV.saveAsTextFile("/user/cloudera/mycsv")
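
Alternatively, the DataFrame writer can produce CSV directly; a sketch assuming the spark-csv package on Spark 1.x (in Spark 2.x the csv format is built in; the output path is illustrative):

myDF.write.format("com.databricks.spark.csv").option("header", "true").save("/user/cloudera/mycsv_df")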

Spark SQL

An example taken from the Spark SQL documentation:

// Define the schema using a case class.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").
    map(_.split(",")).
    map(p => Person(p(0), p(1).trim.toInt)).
    toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
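
The result of sql() is a DataFrame; in Spark 1.x it can be mapped like an RDD. A small usage sketch based on the same example:

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)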