Spark Streaming allows you to process and analyze data streams in real time instead of in large batch jobs that would run every day. This data might include webserver access logs or sensor data from “Internet of Things” devices.

Spark Streaming works like this: you have data streams coming into your cluster (e.g. from Kafka or Flume), and Spark Streaming discretizes this information into a series of RDDs, each containing the data received over the last n seconds. You can then transform and process this data and output it to other systems. It is important to note that Spark Streaming is not a replacement for tools like Flume, but is positioned after these data ingestion tools in the workflow chain.

This abstraction is called a DStream (discretized stream): a continuous sequence of RDDs that is supplied over time as new information is received. You can work on each of these RDDs like any other RDD and perform operations such as map, flatMap, filter, or reduceByKey.
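As a minimal sketch of what that looks like (the socket source on port 9999 is just a placeholder for whatever stream you connect to), a DStream word count could be written like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamWordCount")
ssc = StreamingContext(sc, 1)  # batch interval of 1 second

# Placeholder source: a DStream of text lines arriving on a local TCP socket
lines = ssc.socketTextStream("localhost", 9999)

# The usual RDD-style operations, applied to each batch of the stream
words = lines.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()  # print the counts for each 1-second batch

ssc.start()
ssc.awaitTermination()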

You can maintain long-lived state on a DStream, which allows you to compute statistics over many RDDs spread across a specific time frame. For example, you can keep running totals or running means for a specific key, or the top 10 most used hashtags over the last hour. To do that, you could set a batch interval of 1 second, a slide interval of 15 minutes, and a window interval of 1 hour. This way, you capture data very quickly into an RDD as it arrives (each second), but still run statistics on all data from the last hour. The slide interval denotes how often these statistics are computed: here, you compute statistics over the last hour, but only every 15 minutes.

In Python, windowed transformations are done by setting up a streaming context with a specified batch interval. Then, you can call methods like reduceByWindow() or reduceByKeyAndWindow() to analyze data in a windowed manner.
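As a sketch using the intervals from above (the socket source is again a placeholder, and the stream is assumed to carry one hashtag per line):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="WindowedHashtagCounts")
ssc = StreamingContext(sc, 1)          # batch interval: 1 second
ssc.checkpoint("/tmp/checkpoint")      # required for windowed/stateful operations

# Placeholder source: a DStream carrying one hashtag per line
hashtags = ssc.socketTextStream("localhost", 9999)

# Count each hashtag over a 1-hour window, recomputed every 15 minutes
counts = hashtags.map(lambda tag: (tag, 1)).reduceByKeyAndWindow(
    lambda a, b: a + b,   # add values entering the window
    lambda a, b: a - b,   # subtract values leaving the window
    3600,                 # window interval: 1 hour
    900)                  # slide interval: 15 minutes
counts.pprint()

ssc.start()
ssc.awaitTermination()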

Spark Streaming with Flume

In this example, I’ll use Flume to capture data from a spooldir and then write it to an Avro sink on a specific port on localhost (instead of an HDFS sink), which a Spark Streaming job can listen to. There, we aggregate the data with a window function.

The differences from the first Flume example are that the source is now a spooling directory, and that we don’t direct the data into HDFS, but send it directly into Spark via Avro. The following config file describes the setup:

# sparkstreamingflume.conf: A single-node Flume configuration
 
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/maria_dev/spool
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = timestampInterceptor
a1.sources.r1.interceptors.timestampInterceptor.type = timestamp
 
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9092
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Now here is a Spark script in Python that listens to port 9092 and analyzes the stream:

import re
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
 
parts = [
    r'(?P<host>\S+)',                   # host %h
    r'\S+',                             # indent %l (unused)
    r'(?P<user>\S+)',                   # user %u
    r'\[(?P<time>.+)\]',                # time %t
    r'"(?P<request>.+)"',               # request "%r"
    r'(?P<status>[0-9]+)',              # status %>s
    r'(?P<size>\S+)',                   # size %b (careful, can be '-')
    r'"(?P<referer>.*)"',               # referer "%{Referer}i"
    r'"(?P<agent>.*)"',                 # user agent "%{User-agent}i"
]
pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z')
 
def extractURLRequest(line):
    exp = pattern.match(line)
    if exp:
        request = exp.groupdict()["request"]
        if request:
            requestFields = request.split()
            if len(requestFields) > 1:
                return requestFields[1]
 
 
if __name__ == "__main__":
 
    sc = SparkContext(appName="StreamingFlumeLogAggregator")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)
 
    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9092)
 
    lines = flumeStream.map(lambda x: x[1])
    urls = lines.map(extractURLRequest)
 
    # Reduce by URL over a 300-second window sliding every 1 second:
    urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 300, 1)
 
    # Sort and print the results
    sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
    sortedResults.pprint()
 
    ssc.checkpoint("/home/maria_dev/checkpoint")
    ssc.start()
    ssc.awaitTermination()

In order, this script does the following: First, you define a regular expression that splits a log entry up into its constituent parts. Then, extractURLRequest is a map function that extracts the URL from the request field of an access log line.

Within the __main__ part, you then set up the Spark Streaming task itself. You first create a SparkContext, and then a StreamingContext based on it, with a batch interval of 1 second. The FlumeUtils package then receives data from Flume via Avro, listening on localhost, port 9092 in this case. The resulting flumeStream object is a DStream. Each second, the map functions extract the lines (and, in the next step, the URLs) from the DStream.

The next step is a mapper that converts a URL to the tuple (URL, 1), followed by a reduceByKeyAndWindow call that adds up all the 1’s. The parameters are, in order: the function that combines the values of two entries with the same key, the function that “subtracts” values again, the window interval of 300 seconds, and the slide interval of 1 second. The “subtraction” (inverse) function is optional, but if provided, it allows Spark to maintain each window incrementally: instead of re-reducing all data in the window every second, it adds the values of the new batch entering the window and subtracts the values of the old batch leaving it.
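To illustrate that idea with plain numbers (not Spark API code, just the arithmetic behind the inverse function, using made-up counts):

# Hypothetical counts for a single URL
window_count = 120          # count over the current 300-second window
entering_batch_count = 4    # hits in the newest 1-second batch entering the window
leaving_batch_count = 7     # hits in the oldest batch that just dropped out of the window

# Instead of re-summing all 300 seconds of data, the window can be updated incrementally:
window_count = window_count + entering_batch_count - leaving_batch_count
print(window_count)         # 117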

Lastly, you sort the results by popularity, and print them out each second.

The last three lines set a checkpoint directory (required for stateful operations like windowing, and used to recover if the job or cluster goes down), then start the job and wait for its termination.

This way of using Spark Streaming with Flume is called the “push” model: you push data from Flume into Spark Streaming using Avro, and to Flume it looks like it’s pushing data from one Flume agent into another. Alternatively, you can use a “pull” model, in which you configure a custom Spark sink within Flume that buffers events, and Spark Streaming pulls the data from it. This is more robust and recommended for real applications.
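As a sketch of the pull-based setup (the sink class and the createPollingStream call come from Spark’s Flume integration; host and port simply mirror the push example above, and the spark-streaming-flume-sink JAR would additionally need to be on Flume’s classpath):

# Flume side: replace the avro sink with Spark's custom sink, e.g.:
#   a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
#   a1.sinks.k1.hostname = localhost
#   a1.sinks.k1.port = 9092

# Spark side: pull events from that sink instead of receiving pushed Avro events
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext(appName="StreamingFlumePull")
ssc = StreamingContext(sc, 1)

flumeStream = FlumeUtils.createPollingStream(ssc, [("localhost", 9092)])
lines = flumeStream.map(lambda x: x[1])   # event bodies, same as in the push example
lines.pprint()

ssc.start()
ssc.awaitTermination()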

Start the job by issuing the following commands from the shell:

mkdir ~/checkpoint
export SPARK_MAJOR_VERSION=2
spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.0 SparkFlume.py
# Scala version 2.11 and Spark version 2.0.0

Then, you need to start Flume to actually get data. On the Hortonworks sandbox, you would do this as follows:

cd /usr/hdp/current/flume-server
bin/flume-ng agent --conf conf --conf-file ~/sparkstreamingflume.conf --name a1

Structured Streaming

Spark 2.0 provides a higher-level API for streaming structured data, called Structured Streaming. It’s still experimental as of now, though. Like Spark SQL, this API uses Datasets (which are essentially DataFrames with more explicit type information) instead of DStreams consisting of RDDs. You then work with an unbounded table that continuously gets new rows appended as data arrives.

This makes analyzing streaming data look very similar to analyzing non-streaming data, while also being faster than the old DStream approach. Also, Spark tools like the machine learning library MLlib will be able to work with Datasets, which will then allow you to run complex algorithms on live streaming data.
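As a small sketch of what this looks like (the input directory, schema, and column names are made up for illustration; file sources in Structured Streaming need an explicit schema):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("StructuredStreamingSketch").getOrCreate()

# Hypothetical schema for incoming JSON log records
schema = StructType([
    StructField("url", StringType(), True),
    StructField("status", IntegerType(), True),
])

# New JSON files appearing in this directory become rows appended to an unbounded table
logs = spark.readStream.schema(schema).json("/home/maria_dev/streaming_input")

# The query itself looks just like a batch DataFrame query
statusCounts = logs.groupBy("status").count()

# Continuously write the updated counts to the console
query = statusCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()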