Namenode Directory Information Computer Science Essay


The Hadoop filesystem (HDFS) includes what is called a Secondary Namenode, which misleads some people into thinking that when the Primary Namenode goes offline, the Secondary Namenode takes over. In fact, the Secondary Namenode regularly connects to the Primary Namenode and builds snapshots of the Primary Namenode's directory information, which are then saved to local or remote directories. These checkpointed images can be used to restart a failed Primary Namenode without having to replay the entire journal of filesystem actions (the edit log) to create an up-to-date directory structure.

Jackal is an open-source [2], fine-grained distributed shared memory implementation of the Java programming language. Java inherently supports parallel programming through multi-threading. Jackal exploits this property and allows users to run multi-threaded programs unmodified in a distributed memory environment such as a cluster.

Cascading is a thin Java library and API that sits on top of Hadoop's MapReduce layer and is executed from the command line like any other Hadoop application.

In my previous post, I talked about the methodology of transforming a sequential algorithm into a parallel one. After that, we can implement the parallel algorithm; one of the popular frameworks we can use is the open-source Apache Hadoop Map/Reduce framework.

Functional Programming

Multithreading is one of the popular ways of doing parallel programming, but the major complexity of multi-threaded programming is coordinating each thread's access to shared data. We need things like semaphores and locks, and we must use them with great care; otherwise deadlocks will result.

If we can eliminate shared state completely, the complexity of coordination disappears. This is the fundamental concept of functional programming. Data is explicitly passed between functions as parameters or return values, which can only be changed by the function that is active at that moment. Imagine functions connected to each other via a directed acyclic graph (DAG). Since there is no hidden dependency (via shared state), functions in the DAG can run anywhere in parallel as long as one is not an ancestor of the other. In other words, analyzing the parallelism is much easier when there is no hidden dependency from shared state.
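
As a small illustration of this idea, here is a sketch in Python. The function names are hypothetical and chosen only for this example. Two pure functions that are not ancestors of each other are submitted to a thread pool, and a third function that depends on both results runs afterwards. No locks are needed because nothing is shared.

```python
from concurrent.futures import ThreadPoolExecutor


def square_sum(numbers):
    # Pure function: the result depends only on the argument.
    return sum(n * n for n in numbers)


def cube_sum(numbers):
    # Pure function: independent of square_sum.
    return sum(n * n * n for n in numbers)


def combine(left, right):
    # Depends on both results, so it is a descendant of both in the DAG.
    return left + right


def run_dag(numbers):
    # square_sum and cube_sum are not ancestors of each other,
    # so they may run in any order or at the same time.
    with ThreadPoolExecutor(max_workers=2) as pool:
        left = pool.submit(square_sum, numbers)
        right = pool.submit(cube_sum, numbers)
        return combine(left.result(), right.result())
```

Calling run_dag([1, 2, 3]) adds the sum of squares (14) to the sum of cubes (36), whichever of the two happens to finish first.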

Map/Reduce functions

Map/Reduce is a special form of such a DAG that is applicable to a wide range of use cases. It is organized around a "map" function, which transforms a piece of data into some number of key/value pairs. These pairs are then sorted by key so that all pairs with the same key reach the same node, where a "reduce" function is used to merge the values (of the same key) into a single result.

map(input_record) {
    ...
    emit(k1, v1)
    ...
    emit(k2, v2)
    ...
}

reduce(key, values) {
    aggregate = initialize()
    while (values.has_next) {
        aggregate = merge(aggregate, values.next)
    }
    collect(key, aggregate)
}

The Map/Reduce DAG is organized in this way.
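
To make the map, sort-by-key, and reduce stages concrete, here is a single-process word count sketch in Python. It is not Hadoop code: the sort stands in for the shuffle that brings equal keys to the same node, and the names map_fn, reduce_fn, and run_map_reduce are assumptions made for this example.

```python
from itertools import groupby
from operator import itemgetter


def map_fn(record):
    # Emit one (word, 1) pair for every word in the input line.
    for word in record.split():
        yield word.lower(), 1


def reduce_fn(key, values):
    # Merge all values that share one key into a single result.
    aggregate = 0
    for value in values:
        aggregate += value
    return key, aggregate


def run_map_reduce(records):
    # Map phase: every record is transformed independently.
    pairs = [pair for record in records for pair in map_fn(record)]
    # Sorting by key makes equal keys adjacent (stand-in for the shuffle).
    pairs.sort(key=itemgetter(0))
    # Reduce phase: one reduce_fn call per distinct key, in key order.
    return [
        reduce_fn(key, (value for _, value in group))
        for key, group in groupby(pairs, key=itemgetter(0))
    ]
```

For the two input lines "a b a" and "b c", this returns the counts a=2, b=2, c=1 in key order.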

A parallel algorithm is usually structured as multiple rounds of Map/Reduce.

Distributed File Systems

The distributed file system is designed to handle large files (multi-GB) with sequential read/write operations. Each file is broken into chunks, which are stored across multiple DataNodes as local OS files.

There is a master "NameNode" that keeps track of the overall file directory structure and the placement of chunks. This NameNode is the central control point and may redistribute replicas as needed.

To read a file, the client API calculates the chunk index based on the offset of the file pointer and makes a request to the NameNode. The NameNode replies with the DataNodes that have a copy of that chunk. From this point, the client contacts the DataNode directly without going through the NameNode.
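
The chunk index calculation is plain integer division. The sketch below assumes a 64 MB chunk size, which is an illustrative value and not something stated above; the function names are likewise hypothetical.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # assumed chunk size in bytes (illustrative)


def chunk_index(offset, chunk_size=CHUNK_SIZE):
    # Index of the chunk that contains the byte at `offset`.
    if offset < 0:
        raise ValueError("offset must be non-negative")
    return offset // chunk_size


def chunk_range(offset, length, chunk_size=CHUNK_SIZE):
    # Indices of every chunk touched by reading `length` bytes at `offset`.
    if length <= 0:
        return []
    first = chunk_index(offset, chunk_size)
    last = chunk_index(offset + length - 1, chunk_size)
    return list(range(first, last + 1))
```

A read that straddles a chunk boundary touches two chunks, so the client would ask the NameNode for the locations of both.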

To write a file, the client API first contacts the NameNode, which designates one of the replicas as the primary (by granting it a lease). The NameNode's response identifies the primary and the secondary replicas. The client then pushes its changes to all DataNodes in any order, but each DataNode only stores the change in a buffer. After the changes are buffered at all DataNodes, the client sends a "commit" request to the primary, which determines an update order and pushes this order to all the secondaries. After all secondaries complete the commit, the primary responds to the client to report success.

All changes to chunk distribution and metadata are written to an operation log file at the NameNode. This log file maintains an ordered list of operations, which is important for the NameNode to recover its view after a crash. The NameNode also maintains its persistent state by regularly checkpointing to a file.

If the NameNode crashes, all lease-granting operations fail, so any write operation effectively fails as well. Read operations should continue to work as long as the client program has a handle to the DataNode. To recover from a NameNode crash, a new NameNode can take over after restoring the state from the last checkpoint file and replaying the operation log.
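
Recovery from a checkpoint plus an operation log can be sketched as follows. The state layout (a map from chunk id to the set of DataNodes holding it) and the operation tuples are assumptions made for this example, not the NameNode's real format. The sketch also assumes the log holds only operations recorded after the checkpoint was taken.

```python
import copy


def apply_operation(state, operation):
    # An operation is a hypothetical (kind, chunk_id, node) tuple.
    kind, chunk_id, node = operation
    if kind == "add":
        state.setdefault(chunk_id, set()).add(node)
    elif kind == "remove":
        state.get(chunk_id, set()).discard(node)
    else:
        raise ValueError("unknown operation: %r" % (kind,))


def recover(checkpoint, operation_log):
    # Start from the last checkpoint, then replay the log in its
    # recorded order; the order matters for add/remove sequences.
    state = copy.deepcopy(checkpoint)
    for operation in operation_log:
        apply_operation(state, operation)
    return state
```

Replaying in order is what makes the ordered log important: an "add" followed by a "remove" of the same replica must not be applied the other way around.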

When a DataNode crashes, the NameNode detects it after missing its heartbeat for a while. The NameNode removes the crashed DataNode from the cluster and spreads its chunks across the surviving DataNodes. This way, the replication factor of each chunk is maintained across the cluster.

Later, when the DataNode recovers and rejoins the cluster, it reports all its chunks to the NameNode at boot time. Each chunk has a version number, which is advanced at each update. Therefore, the NameNode can easily figure out whether any of the DataNode's chunks have become stale. Those stale chunks will be garbage collected at a later time.
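
The version comparison can be sketched like this. Both dictionaries and the function name are hypothetical. Treating a chunk the NameNode no longer knows about as stale is an extra assumption of this sketch.

```python
def find_stale_chunks(reported, current_versions):
    # reported: {chunk_id: version} sent by the rejoining DataNode.
    # current_versions: {chunk_id: version} tracked by the NameNode.
    stale = []
    for chunk_id, version in reported.items():
        latest = current_versions.get(chunk_id)
        # Assumption: a chunk unknown to the NameNode is also stale.
        if latest is None or version < latest:
            stale.append(chunk_id)
    return sorted(stale)
```

A DataNode that missed two updates to a chunk while it was down reports an older version number for it, so that chunk is flagged for garbage collection.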

Job Execution

Hadoop MapRed is based on a "pull" model where multiple "TaskTrackers" poll the "JobTracker" for tasks (either map tasks or reduce tasks).

The job execution starts when the client program uploads three files to the HDFS location specified by the "mapred.system.dir" property in the "hadoop-default.xml" file: "job.xml" (the job config, including the map, combine, and reduce functions and the input/output data paths, etc.), "job.split" (which specifies how many splits there are and their ranges, based on dividing files into ~16 - 64 MB pieces), and "job.jar" (the actual Mapper and Reducer implementation classes). Then the client program notifies the JobTracker about the job submission. The JobTracker returns a job id to the client program and starts allocating map tasks to idle TaskTrackers when they poll for tasks.

Each TaskTracker has a defined number of "task slots" based on the capacity of the machine. A heartbeat protocol lets the JobTracker know how many free slots each TaskTracker has. The JobTracker determines appropriate tasks for the TaskTrackers based on how busy they are and on their network proximity to the data sources (preferring the same node, then the same rack, then the same network switch). The assigned TaskTrackers fork a MapTask (a separate JVM process) to execute the map phase processing. The MapTask extracts the input data from the splits by using the "RecordReader" and "InputFormat", and it invokes the user-provided "map" function, which emits a number of key/value pairs into a memory buffer. When the buffer is full, the output collector spills the memory buffer to disk. To optimize network bandwidth, an optional "combine" function can be invoked to partially reduce the values of each key. Afterwards, the "partition" function is invoked on each key to calculate its reducer node index. The memory buffer is eventually flushed into two files: the first, an index file, contains an offset pointer for each partition; the second, a data file, contains all records sorted by partition and then by key.
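
The partition step and the layout of a spill can be sketched in a few lines of Python. This is an in-memory illustration, not Hadoop's implementation: zlib.crc32 stands in for the key hash (Python's built-in hash() of strings changes between processes), the "index" maps each partition to the position of its first record instead of a byte offset, and the function names are assumptions.

```python
import zlib


def partition(key, num_reducers):
    # Deterministic hash of the key, reduced to a reducer index.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def spill(buffer, num_reducers):
    # buffer: list of (key, value) pairs emitted by the map function.
    records = sorted(
        ((partition(key, num_reducers), key, value) for key, value in buffer),
        key=lambda record: (record[0], record[1]),  # by partition, then key
    )
    # Index: position of the first record of each partition.
    index = {}
    for position, (part, _key, _value) in enumerate(records):
        index.setdefault(part, position)
    return index, records
```

Because the records are sorted by partition first, each reducer's share is one contiguous run that the index points at directly.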

When the map task has finished processing all input records, it starts the commit process. It first flushes the in-memory buffer (even if it is not full) to an index + data file pair. Then a merge sort over all index + data file pairs is performed to create a single index + data file pair.
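
Since every spill is already sorted, combining them is a k-way merge rather than a full re-sort. A minimal sketch with Python's heapq.merge, using in-memory lists of (partition, key, value) tuples as stand-ins for the spill files:

```python
import heapq


def merge_spills(spills):
    # Each spill is a list of (partition, key, value) tuples that is
    # already sorted by partition and then by key.
    return list(heapq.merge(*spills, key=lambda record: (record[0], record[1])))
```

Merging the spills [(0, "a", 1), (1, "b", 2)] and [(0, "c", 3), (1, "a", 4)] yields one run ordered by partition and then by key.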

The index + data file pair is then split into R local directories, one for each partition. After all the MapTasks complete (all splits are done), the TaskTracker notifies the JobTracker, which keeps track of the overall progress of the job. The JobTracker also provides a web interface for viewing the job status.

When the JobTracker notices that some map tasks are completed, it starts allocating reduce tasks to subsequent polling TaskTrackers (R TaskTrackers will be allocated for reduce tasks). These allocated TaskTrackers remotely download the region files (according to the assigned reducer index) from the completed map phase nodes and concatenate (merge sort) them into a single file. Whenever more map tasks complete afterwards, the JobTracker notifies these allocated TaskTrackers to download more region files (and merge them with the previous file). In this manner, downloading region files is interleaved with the map task progress. The reduce phase has not started yet at this point.

Eventually all the map tasks are completed. The JobTracker then notifies all the allocated TaskTrackers to proceed to the reduce phase. Each allocated TaskTracker forks a ReduceTask (a separate JVM) to read the downloaded file (which is already sorted by key) and invoke the "reduce" function, which collects the key/aggregatedValue into the final output file (one per reducer node). Note that each reduce task (and each map task as well) is single-threaded, and this thread invokes the reduce(key, values) function in ascending (or descending) order of the keys assigned to this reduce task. This provides an interesting property: all entries written by the reduce() function are sorted by key. The output of each reducer is written to a temp output file in HDFS. When the reducer finishes processing all keys, the temp output file is renamed atomically to its final output filename.
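
The write-to-temp-then-rename pattern can be shown on a local filesystem. This is an analogue only: it uses Python's os.replace on local files, not the HDFS API, and the function name is an assumption. Readers of the final path see either no file or the complete file, never a partial one.

```python
import os
import tempfile


def write_output_atomically(final_path, lines):
    # Create the temp file in the same directory so the rename does
    # not cross filesystems.
    directory = os.path.dirname(os.path.abspath(final_path))
    fd, temp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w") as handle:
            for line in lines:
                handle.write(line + "\n")
        # Publish the finished file under its final name in one step.
        os.replace(temp_path, final_path)
    except BaseException:
        # Leave no partial output behind if anything went wrong.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
```

If the writer crashes halfway, only the temp file is affected, so a rerun of the task can simply start over.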

The Map/Reduce framework is resilient to crashes of any of its components. TaskTracker nodes periodically report their status to the JobTracker, which keeps track of the overall job progress. If the JobTracker hasn't heard from a TaskTracker node for a long time, it assumes the node has crashed and reassigns its tasks to other TaskTracker nodes. Since the map phase result is stored on the local disk, it is no longer available when the TaskTracker node crashes. So when a map-phase TaskTracker node crashes, its MapTasks (regardless of whether they are complete or not) are reassigned to a different TaskTracker node, which reruns all the assigned splits. However, the reduce phase result is stored in HDFS, which remains available even if the TaskTracker node crashes. Therefore, when a reduce-phase TaskTracker node crashes, only the incomplete ReduceTasks need to be reassigned to a different TaskTracker node, where they are rerun.
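
The reassignment rule described above fits in one small function. The task records (dictionaries with "id", "kind", "node", and "done" fields) are a hypothetical shape chosen for this sketch.

```python
def tasks_to_rerun(tasks, crashed_node):
    # tasks: list of dicts with "id", "kind" ("map" or "reduce"),
    # "node", and "done" fields (hypothetical record shape).
    rerun = []
    for task in tasks:
        if task["node"] != crashed_node:
            continue
        if task["kind"] == "map":
            # Map output lived on the crashed node's local disk,
            # so even completed map tasks must run again.
            rerun.append(task["id"])
        elif not task["done"]:
            # Completed reduce output is already safe in HDFS.
            rerun.append(task["id"])
    return rerun
```

Only tasks placed on the crashed node are considered; a completed reduce task on that node is the one case that does not need to be repeated.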

The job submission process is asynchronous. The client program can poll for the job status at any time by supplying the job id.

Posted by Ricky Ho @ 11:35 PM


At December 22, 2008 2:42 PM , Blogger Sleepy Donkey said...

There is a little bit of a problem with "no shared state" in Map/Reduce family of algorithms: one should not forget that JobTracker is a shared state and it is susceptible to failure. That's the part that gives me the most trouble - how to avoid this shared state or make whole schema less vulnerable? Otherwise the whole Map/Reduce schema just delays the inevitable - not solves it.

At April 1, 2009 2:08 AM , Blogger Vijay Saraswat said...

Regarding resiliency. What happens when a node that did a Map task T fails after it has reported success, but before ReduceTasks spawned on other nodes have read the files produced by T on its local disk?

At April 1, 2009 7:11 AM , Anonymous V said...

> Regarding resiliency. What happens when a node that did a Map task T fails after it has reported success, but before ReduceTasks spawned on other nodes have read the files produced by T on its local disk?

The reduce task reports to the JobTracker that it couldn't read the map output, and the map eventually gets re-executed on another node.

At April 2, 2009 4:59 AM , Blogger Vijay Saraswat said...

Thanks, that does make sense. 

But raises the next question -- who is responsible for deleting these temporary files?

At April 3, 2009 6:25 AM , Anonymous V said...

> But raises the next question -- who is responsible for deleting these temporary files?

Each task is composed of several attempts. A particular task succeeds if any of its attempts succeeds. In the current case, the particular attempt has failed, and so the TaskTracker (responsible for running tasks) cleans up the temporary files. Subsequently a new attempt is launched for this task.

If you are really interested, please visit the hadoop web-site and join the user mailing lists there. I assure you it's a lot of fun and you will find challenging problems and corresponding solutions.. Enjoy..

At January 23, 2010 3:58 PM , Blogger Joe Biron said...

HBASE seems to no longer suffer from high latency random access:

The Anatomy of Hadoop I/O Pipeline

Thu August 27, 2009

by Owen OMalley (@owen_omalley)









In a typical Hadoop MapReduce job, input files are read from HDFS. Data are usually compressed to reduce the file sizes. After decompression, serialized bytes are transformed into Java objects before being passed to a user-defined map() function. Conversely, output records are serialized, compressed, and eventually pushed back to HDFS. This seemingly simple, two-way process is in fact much more complicated due to a few reasons:

Compression and decompression are typically done through native library code.

End-to-end CRC32 checksum is always verified or calculated during reading or writing.

Buffer management is complicated due to various interface restrictions.

In this blog post, I attempt to decompose and analyze the Hadoop I/O pipeline in detail, and explore possible optimizations. To keep the discussion concrete, I am going to use the ubiquitous example of reading and writing line records from/to gzip-compressed text files. I will not get into details on the DataNode side of the pipeline, and instead mainly focus on the client-side (the map/reduce task processes). Finally, all descriptions are based on Hadoop 0.21 trunk at the time of this writing, which means you may see things differently if you are using older or newer versions of Hadoop.

Reading Inputs

Figure 1 illustrates the I/O pipeline when reading line records from a gzipped text file using TextInputFormat. The figure is divided into two sides separated by a thin gap. The left side shows the DataNode process, and the right side the application process (namely, the Map task). From bottom to top, there are three zones where buffers are allocated or manipulated: kernel space, native code space, and JVM space. For the application process, from left to right, there are the software layers that a data block needs to traverse. Boxes with different colors are buffers of various sorts. An arrow between two boxes represents a data transfer or buffer-copy. The weight of an arrow indicates the amount of data being transferred. The label in each box shows the rough location of the buffer (either the variable that references the buffer, or the module where the buffer is allocated). If available, the size of a buffer is given in square brackets. If the buffer size is configurable, then both the configuration property and the default size are shown. I tag each data transfer with the numeric step where the transfer happens:

Figure 1: Reading line records from gzipped text files.

1. Data are transferred from the DataNode to the MapTask process. DBlk is the file data block; CBlk is the file checksum block. File data are transferred to the client through Java nio transferTo (aka the UNIX sendfile syscall). Checksum data are first fetched into a DataNode JVM buffer, and then pushed to the client (details are not shown). Both file data and checksum data are bundled in an HDFS packet (typically 64KB) in the format of: {packet header | checksum bytes | data bytes}.

2. Data received from the socket are buffered in a BufferedInputStream, presumably for the purpose of reducing the number of syscalls to the kernel. This actually involves two buffer-copies: first, data are copied from kernel buffers into a temporary direct buffer in JDK code; second, data are copied from the temporary direct buffer to the byte[] buffer owned by the BufferedInputStream. The size of the byte[] in BufferedInputStream is controlled by the configuration property "io.file.buffer.size", and defaults to 4K. In our production environment, this parameter is customized to 128K.

3. Through the BufferedInputStream, the checksum bytes are saved into an internal ByteBuffer (whose size is roughly (PacketSize / 512 * 4) or 512B), and file bytes (compressed data) are deposited into the byte[] buffer supplied by the decompression layer. Since the checksum calculation requires a full 512-byte chunk while a user's request may not be aligned with a chunk boundary, a 512B byte[] buffer is used to align the input before copying partial chunks into the user-supplied byte[] buffer. Also note that data are copied to the buffer in 512-byte pieces (as required by the FSInputChecker API). Finally, all checksum bytes are copied to a 4-byte array for FSInputChecker to perform checksum verification. Overall, this step involves an extra buffer-copy.

4. The decompression layer uses a byte[] buffer to receive data from the DFSClient layer. The DecompressorStream copies the data from the byte[] buffer to a 64K direct buffer, calls the native library code to decompress the data, and stores the uncompressed bytes in another 64K direct buffer. This step involves two buffer-copies.

5. LineReader maintains an internal buffer to absorb data from the downstream. From the buffer, line separators are discovered and line bytes are copied to form Text objects. This step requires two buffer-copies.
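
The per-chunk checksum scheme mentioned in the checksum-handling step can be sketched in Python. Here zlib.crc32 is a stand-in for the CRC32 code Hadoop uses, and the function names are assumptions. The sketch also reproduces the buffer-size arithmetic: 4 checksum bytes per 512-byte chunk of a 64KB packet gives 512B.

```python
import zlib

BYTES_PER_CHECKSUM = 512  # chunk size from the text
CHECKSUM_SIZE = 4         # a CRC32 value occupies 4 bytes


def chunk_checksums(data, chunk_size=BYTES_PER_CHECKSUM):
    # One CRC32 value per chunk; the last chunk may be partial.
    return [
        zlib.crc32(data[start:start + chunk_size])
        for start in range(0, len(data), chunk_size)
    ]


def verify(data, expected, chunk_size=BYTES_PER_CHECKSUM):
    # True when every chunk of `data` matches its expected checksum.
    return chunk_checksums(data, chunk_size) == expected


def checksum_bytes(data_length, chunk_size=BYTES_PER_CHECKSUM):
    # Checksum bytes needed to cover `data_length` bytes of data.
    chunks = -(-data_length // chunk_size)  # ceiling division
    return chunks * CHECKSUM_SIZE
```

A reader can only verify whole chunks, which is why a request that starts in the middle of a chunk forces the alignment buffer described above.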

Optimizing Input Pipeline

Adding everything up, including a "copy" for decompressing bytes, the whole read pipeline involves seven buffer-copies to deliver a record to MapTask's map() function, counting from the point at which data are received in the process's kernel buffer. There are a couple of things that could be improved in the above process:

Many buffer-copies are needed simply to convert between direct buffer and byte[] buffer.

Checksum calculation can be done in bulk instead of one chunk at a time.
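
The total of seven can be checked by tallying the per-step counts given in the walkthrough. The dictionary labels below are my own shorthand for those steps.

```python
# Buffer-copies per step of the read pipeline, as stated in the walkthrough.
READ_COPIES = {
    "socket to BufferedInputStream (via temporary direct buffer)": 2,
    "checksum alignment in the DFSClient layer": 1,
    "decompression (byte[] to direct buffer, then native decompress)": 2,
    "LineReader (internal buffer, then Text object)": 2,
}

TOTAL_READ_COPIES = sum(READ_COPIES.values())
```

The initial transfer from the DataNode is not counted, because the tally starts once data have reached the process's kernel buffer.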

Figure 2: Optimizing input pipeline.

Figure 2 shows the post-optimization view where the total number of buffer copies is reduced from seven to three:

1. An input packet is decomposed into the checksum part and the data part, which are scattered into two direct buffers: an internal one for checksum bytes, and the direct buffer owned by the decompression layer to hold compressed bytes. The FSInputChecker accesses both buffers directly.

2. The decompression layer inflates the compressed bytes into a direct buffer owned by the LineReader.

3. LineReader scans the bytes in the direct buffer, finds the line separators in the buffer, and constructs Text objects.
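
The scanning logic of a line reader can be sketched as below. This illustrates only how line separators are found across buffer refills, including lines that straddle two chunks. It does not reproduce the copy-avoiding direct buffers discussed above, and the function name is an assumption.

```python
def read_lines(chunks):
    # Yield complete lines (without the trailing newline byte) from an
    # iterable of byte chunks, as if each chunk were one buffer refill.
    pending = b""
    for chunk in chunks:
        pending += chunk
        start = 0
        while True:
            end = pending.find(b"\n", start)
            if end == -1:
                break
            yield pending[start:end]
            start = end + 1
        # Keep the unfinished tail for the next refill.
        pending = pending[start:]
    if pending:
        # Final line without a terminating newline.
        yield pending
```

Feeding it the chunks b"ab\ncd", b"e\n", and b"f" yields the lines b"ab", b"cde", and b"f".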

Writing Outputs

Now let's shift gears and look at the write-side of the story. Figure 3 illustrates the I/O pipeline when a ReduceTask writes line records into a gzipped text file using TextOutputFormat. Similar to Figure 1, each data transfer is tagged with the numeric step where the transfer occurs:

Figure 3: Writing line records into gzipped text files.

1. TextOutputFormat's RecordWriter is unbuffered. When a user emits a line record, the bytes of the Text object are copied straight into a 64KB direct buffer owned by the compression layer. A very long line is copied to this buffer in several passes, 64KB at a time.

2. Every time the compression layer receives a line (or part of a very long line), the native compression code is called, and compressed bytes are stored into another 64KB direct buffer. Data are then copied from that direct buffer to an internal byte[] buffer owned by the compression layer before being pushed down to the DFSClient layer, because the DFSClient layer only accepts a byte[] buffer as input. The size of this buffer is again controlled by the configuration property "io.file.buffer.size". This step involves two buffer-copies.

3. FSOutputSummer calculates the CRC32 checksum from the byte[] buffer from the compression layer, and deposits both data bytes and checksum bytes into a byte[] buffer in a Packet object. Again, checksum calculation must be done on whole 512B chunks, and an internal 512B byte[] buffer is used to hold partial chunks that may result from compressed data not aligned with chunk boundaries. Checksums are first calculated and stored in a 4B byte[] buffer before being copied to the packet. This step involves one buffer-copy.

4. When a packet is full, the packet is pushed to a queue whose length is limited to 80. The size of the packet is controlled by the configuration property "dfs.write.packet.size" and defaults to 64KB. This step involves no buffer-copy.

5. A DataStreamer thread waits on the queue and sends the packet to the socket whenever it receives one. The socket is wrapped with a BufferedOutputStream, but the byte[] buffer is very small (no more than 512B) and it is usually bypassed. The data, however, still need to be copied to a temporary direct buffer owned by JDK code. This step requires two data copies.

6. Data are sent from the ReduceTask's kernel buffer to the DataNode's kernel buffer. Before the data are stored in block files and checksum files, there are a few buffer-copies on the DataNode side. Unlike the case of DFS read, both file data and checksum data will traverse out of the kernel and into JVM land. The details of this process are beyond the discussion here and are not shown in the figure.
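
The packet queue and the DataStreamer thread form a bounded producer/consumer pair. Here is a minimal Python sketch of that pattern, not of the DFSClient itself: the limit of 80 is the figure quoted above, while stream_packets, the sentinel object, and the send callback are assumptions. Error handling is left out, so a send that raises would leave the producer blocked.

```python
import queue
import threading

MAX_QUEUED_PACKETS = 80  # queue limit quoted in the text
_DONE = object()         # sentinel telling the streamer thread to stop


def stream_packets(packets, send):
    # The producer fills a bounded queue; a background thread drains it.
    pending = queue.Queue(maxsize=MAX_QUEUED_PACKETS)

    def data_streamer():
        while True:
            packet = pending.get()
            if packet is _DONE:
                return
            send(packet)

    worker = threading.Thread(target=data_streamer)
    worker.start()
    for packet in packets:
        # Blocks while 80 packets are already waiting to be sent.
        pending.put(packet)
    pending.put(_DONE)
    worker.join()
```

The bound gives back-pressure: a writer that produces packets faster than the socket can take them is slowed down instead of exhausting memory.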

Optimizing Output Pipeline

Overall, including the "copy" for compressing bytes, the process described above requires six buffer-copies for an output line record to reach ReduceTask's kernel buffer. What could we do to optimize the write pipeline?

We can probably reduce a few buffer-copies.

The native compression code may be called less frequently if we call it only after the input buffer is full (block compression codecs like LZO already do this).

Checksum calculations can be done in bulk instead of one chunk at a time.
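
The effect of calling the compressor only when the input buffer is full can be illustrated with Python's zlib module, used here as a stand-in for the native compression library. Both function names are hypothetical. For simplicity the buffer may slightly overshoot 64KB before it is flushed, whereas a fixed-size buffer would split the record.

```python
import zlib

BUFFER_SIZE = 64 * 1024  # input buffer size used in this sketch


def compress_per_line(lines):
    # Call the compressor once for every record.
    compressor = zlib.compressobj()
    out, calls = [], 0
    for line in lines:
        out.append(compressor.compress(line))
        calls += 1
    out.append(compressor.flush())
    return b"".join(out), calls


def compress_when_full(lines, buffer_size=BUFFER_SIZE):
    # Accumulate records and call the compressor once per full buffer.
    compressor = zlib.compressobj()
    out, calls = [], 0
    buffer = bytearray()
    for line in lines:
        buffer += line
        if len(buffer) >= buffer_size:
            out.append(compressor.compress(bytes(buffer)))
            calls += 1
            buffer.clear()
    if buffer:
        out.append(compressor.compress(bytes(buffer)))
        calls += 1
    out.append(compressor.flush())
    return b"".join(out), calls
```

Both variants produce streams that decompress to the same bytes; the batched one simply crosses into the compression library far less often.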

Figure 4: Optimizing output pipeline.

Figure 4 shows what it looks like after these optimizations, where a total of four buffer-copies are necessary:

1. Bytes from a user's Text object are copied to a direct buffer owned by the TextOutputFormat layer.

2. Once this buffer is full, native compression code is called and compressed data are deposited into a direct buffer owned by the compression layer.

3. FSOutputSummer computes the checksum for bytes in the direct buffer from the compression layer and saves both data bytes and checksum bytes into a packet's direct buffer.

4. A full packet is pushed into a queue, and, in the background, the DataStreamer thread sends the packet through the socket, which copies the bytes to kernel buffers.


This blog post came out of an afternoon spent asking ourselves specific questions about Hadoop's I/O and validating the answers in the code. It turns out, after combing through class after class, that the pipeline is more complex than we originally thought. While each of us is familiar with one or more components, we found the preceding, comprehensive picture of Hadoop I/O elucidating, and we hope other developers and users will, too. Effecting the optimizations outlined above will be a daunting task, and this is the first step toward a more performant Hadoop.

-- Hong Tang