HDFS

HDFS allows very large files to be broken up into blocks and distributed across a cluster of computers. It also makes sure that blocks are processed on computers that are physically close to where the data is stored.

It intelligently stores more than one copy of each block, so that you don’t lose any information if a single node goes down. This allows you to use commodity computers instead of high-reliability systems, because you don’t care too much about one node failing.

HDFS consists of a single Name Node that knows what’s stored on all the individual Data Nodes. Name Node resilience is a concern (it is a single point of failure), but there are solutions. These are more of a concern for Hadoop administrators than for application developers and analysts, though.
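
To see how a file is actually stored, you can ask HDFS which blocks it was split into and which Data Nodes hold the replicas, using the command-line interface described in the next section. The path below is just an illustration, assuming the MovieLens data has already been uploaded to /user/maria_dev/ml-100k:

[maria_dev@sandbox ~]$ hadoop fsck /user/maria_dev/ml-100k/u.data -files -blocks -locations

The output lists every block of the file together with the Data Nodes that store a replica of it.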

Using HDFS

The two main ways to use HDFS are:

  • Through a UI such as Ambari. This makes HDFS look like a giant hard drive; it’s very intuitive and you can manipulate files much like in a file explorer.
  • Command-line interface. You can SSH to your master node and execute commands such as hadoop fs -ls. All HDFS commands start with hadoop fs (see the example session after this list). Other alternatives include:

  • HTTP/HDFS proxies
  • Java interface. Since Hadoop is written in Java, there are Java APIs available for almost anything.
  • NFS gateway: You can mount an HDFS cluster!
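
For example, a typical command-line session for getting a local file into HDFS and back out looks like this (the directory and file names are only illustrations, assuming u.data has been downloaded to the current local directory):

[maria_dev@sandbox ml-100k]$ hadoop fs -mkdir ml-100k
[maria_dev@sandbox ml-100k]$ hadoop fs -copyFromLocal u.data ml-100k/u.data
[maria_dev@sandbox ml-100k]$ hadoop fs -ls ml-100k
[maria_dev@sandbox ml-100k]$ hadoop fs -copyToLocal ml-100k/u.data ./u.data.copy
[maria_dev@sandbox ml-100k]$ hadoop fs -rm ml-100k/u.data
[maria_dev@sandbox ml-100k]$ hadoop fs -rmdir ml-100k

Commands like -mkdir, -ls and -rm mirror the familiar Unix commands, while -copyFromLocal and -copyToLocal move data between the local file system and HDFS.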

The sample data I will be working with is the MovieLens 100k data set. It contains 100,000 movie ratings, spread over two files. The first, u.data, holds the ratings themselves, with the columns userID, movieID, rating, and timestamp. The first few lines look like this:

[maria_dev@sandbox ml-100k]$ head u.data
196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596
298	474	4	884182806
115	265	2	881171488
253	465	5	891628467
305	451	3	886324817
6	86	3	883603013

The second file, u.item, maps each movieID to its title (plus a release date, an IMDb URL, and genre flags). It looks like this:

[maria_dev@sandbox ml-100k]$ head u.item
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
6|Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|01-Jan-1995||http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|0|0|0|0
7|Twelve Monkeys (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|1|0|0|0
8|Babe (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Babe%20(1995)|0|0|0|0|1|1|0|0|1|0|0|0|0|0|0|0|0|0|0
9|Dead Man Walking (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Dead%20Man%20Walking%20(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|0|0|0|0
10|Richard III (1995)|22-Jan-1996||http://us.imdb.com/M/title-exact?Richard%20III%20(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|0|0|1|0
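
The fields are separated by pipes: the first two are the movie ID and the title, followed by release dates, an IMDb URL, and a series of 0/1 genre flags. As a small illustration (plain Python, assuming u.item sits in the current directory), here is how you might load it into a movieID → title lookup:

movie_titles = {}
with open('u.item', encoding='ISO-8859-1') as f:  # the raw file is Latin-1 encoded, not UTF-8
    for line in f:
        fields = line.split('|')
        movie_titles[fields[0]] = fields[1]  # movie ID -> title

print(movie_titles['1'])  # Toy Story (1995)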

MapReduce

MapReduce is Hadoop’s built-in, failure-resilient framework for processing data on your cluster. Essentially, you write a mapper function that processes your data, and a reducer function that aggregates the partial results into one final answer.

The mapper transforms each row into a key/value pair. The key is what you aggregate on (e.g. a user ID), and the value is what you aggregate (e.g. a rating). The framework then automatically performs a shuffle and sort step, where it groups (and sorts) all the values by key. You end up with each unique user ID and the list of that user’s ratings. Finally, the reducer combines each list into your result (e.g. the average rating per user).
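
To make that concrete, here is a rough sketch of the same flow in plain Python (not actual Hadoop code; the rows are made-up toy values), computing the average rating per user:

from collections import defaultdict

rows = [
    ('196', '242', 3), ('196', '302', 4),
    ('22', '377', 1), ('22', '51', 5),
]  # (userID, movieID, rating) -- toy rows for illustration

# Map: turn each row into a (key, value) pair -- here (userID, rating).
mapped = [(user_id, rating) for (user_id, movie_id, rating) in rows]

# Shuffle & sort: the framework groups all values by key.
grouped = defaultdict(list)
for user_id, rating in mapped:
    grouped[user_id].append(rating)

# Reduce: collapse each key's list of values into a final answer.
averages = {user_id: sum(ratings) / len(ratings)
            for user_id, ratings in grouped.items()}
print(averages)  # {'196': 3.5, '22': 3.0}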

Sometimes it’s hard to transform a question into a MapReduce job. In those cases, higher-level frameworks like Hive or Spark are more appropriate, since they let you express the question as a SQL-style query.

Under the hood, you kick off a MapReduce job from a client node on your cluster. The client talks to the YARN resource manager, which knows which machines are up and have spare capacity, and it may also need to copy some data to HDFS first. YARN then spawns a MapReduce application master, which works with YARN to distribute and keep track of all the map and reduce tasks.

Handling Failure

The application master monitors worker tasks for errors or hanging. If a worker task fails, the application master can restart it (preferably on a different node).

If the application master goes down, YARN can try to restart it.

If an entire node goes down, the resource manager will try to restart its tasks on another node.

The one scary thing that could happen is that the resource manager itself goes down. To guard against this, you can set up “high availability” with ZooKeeper, which maintains a hot standby resource manager that jumps in if the active one fails.

A first MapReduce script in Python

This is a Python script, using the mrjob package, that computes a frequency table of ratings for the ml-100k data set. It consists of a class inheriting from MRJob, with a mapper and a reducer method.

from mrjob.job import MRJob
from mrjob.step import MRStep

class RatingsBreakdown(MRJob):
    def steps(self):
        # A single step: one mapper followed by one reducer.
        return [
            MRStep(mapper=self.mapper_get_ratings,
                   reducer=self.reducer_count_ratings)
        ]

    def mapper_get_ratings(self, _, line):
        # Each input line is tab-separated: userID, movieID, rating, timestamp.
        # Emit the rating as the key and a count of 1 as the value.
        (userID, movieID, rating, timestamp) = line.split('\t')
        yield rating, 1

    def reducer_count_ratings(self, key, values):
        # Sum up all the 1s emitted for each rating value.
        yield key, sum(values)

if __name__ == '__main__':
    RatingsBreakdown.run()
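
To run the job, mrjob can execute it locally (handy for testing) or submit it to the cluster through its -r hadoop runner. Assuming the script is saved as RatingsBreakdown.py and u.data is available both locally and under /user/maria_dev/ml-100k on HDFS (those names are just examples), the invocations look like this:

[maria_dev@sandbox ml-100k]$ python RatingsBreakdown.py u.data
[maria_dev@sandbox ml-100k]$ python RatingsBreakdown.py -r hadoop hdfs:///user/maria_dev/ml-100k/u.data

Depending on the installation, the Hadoop runner may also need to be pointed at the Hadoop streaming JAR via mrjob’s --hadoop-streaming-jar option.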

More commonly, problems are not as easy as this one, and writing a plain MapReduce script is somewhat of a struggle. This is why people nowadays commonly use higher-level systems instead (Hive or Spark, for example).