Apache's Hadoop is an open source project that implements a Java-based Map/Reduce parallel programming paradigm. It is designed to scale to very large clusters with thousands of nodes and terabytes of data. Not all parallel programming chores fit the Map/Reduce paradigm, but many do, and many applications need to deal with very large amounts of data.
Hadoop is commonly used for cloud-based solutions, but the platform is applicable to a wide range of applications, even embedded ones. Scalability is a significant advantage, allowing a system to grow from a single node to tens of thousands of nodes. Hadoop requires JRE 1.6 or higher.
Table of Contents
- Hadoop Architecture
- Hadoop Program Model
- Hadoop Distributed File System (HDFS)
- Hadoop Configuration
- References
Hadoop Architecture
Hadoop's Map/Reduce paradigm (Fig. 1) assumes a problem where the data can be split and handed to independent, parallel threads that each process a subset of it. The outputs of these computations are then combined into a final result. The environment is essentially split into two parts, computation and storage; Hadoop's Map/Reduce engine handles the computational part.
The approach is designed to scale to very large cluster systems with thousands of computational and storage nodes. It is designed to work with the Hadoop Distributed File System (HDFS), but it can work with a range of file systems, from cloud services such as Amazon's S3 to conventional distributed file systems.
Hadoop implements a master/slave architecture. A master node runs the JobTracker, TaskTracker, NameNode and DataNode services, although it is possible to run these services on separate nodes. A slave or worker node runs a TaskTracker and a DataNode. It is possible to have slave nodes that only run tasks or only provide data support, but this is a non-standard configuration.
A client application submits jobs to the JobTracker, a service that distributes MapReduce tasks to nodes within the Hadoop cluster. It uses the NameNode to determine where data is located. The JobTracker uses a heartbeat system to make sure worker nodes are running properly, and it restarts tasks if a worker fails. The JobTracker can be a single point of failure.
A TaskTracker locally runs tasks distributed by the JobTracker. The node is configured with a set of slots. Each slot can handle a task. The JobTracker looks for empty slots when assigning tasks.
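The number of slots a worker advertises is part of that node's configuration. As a rough sketch, in a Hadoop 1.x setup the limits would be set in the worker's mapred-site.xml; the values below are illustrative examples, not recommendations:

```xml
<!-- mapred-site.xml on a worker node (Hadoop 1.x property names).
     One task runs per slot; the values here are example settings only. -->
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>4</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>2</value>
</property>
```

Sizing the slot counts to a node's cores and memory is part of tuning a cluster.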
A NameNode provides access to HDFS. Since most Hadoop environments use HDFS, the NameNode can normally be viewed simply as the entry point to HDFS; other file systems have to provide a NameNode-compatible interface. The HDFS NameNode is another single point of failure, as HDFS is not designed to be a high-availability system.
The DataNode provides storage for HDFS. It is normally paired with a TaskTracker, and it is designed to replicate data across nodes while keeping the data needed for processing local to the node.
A job consists of map and reduce tasks, which the JobTracker distributes to TaskTracker nodes. A job is complete when all of its tasks have terminated properly.
A Hadoop cluster may be running any number of jobs simultaneously.
Hadoop Program Model
The Hadoop Common package contains the JAR files, scripts, and documentation for running Hadoop. A running Hadoop cluster waits for jobs to be submitted by a client application.
Hadoop also supplies a Java API for the tasks that will be submitted to the cluster. Portions of the Java application are distributed throughout the cluster as a job runs.
The system operates on key/value pairs, also called tuples, normally written as <key, value>.
The operations apply the same code to all of the data given to them, but the results obviously vary with the computation performed at each step. Typically the resulting data is smaller than the original, although that is application dependent and not a requirement; the result could even be an empty set. Often the Map/Reduce paradigm is used to process data or to search for information.
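As a sketch of what these key/value transformations look like with the Java API, the classic word-count example maps lines of text to <word, 1> pairs and reduces them to per-word totals. The Mapper and Reducer base classes come from the org.apache.hadoop.mapreduce package; the WordCountMapper and WordCountReducer class names are placeholders used here for illustration:

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map task: emit a <word, 1> pair for every word on an input line.
public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    StringTokenizer tokens = new StringTokenizer(line.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      context.write(word, ONE);                // emits <word, 1>
    }
  }
}

// Reduce task: sum the counts collected for each word.
class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(word, new IntWritable(sum)); // emits <word, total>
  }
}
```

Between the two steps the framework shuffles and sorts the mapper output so that each reducer sees all of the values for a given key.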
The distribution of the data, the ratio of map tasks to reduce tasks, and other factors can affect the performance of the system. The pair values are often large files, such as images or other data that needs to be analyzed. Often the source data is relatively static and used for multiple jobs.
Tasks are tracked by the JobTracker and restarted if the node they are running on fails. This is a limited form of transaction processing but it works because the input data is essentially read-only. Likewise, data is appended via HDFS as an atomic transaction.
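A client ties the pieces together and submits them to the cluster as a job. A minimal driver sketch is shown below; WordCountDriver is a placeholder name, the input and output paths come from the command line, and the exact factory methods vary slightly between Hadoop releases:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();      // picks up the cluster settings
    Job job = new Job(conf, "word count");         // Job.getInstance() in later releases
    job.setJarByClass(WordCountDriver.class);      // JAR shipped to the worker nodes
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // input in HDFS
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output in HDFS
    System.exit(job.waitForCompletion(true) ? 0 : 1);        // submit and wait
  }
}
```

The job is packaged into a JAR and submitted to the JobTracker, and its map and reduce tasks are then handed out to TaskTracker slots as described above.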
Hadoop Distributed File System (HDFS)
The Hadoop Distributed File System (HDFS) complements the Hadoop programming model. It is biased towards very large files. It does not support file or record locking. Files are essentially write once, and writing operations are limited to file appends. The append is an atomic operation so data from different tasks cannot be interleaved. This file system approach is designed to improve performance and make replication of data easier.
HDFS is designed to replicate very large files across DataNodes. The block size and replication factor are configurable on a per-file basis. Normally HDFS does not use underlying RAID or network storage; typically each worker node has its own solid-state or hard disk drive, and HDFS's replication support provides a level of redundancy.
HDFS is accessible to Hadoop tasks, but it is also accessible to clients. This is how data gets into and out of the system.
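A typical way for a client to move data in and out is the org.apache.hadoop.fs.FileSystem API. The sketch below copies a local file into HDFS, adjusts its per-file replication factor, and reads it back; the class name and paths are placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();     // reads core-site.xml, etc.
    FileSystem fs = FileSystem.get(conf);         // HDFS if the cluster is so configured

    Path local = new Path("/tmp/images.tar");     // placeholder local file
    Path remote = new Path("/data/images.tar");   // placeholder HDFS path

    fs.copyFromLocalFile(local, remote);          // load the data into the cluster
    fs.setReplication(remote, (short) 3);         // per-file replication factor

    FSDataInputStream in = fs.open(remote);       // read it back from any client or task
    byte[] buffer = new byte[4096];
    int bytesRead = in.read(buffer);
    in.close();
    fs.close();
  }
}
```

Command-line equivalents such as hadoop fs -put and hadoop fs -get are also available for moving data by hand.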
HDFS has the advantage of working with the JobTracker and TaskTracker services. This allows tasks to be scheduled near the data they require, and the local availability of data often improves system performance significantly. Hadoop can work with other file systems, in which case the HDFS APIs need to be supported.
HDFS supports dynamic addition and deletion of nodes as does Hadoop in general. Hadoop can also remove failing nodes from the mix.
HDFS is not restricted to operation with Hadoop. It is completely independent and it can be useful for other applications such as image processing or archiving.
Hadoop Configuration
Hadoop is designed to handle varying workloads but Hadoop and HDFS configuration can be a challenge. Different aspects of system design and configuration can significantly affect the performance of various applications.
By default, Hadoop uses a FIFO Scheduler with five priority levels. Other schedulers have been made available.
Facebook developed a Fair Scheduler that groups jobs into pools, where each pool has a guaranteed minimum share of resources and excess capacity is split between jobs. Yahoo! developed the Capacity Scheduler, in which jobs are submitted via queues and each queue is allocated a percentage of total resources. Excess resources are allocated to queues whose capacity is exhausted.
In all cases, jobs and tasks run to completion before resources are relinquished. There is no preemption, although jobs can be cancelled. This could be an issue for real-time environments.
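Which scheduler the JobTracker uses is part of its configuration. As a sketch, in a Hadoop 1.x style mapred-site.xml the Fair Scheduler would be selected as shown below; the allocation file path is a placeholder:

```xml
<!-- mapred-site.xml on the JobTracker: replace the default FIFO scheduler.
     For the Capacity Scheduler the class would be
     org.apache.hadoop.mapred.CapacityTaskScheduler instead. -->
<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
<property>
  <name>mapred.fairscheduler.allocation.file</name>
  <value>/etc/hadoop/fair-scheduler.xml</value>
</property>
```

The allocation file is where the pools and their guaranteed minimum shares are described.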
By the way, Doug Cutting, Hadoop's creator, named it after his son's toy elephant.
- Apache Community, "Apache Wiki", http://wiki.apache.org/general/
- Apache Community, "Apache Hadoop", http://wiki.apache.org/hadoop/
- Hadoop website, "Hadoop", http://hadoop.apache.org/