Friday, December 30, 2011

WhatsWrong : Accumulating values sent to mapper

The program below accumulates the values sent to the mapper in valueBuff, of type ArrayList<Text>, and prints the ArrayList in the AccumulateKVMapper#cleanup method.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AccumulateKV {
    static class AccumulateKVMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {

        public ArrayList<Text> valueBuff = new ArrayList<Text>();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valueBuff.add(value);
            context.write(key, value);
        }

        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (Text o : valueBuff) {
                System.out.println("value = " + o.toString());
            }
        }

    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err
                    .println("Usage: AccumulateKV <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(AccumulateKV.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(AccumulateKVMapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The input to the job is

Berlin-Germany
New Delhi-India
Kuala Lampur-Malaysia
Pretoria-Sri Lanka

The expected output is

value = Berlin-Germany
value = New Delhi-India
value = Kuala Lampur-Malaysia 
value = Pretoria-Sri Lanka

But, the output of the program is

value =
value =
value = 
value =

What's wrong with the above program? I have tried it against the 0.22 release in Local (Standalone) Mode, but it should behave the same with other releases and also in Pseudo-Distributed and Fully-Distributed Mode.

Respond back in the comments and I will give a detailed explanation once I get a proper response.

Security in Hadoop

Hadoop security is often neglected, and the information on the internet for securing a Hadoop cluster is also sparse.

It took me some time to get my head around Hadoop security. So, I have included a separate page, `Hadoop Security`, with a quick summary and resources about Hadoop security at the top of the blog.

I will update the `Hadoop Security` page as I get more comfortable and as more information becomes available on Hadoop security.

Sunday, December 25, 2011

Limiting the usage of Counters in Hadoop

Besides the JobCounter and TaskCounter counters which the Hadoop framework maintains, it's also possible to define custom counters for application-level statistics.

Counters can be incremented using the Reporter in the old MapReduce API or the Context in the new MapReduce API. These counters are sent to the TaskTracker, which forwards them to the JobTracker, and the JobTracker consolidates the counters to produce a holistic view for the complete job.

There is a chance that a rogue job creates millions of counters and, since these counters are stored in the JobTracker, there is a good chance that the JobTracker will go OOM. To avoid such a scenario, the number of counters that can be created per job is limited by the Hadoop framework.

The following code is in Counters.java. Note that this code is in the 0.20.203, 0.20.204 and 0.20.205 (now called 1.0) releases. Also note that some of the parameters are configurable and some are not.

/** limit on the size of the name of the group **/
private static final int GROUP_NAME_LIMIT = 128;
/** limit on the size of the counter name **/
private static final int COUNTER_NAME_LIMIT = 64;

private static final JobConf conf = new JobConf();
/** limit on counters **/
public static int MAX_COUNTER_LIMIT = 
conf.getInt("mapreduce.job.counters.limit", 120);

/** the max groups allowed **/
static final int MAX_GROUP_LIMIT = 50;

In trunk and the 0.23 release, the code below is in MRJobConfig.java. Note that all of these parameters are configurable.

public static final String COUNTERS_MAX_KEY = "mapreduce.job.counters.max";
public static final int COUNTERS_MAX_DEFAULT = 120;

public static final String COUNTER_GROUP_NAME_MAX_KEY = "mapreduce.job.counters.group.name.max";
public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;

public static final String COUNTER_NAME_MAX_KEY = "mapreduce.job.counters.counter.name.max";
public static final int COUNTER_NAME_MAX_DEFAULT = 64;

public static final String COUNTER_GROUPS_MAX_KEY = "mapreduce.job.counters.groups.max";
public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;

The above-mentioned configuration parameters are not mentioned in the release documentation, so I thought they would be worth mentioning in a blog entry.

I would like to call it a hidden jewel :)
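To make the idea of a per-job cap on counters concrete, here is a minimal, framework-free sketch in plain Java. It is not Hadoop's Counters implementation; the class name BoundedCounters, its methods and the exception used are hypothetical and only illustrate how refusing to create new counters beyond a limit keeps memory bounded. The default of 120 is taken from the code quoted above.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a per-job counter registry with a cap.
 * This is NOT Hadoop code; names and behaviour are assumptions.
 */
class BoundedCounters {
    /** Same value as the default limit quoted above. */
    static final int DEFAULT_MAX_COUNTERS = 120;

    private final Map<String, Long> counters = new HashMap<>();
    private final int maxCounters;

    BoundedCounters(int maxCounters) {
        this.maxCounters = maxCounters;
    }

    /** Increments an existing counter, or creates it if the cap allows. */
    void increment(String name, long delta) {
        if (!counters.containsKey(name) && counters.size() >= maxCounters) {
            // A rogue job asking for yet another counter is stopped here.
            throw new IllegalStateException(
                    "Too many counters, limit is " + maxCounters);
        }
        counters.merge(name, delta, Long::sum);
    }

    long get(String name) {
        return counters.getOrDefault(name, 0L);
    }

    int size() {
        return counters.size();
    }

    public static void main(String[] args) {
        BoundedCounters c = new BoundedCounters(2);
        c.increment("records.read", 1);
        c.increment("records.read", 1); // existing counter, always allowed
        c.increment("records.bad", 1);
        try {
            c.increment("records.skipped", 1); // third distinct counter
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Incrementing an existing counter is always allowed in this sketch; only the creation of a new counter is checked against the cap, which is what keeps the registry from growing without bound.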

Getting started with Hadoop in easy steps

For those who are interested in Hadoop, but are stuck with a Windows machine or have inertia to install Linux for different reasons, here are some alternatives.

a) Install Hadoop on Windows using Cygwin.

b) Wait for Microsoft to come out of the Hadoop preview.

c) Use Amazon EMR. Amazon has very clear documentation.

d) Use the CDH virtual image from Cloudera, which includes Hadoop.

We will go through the last option, using the CDH image from Cloudera, in detail. CDH documentation can be found here.

The prerequisite for this is the installation of VirtualBox on the target machine. VirtualBox can be downloaded from here and here are the detailed instructions for installing VirtualBox. Also, a virtual image of CDH for VirtualBox has to be downloaded and unzipped. The downloaded file name would be similar to cloudera-demo-vm-cdh3u2-virtualbox.tar.gz.

Let's get started.

Step 1) Start VirtualBox and click on New (Ctrl-N).


Step 2) Click on Next.


Step 3) Select the options as below. Make sure to select `Red Hat (64 bit)` as CentOS is based on Red Hat and that the CDH image is 64 bit. Click on Next.


Step 4) Cloudera documentation says `This is a 64-bit image and requires a minimum of 1GB of RAM (any less and some services will not start). Allocating 2GB RAM to the VM is recommended in order to run some of the demos and tutorials`. I was able to run the Hadoop example with 1536 MB. Note that the memory allocated to the Guest OS can be changed later from within VirtualBox.


Step 6) Select `Use existing hard disk` and choose the CDH image unzipped earlier and click on Next.


Step 7) Make sure all the details are correct in the summary screen and click on Create.


Step 8) The VirtualBox home screen should look like below. Select CDH on the left pane and click on Start.


Step 9) After a couple of moments, the virtual image would have started along with all the daemons.


Step 10) To verify that the Hadoop daemons started properly, check that the number of TaskTracker nodes at http://localhost:50030/jobtracker.jsp and the number of DataNodes at http://localhost:50070/dfshealth.jsp are both 1.

Also, the output of the `sudo /usr/java/default/bin/jps` command should be as below

2914 FlumeMaster
3099 Sqoop
2780 FlumeWatchdog
2239 JobTracker
2850 FlumeWatchdog
2919 FlumeNode
2468 SecondaryNameNode
2019 HMaster
3778 Jps
2145 DataNode
2360 NameNode
2964 RunJar
3076 Bootstrap
2568 TaskTracker

Step 11) Open a terminal and run the below command

hadoop --config $HOME/hadoop-conf jar /usr/lib/hadoop/hadoop-0.20.2-cdh3u2-examples.jar pi 10 10000

On the successful completion of the Job the output should look as below.


Note:

1) Some processors support Intel-VT or AMD-V for hardware virtualization. Most of the time this is disabled in the BIOS and has to be explicitly turned on.

2) Flume, Sqoop, Hive and Hue are also started besides core Hadoop. These can be disabled by removing the x permissions for the files in /etc/init.d.

3) For stability and security, the patches on CentOS can be updated using the `sudo yum update` command.

4) For better performance and usability of the Guest (CentOS), the VirtualBox Guest Additions have to be installed.

5) Hue (Web UI for Hadoop) can be accessed from http://localhost:8088/.

Friday, December 23, 2011

What is the relation between Hibernate and Swap Memory?

With the new HP 430 Notebook, hibernate was not working. I was getting a message that there was not enough swap for hibernate. I found from this Wiki that swap memory must be at least as large as RAM (swap >= RAM) for hibernate to work.

Since the HP 430 Notebook had enough RAM (4GB), I chose the swap to be 1GB at the time of the Ubuntu installation, and so hibernate was not working. The same Wiki has instructions for increasing the size of the swap.

So, it's better to choose enough swap at the time of the Ubuntu installation for hibernate to work.
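The swap >= RAM rule of thumb can be checked with a few lines of Java. This is only a sketch: the class and method names (SwapCheck, readKb, swapCoversRam) are made up for illustration, and it assumes a Linux machine where /proc/meminfo reports MemTotal and SwapTotal in kB.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compares SwapTotal with MemTotal from /proc/meminfo. */
class SwapCheck {

    /** Returns the kB value of a field such as "MemTotal", or -1 if absent. */
    static long readKb(String meminfo, String field) {
        Pattern p = Pattern.compile(
                "(?m)^" + Pattern.quote(field) + ":\\s+(\\d+)\\s+kB");
        Matcher m = p.matcher(meminfo);
        return m.find() ? Long.parseLong(m.group(1)) : -1;
    }

    /** True when swap is at least as large as RAM (the rule quoted above). */
    static boolean swapCoversRam(String meminfo) {
        long ram = readKb(meminfo, "MemTotal");
        long swap = readKb(meminfo, "SwapTotal");
        return ram >= 0 && swap >= ram;
    }

    public static void main(String[] args) throws IOException {
        // Linux only: /proc/meminfo does not exist on other systems.
        String meminfo = new String(
                Files.readAllBytes(Paths.get("/proc/meminfo")));
        System.out.println("swap >= RAM: " + swapCoversRam(meminfo));
    }
}
```

With 4GB of RAM and 1GB of swap, as in my case, the check returns false.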

New domain name for this blog

I own the www.thecloudavenue.com domain and had been using it for some other blog, which I had not been updating very actively. So, I have changed the domain name for this blog to www.thecloudavenue.com.

Happy Hadoop'ing :)

Thursday, December 22, 2011

My new HP 430 Notebook

This week I bought a new HP 430 Notebook from Flipkart and am extremely satisfied with it. The Flipkart service was really awesome, and I would recommend trying it.


The Notebook was naked (without any OS), so I saved a couple of bucks on some proprietary OS. As soon as I got it, I installed Ubuntu 11.10 with all the updates. Since I use apt-cacher-ng and maintain the list of software I use, setting up the machine was a breeze. A single `sudo apt-get install ........` installed all the required software.

I also installed Java, Hadoop, Eclipse and the other software required to try, learn and explore Hadoop. The Notebook has an i5 processor and supports Intel-VT, which had to be enabled in the BIOS. I also installed Ubuntu 11.10 as a guest using VirtualBox and tried a Hadoop cluster of 2 nodes (host and guest) with the 0.22 release.

The regret I have is that I am able to suspend the Notebook in Ubuntu 11.10, but not able to resume it. So, I have to hibernate it or shut it down. And though the Notebook has 4GB RAM, the graphics card doesn't have separate memory and consumes it from the 4GB RAM.

Edit (24th January, 2012) - Noticed that the Notebook screen had a few bright spots and had to get it replaced. Although Flipkart had a 30-day replacement guarantee free of cost, the customer support put me in touch with the HP service center to get the Notebook monitor replaced instead of a full Notebook replacement, which was not what I wanted. I had to literally call Flipkart 15-20 times, send pictures of the Notebook monitor multiple times and finally escalate it to their supervisor before they agreed to replace the Notebook.

Flipkart, which is being called the Amazon of India, has a customer support which was behaving rather cranky about the replacement of the laptop in spite of the damage. Flipkart is offering competitive prices with COD (Cash On Delivery) and a 3 month EMI, so I would definitely recommend using the Flipkart service, but with care.

Edit (1st August, 2012) - Noticed that sometimes (very rarely) the caps lock doesn't work and the network also doesn't work. I had to remove the battery from the laptop and put it back. A reboot doesn't help in either case. Looks like some state is stored in the laptop even when turned off, which gets reset when the battery is removed and put back.

Friday, December 16, 2011

What should the input/output key/value types be for the combiner?

When only a map and a reducer class are defined for a job, the key/value pairs emitted by the mapper are consumed by the reducer. So, the output types of the mapper should be the same as the input types of the reducer.

(input) <k1, v1> -> map -> <k2, v2> -> reduce -> <k3, v3> (output)

When a combiner class is defined for a job, the intermediate key/value pairs are combined on the same node as the map task before being sent to the reducer. The combiner reduces the network traffic between the mappers and the reducers.

Note that the combiner functionality is the same as that of the reducer (to combine the values for a key), but the combiner input and output key/value types should be the same, while for the reducer this is not a requirement.

(input) <k1, v1> -> map -> <k2, v2> -> combine* -> <k2, v2> -> reduce -> <k3, v3> (output)

In the scenario where the reducer class is also defined as the combiner class, the combiner/reducer input/output key/value types should be of the same type (k2/v2) as below. If not, due to type erasure the program compiles properly but gives a run time error.

(input) <k1, v1> -> map -> <k2, v2> -> combine* -> <k2, v2> -> reduce -> <k2, v2> (output)
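The flow above can be tried out without a cluster. The following is a plain Java simulation, not Hadoop code: the class CombinerTypesSketch and its map, sum and run methods are made-up names, and a word count is assumed as the job. The point it illustrates is that sum takes <k2, v2> pairs and emits <k2, v2> pairs, so it can be plugged in both as the combiner (per map task) and as the reducer, and the final result is the same with or without the combiner.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Framework-free simulation of map -> combine* -> reduce for word count. */
class CombinerTypesSketch {

    /** map: one input line (v1) -> list of <k2, v2> = <word, 1>. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    /**
     * Sums the values per key. Input and output are both <k2, v2>,
     * which is what allows it to act as combiner and as reducer.
     */
    static List<Map.Entry<String, Integer>> sum(
            List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return new ArrayList<>(totals.entrySet());
    }

    /** Each split stands for one map task; the combiner runs per task. */
    static List<Map.Entry<String, Integer>> run(
            List<String> splits, boolean useCombiner) {
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        for (String split : splits) {
            List<Map.Entry<String, Integer>> mapOut = map(split);
            shuffled.addAll(useCombiner ? sum(mapOut) : mapOut);
        }
        return sum(shuffled); // the reduce step
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a b a", "b a c");
        System.out.println(run(splits, true));  // [a=3, b=2, c=1]
        System.out.println(run(splits, false)); // [a=3, b=2, c=1]
    }
}
```

With the combiner enabled, the first split sends <a, 2> and <b, 1> to the reduce step instead of three separate pairs, which is the saving in network traffic mentioned earlier.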

Thursday, December 15, 2011

What is the difference between the old and the new MR API?

With the release of Hadoop 0.20, a new MR API was introduced (o.a.h.mapreduce package). There are not many significant differences between the old MR API (o.a.h.mapred) and the new MR API (o.a.h.mapreduce), except that the new MR API allows data to be pulled from within the Map and Reduce tasks by calling nextKeyValue() on the Context object passed to the Map function.

Also, some of the InputFormats have not been ported to the new MR API. So, to use a missing InputFormat, either stop using the new MR API and go back to the old MR API, or else extend the InputFormat as required.


How to start CheckPoint NameNode in Hadoop 0.23 release?

Prior to the 0.23 release, the masters file in the conf folder of the Hadoop installation had the list of host names on which the CheckPoint NN had to be started. But with the 0.23 release the masters file is not used anymore; the dfs.namenode.secondary.http-address key has to be set to ip:port in hdfs-site.xml. The CheckPoint NN can be started using the sbin/hadoop-daemon.sh start secondarynamenode command. Run the jps command to make sure that the CheckPoint NN is running, and also check the corresponding log file for any errors.

BTW, the Secondary NN is being referred to as the CheckPoint NN. But the code is still using Secondary NN, and people still refer to it as the Secondary NN.

Tuesday, December 13, 2011

Hadoop and JavaScript

Microsoft has announced a limited preview version of Hadoop on Azure. JavaScript can also be used to write MapReduce jobs on Hadoop. As of now, Streaming allows any scripting language which can read/write from STDIN/STDOUT to be used with Hadoop. But what Microsoft is trying to do is make JavaScript a first-class citizen for Hadoop. There is a session on `Hadoop + JavaScript: what we learned` at the end of February, which is too long a wait for the impatient. BTW, here is an interesting article on using JavaScript on Hadoop with Rhino.

There has been a lot of work on JavaScript in the browser area over the last few years to improve the performance (especially V8).

Can anyone share use-cases or their experience using JavaScript for HPC (High Performance Computing) in the comments and I will update the blog entry accordingly?

Thursday, December 8, 2011

BigData vs Databases

Anant explains things in an easy-to-understand manner. Here is his comparison of Big Data and databases. I suggest subscribing to his blog.

Monday, December 5, 2011

HDFS explained as comics

Manish has done a nice job explaining HDFS as comics for those who are new to HDFS. Here is the link.