Wednesday, November 30, 2011

Passing parameters to Mappers and Reducers

There might be a requirement to pass additional parameters to the mapper and reducers, besides the the inputs which they process. Lets say we are interested in Matrix multiplication and there are multiple ways/algorithms of doing it. We could send an input parameter to the mapper and reducers, based on which the appropriate way/algorithm is picked. There are multiple ways of doing this

Setting the parameter:

1. Use the -D command line option to set the parameter while running the job.

2. Before launching the job using the old MR API

JobConf job = (JobConf) getConf();
job.set("test", "123");

3. Before launching the job using the new MR API

Configuration conf = new Configuration();
conf.set("test", "123");
Job job = new Job(conf);

Getting the parameter:

1. Using the old API in the Mapper and Reducer. The JobConfigurable#configure has to be implemented in the Mapper and Reducer class.

private static Long N;
public void configure(JobConf job) {
    N = Long.parseLong(job.get("test"));
}

The variable N can then be used with the map and reduce functions.

2. Using the new API in the Mapper and Reducer. The context is passed to the setup, map, reduce and cleanup functions.

Configuration conf = context.getConfiguration();
String param = conf.get("test");

Tuesday, November 29, 2011

0.23 Project Structure

Prior to 0.23 release the project structure was very simple (common, hdfs and mapreduce). With 0.23 a couple of projects have been added for the ResourceManager, NodeManager, ApplicationMaster etc. Here is the structure of the 0.23 release drawn using Freemind.

Thursday, November 24, 2011

HDFS Name Node High Availability

NameNode is the heart of HDFS. It stores the namespace for the filesystem and also tracks the location of the blocks in the the cluster. The location of the blocks are not persisted in the NameNode, but the DataNodes report the blocks it has to the NameNode when the DataNode starts. If an instance of NameNode is not available, then HDFS is not accessible till it's back running.

Hadoop 0.23 release introduced HDFS federation where it is possible to have multiple independent NameNodes in a cluster, where in a particular DataNode can have blocks for more than one Name Node. Federation provides horizontal scalability, better performance and isolation.

HDFS NN HA (NameNode High Availability) is an area where active work is happening. Here are the JIRA, Presentation and Video for the same. HDFS NN HA was not cut into 0.23 release and will be part of later releases. Changes are going in the HDFS-1623 branch, if someone is interested in the code.


Edit (11th March, 2012) : Detailed blog entry from Cloudera on HA.

Edit (14th October, 2013) : Explanation on HA from HortonWorks.

Browsing the MRv2 code in Eclipse

MRv2 is a revamp of the MapReduce engine for making Hadoop reliable, available, scalable and also for better cluster utilization. MRv2 development had been under active development for some time and alpha are being released now. The Cloudera article is very useful for getting the code from SVN, building, deploying and finally running a Job to make sure Hadoop has been setup properly. Here are some additional steps

- Protocol Buffers is used as an RPC protocol between different daemons. The recomendation is to use Protocol Buffer version 2.4.1+. Some of the Linux releases don't have this version. So, Protocol Buffers code has to be downloaded, built and installed using the `configure, make, make install` command. `make install` will require administrative privileges. `protoc --version` will give the version number.

- In Ubuntu 11.10, g++ was not installed by default. `sudo apt-get install g++` installed the required binaries.

- As mentioned in the article, git can be used to get the source code. But, git pulls the entire Hadoop repository. Code for a specific version can also be pulled using the command `svn co http://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23/ source/`.

- Once the code has been successfully compiled using the `mvn clean install package -Pdist -Dtar -DskipTests` command, `mvn eclipse:eclipse` will build all the Eclipse related files and the projects can be imported as:



- Some of the projects may have errors, fix any missing jars and add source folders to the build path as required.


- Finally, add MR_REPO to the CLASSPATH variables in `Windows->Preferences`


- Now the projects should be compiled without any errors.


- Changes to the 0.23 branch are happening at a very fast pace. `svn up` will pull the latest code and `mvn clean install package -Pdist -Dtar -DskipTests` will compile the source code again. There is no need for a `mvn eclipse:eclipse` again.

Time to learn more about the Hadoop code :)

Tuesday, November 22, 2011

Interaction between the JobTracker, TaskTracker and the Scheduler

Scheduler in Hadoop is for sharing the cluster between different jobs, users for better utilization of the cluster resources. Also, without a scheduler a Hadoop job might consume all the resources in the cluster and other jobs have to wait for it to complete. With scheduler jobs can execute in parallel consuming a part of cluster.

Hadoop has a pluggable interface for schedulers. All implementations of the scheduler should extend the abstract class TaskScheduler and the scheduler class should be specified in the `mapreduce.jobtracker.taskscheduler` property (defaults to org.apache.hadoop.mapred.JobQueueTaskScheduler). The Capacity Scheduler, Fair Scheduler and other scheduler implementations are shipped with Hadoop.

The TaskTracker sends a heart beat (TaskTracker#transmitHeartBeat) to the JobTracker at regular intervals, in the heart beat it also indicates that it can take new tasks for execution. Then the JobTracker (JobTracker#heartbeat) consults the Scheduler (TaskScheduler#assignTasks) to assign tasks to the TaskTracker and sends the list of tasks as part of the HeartbeatResponse to the TaskTracker.

Sunday, November 20, 2011

Google Blogger Statistics SUCK !!!

In Google Blogger Statistics, only the top 10 referring sites are shown as below. As bots take over and with referral spam this feature is completely useless. Google should do a better job of handling the Statistics feature.

Checking the server log files for statistics of a self hosted blog is also a bit difficult because of the spam. It would be a good to have `reliable regularly updated publicly accessible list of spam sites`, which can be integrated with a script to get useful data out of the server logs.


Edit: Looks like Google Analytics is doing a good job of filtering the bots.

Thursday, November 17, 2011

Why to explicitly specify the Map/Reduce output parameters using JobConf?

Some thing really simple ....

Q) Why to explicitly specify the Map/Reduce output parameters using JobConf#setMapOutputKeyClass, JobConf#setMapOutputValueClass, JobConf#setOutputKeyClass and JobfConf#setOutputValueClass methods, can't Hadoop framework deduce them from the Mapper/Reducer implementations using Reflection?

A) This is due to Type Erasure :

When a generic type is instantiated, the compiler translates those types by a technique called type erasure — a process where the compiler removes all information related to type parameters and type arguments within a class or method. Type erasure enables Java applications that use generics to maintain binary compatibility with Java libraries and applications that were created before generics.

Nice to learn new things !!!

Wednesday, November 16, 2011

Hadoop release / version numbers


Edit: For easier access I have moved this to the pages section just below the blog header and no more maintaining this entry.

Software release numbers and features are daunting, remember Windows 1.0, 2.0, 2.1x, 3.0, 3.1, 95, 98, Me, NT WorkStation, 2000, XP, Vista, 7, 8 etc (I might have missed some of them). Microsoft seems to learning lately a bit with Windows 7, 8 naming. Ubuntu has a nice release scheme. The current release is 11.10 (the 1st number is the year and the 2nd number is the month of release), which says that it was released on October, 2011 and the next release number will be 12.04 (sometime around April, 2012). Ubuntu also has also clear guidelines on how long they would be supporting each version of Ubuntu.

Coming to Hadoop, there are multiple releases (0.20.*, 0.21, 0.22, 0.23 etc) and À la carte of features available in each of those release (CHANGES.txt in the release will have the JIRA's that have been pulled into that release) and users of Hadoop are confused on what release to pick. Some of these release are stable and some of them aren't. There is a lenghty discussion going in the Hadoop Groups to make the release numbers easy for everyone. Currently, 0.20.security, 0.22 and 0.23 are the releases on which work is happening actively. Proposal is to call them release 1, 2 and 3 for the coming releases, but it has yet to be finalized. 0.23 has been released recently, but is not production ready yet.

Besides the other improvements releated to HDFS, here is how the old/new MR API and Engine are supported in the different releases of Hadoop.


Old API New API Class MR Engine New MR Engine-MRv2
0.20.X Y Y Y N
0.22 Y Y Y N
0.23 Y(deprecated) Y N Y

Tuesday, November 15, 2011

IPC between Hadoop Daemons

Hadoop has multiple daemons namely NameNode, DateNode, CheckPointNode, BackUpNode, JobTracker, TaskTracker and finally the client which submits the job. Interaction between the daemons is a bit complex and not well documented.

Hadoop has it's own RPC mechanism for IPC. The arguments and the return type are serialized using Writable. Protocols for RPC extend the o.a.h.ipc.VersionedProtocol. So, to get the interaction between the Hadoop daemons, references for the VersionedProtocol will be useful.

First get the Hadoop code locally using SVN (pick the appropriate branch)

svn co http://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.21/

Once the code has been got locally, the following command will give all the protocol definitions in Hadoop

grep -r "extends VersionedProtocol" * | cut -f1 -d':'

common/src/java/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java
common/src/java/org/apache/hadoop/security/RefreshUserToGroupMappingsProtocol.java
common/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
common/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java
common/src/test/core/org/apache/hadoop/ipc/TestRPC.java
common/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java
common/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java
hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
mapreduce/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
mapreduce/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java

Some of the interesting interfaces are NamenodeProtocol, InterDatanodeProtocol, DatanodeProtocol, ClientProtocol, ClientDatanodeProtocol, InterTrackerProtocol, AdminOperationsProtocol and TaskUmbilicalProtocol. These can be explored for further insights into Hadoop.

Edit: Converted the above list of classes into URL's for the eager and the impatient.

Sunday, November 13, 2011

Seamless access to the Java SE API documentation

API documentation for Java SE and Hadoop (and other frameworks) can be downloaded for offline access. But, the Hadoop API documentation is not aware of the offline copy of Java SE documentation.

For seamless interaction between the two API, reference to

http://java.sun.com/....../ConcurrentLinkedQueue.html

in the Hadoop API should be replaced with

file:///home/praveensripati/....../ConcurrentLinkedQueue.html

The below command will replace all such references in the API documentation (note that back slash has to be escaped)

find  ./ -type f -name *.html -exec sed -i 's/http:\/\/java.sun.com\/javase\/6\/docs\//file:\/\/\/home\/praveensripati\/Documents\/Java6API\//' {} \;

This enables for seamless offline access and better productivity.

Saturday, November 12, 2011

PDF Reader with Annoations - Okular

For those who store much information digitally and less on paper, Okular is a nice software to create annotations in PDF documents. Okular runs on multiple platforms, but I had been using it on Ubuntu for some time and quite happy with it.

'sudo apt-get install okular' will install Okular on Ubuntu. The annotations are stored in the $HOME/.kde/share/apps/okular/docdata folder as a separate file for each pdf document. By creating a symlink for this folder in DropBox, the annotations are automatically backed up.

This is no way related to  Hadoop, but thought of blogging it. BTW, there a quite a number of free and very useful software like GIMP for Linux.



Thursday, November 10, 2011

Retrieving Hadoop Counters in Map/Reduce Tasks

Hadoop uses Counters to gather metrics/statistics which can later be analyzed for performance tuning or to find bugs in the MapReduce programs. There are some predefined Counters and Custom counters can also be defined. JobCounter and TaskCounter contain the predefined Counters in Hadoop. There are lot of tutorials on incrementing the Counters from the Map and Reduce tasks. But, how to fetch the current value of the Counter from with the Map and Reduce tasks.

Counters can be incremented using the Reporter for the Old MapReduce API or by using the Context using the New MapReduce API. These counters are sent to the TaskTracker and the TaskTracker will send to the JobTracker and the JobTracker will consolidate the Counters to produce a holistic view for the complete Job. The consolidated Counters are not relayed back to the Map and the Reduce tasks by the JobTracker. So, the Map and Reduce tasks have to contact the JobTracker to get the current value of the Counter.

StackOverflow Query has the details on how to get the current value of a Counter from within a Map and Reduce task.

Edit: Looks like it's not a good practice to  retrieve the counters in the map and reduce tasks. Here is an alternate approach for passing the summary details from the mapper to the reducer. This approach requires some effort to code, but is doable. It would have been nice if the feature had been part of Hadoop and not required to hand code it.

Friday, November 4, 2011

Hadoop MapReduce challenges in the Enterprise

Platform Computing published a five part series (one, two, three, four, five) about the Hadoop MapReduce Challenges in the Enterprise. Some of the challenges mentioned in the Series are addressed by the NextGen MapReduce which will be available soon for download, but some of the allegations are not proper. Platform has got products around MapReduce and is about to be acquired by IBM, so not sure how they got them wrong.

Platform) On the performance measure, to be most useful in a robust enterprise environment a MapReduce job should take  sub-millisecond to start,  but the job startup time in the current open source MapReduce implementation is measured in seconds.

Praveen) MapReduce is supposed to be for batch processing and not for online transactions. The data from a MapReduce Job can be fed to a system for online processing. It's not to say that there is no scope for improvement in the MapReduce job performance.

Platform) The current Hadoop MapReduce implementation does not provide such capabilities. As a result, for each MapReduce job, a customer has to assign a dedicated cluster to run that particular application, one at a time.

Platform) Each cluster is dedicated to a single MapReduce application so if a user has multiple applications, s/he has to run them in serial on that same resource or buy another cluster for the additional application.

Praveen) Apache Hadoop had a pluggable Scheduler architecture and has Capacity, Fair and FIFO Scheduler. The FIFO Scheduler is the default one. Schedulers allow multiple applications and multiple users to share the cluster at the same time.

Platform) Current Hadoop MapReduce implementations derived from open source are not equipped to address the dynamic resource allocation required by various applications.

Platform) Customers also need support for workloads that may have different characteristics or  are  written in different programming languages. For instance, some of those applications could be data intensive such as MapReduce applications written in Java, some could be CPU intensive such as Monte Carlo simulations which are often written in C++ -- a runtime engine must be designed to support both simultaneously.

Praveen) NextGen MapReduce allows for dynamic allocation of resources. Currently there is only support for RAM based requests, but the framework can be extended for other parameters like CPU, HDD etc in the future.

Platform) As mentioned in part 2 of this blog series, the single job tracker in the current Hadoop implementation is not separated from the resource manager, so as a result, the job tracker does not provide sufficient resource management functionalities to allow dynamic lending and borrowing of available IT resources.

Praveen) NextGen MapReduce separates resource management and task scheduling into separate components.

To summarize, NextGen MapReduce addresses some of the concerns raised by Platform, but it will take some time for NextGen MapReduce to get stabilized and be production ready.

Wednesday, November 2, 2011

Hadoop Jar Hell

It's just not possible to download the latest Hadoop related Projects from Apache and use them together because of the interoperability issues among the different Hadoop Projects and their release cycles.

That's the reason why BigTop an Apache Incubator project has evolved, to solve the interoperability issues around the different Hadoop projects by providing a test suite. Also, companies like Cloudera provide their own distribution with different Hadoop projects based on Apache distribution, with proper testing and support.

Now HortonWorks which has been spun from Yahoo joined the same ranks. Their initial manifesto was to make the Apache downloads a source where anyone can download the jars and use them without any issues. But, they have moved away from this with the recent announcement of the HortonWorks Data Platform which is again based on Apache distribution similar to what Cloudera has done with their CDH distributions. Although, HortonWorks and Cloudera have their own distribution, they would be actively contributing the Apache Hadoop ecosystem.

With the maturity of BigTop it would be possible to download different Hadoop related jar files from Apache and use them directly instead of depending on the distributions from HortonWorks and Cloudera.

As mentioned in the GigaOm Article, such distributions from HortonWorks and Cloudera make them easy to support their customers as they have to support limited number of Hadoop versions and they would also know the potential issues with those versions.