Monday, October 14, 2013

MapReduce programming in R using Hadoop Streaming

Hadoop's Streaming feature makes it possible to write MapReduce programs in languages other than Java. Any language that runs on Linux and can read from and write to STDIO can be used to write MR programs. With Microsoft HDInsight, it should even be possible to use .NET languages against Hadoop.

So, I often get the question `what is the best language for writing programs in the MR model?`. It all depends on the requirements. If the requirements are about statistics, then R is a good choice; if it is NLP, then Python is a good choice. The appropriate language should be chosen based on its library support for the requirements.

In this blog we will install and configure R on Ubuntu and execute a simple MR program (word count) in R. This assumes that Hadoop is already installed and working properly.

BTW, R can be integrated with Hadoop in multiple ways. One way is to use R without any additional packages: R reads from STDIN, does some processing and then writes to STDOUT. The second way is to use packages like RHIPE, rmr2 and others. This blog is about integrating R with Hadoop without using any 3rd-party libraries (the first approach).

Just as a note, I have been using Ubuntu 12.04 because it is the latest LTS release from Canonical as of this writing and is supported till April, 2017. Canonical follows a 6-month release cycle, and every fourth release is an LTS supported for 5 years, while the rest of the releases are supported for 9 months. So, the next LTS release will be in April, 2014. This is one of the reasons why I am stuck (and also feel good) with Ubuntu 12.04 for now.

The main advantage of sticking with an old release of Ubuntu (which is still supported) is that it becomes more and more stable over time; the main disadvantage is that the packages are outdated. Canonical provides major upgrades to software only across different Ubuntu releases. So, Ubuntu 12.04 carries an old version of R (2.14.1) which doesn't work with some of the Hadoop packages from Revolution Analytics and others, so the first step is to upgrade R rather than use the version which comes with Ubuntu.

So, here are the steps (note that the paths should be updated accordingly, and some of the commands require administrative privileges):

- Pick the closest CRAN mirror from here and add the below lines to the `/etc/apt/sources.list` file to add a new repository. The same thing can also be done using the Synaptic Package Manager; the instructions for that are here.
deb http://cran.ma.imperial.ac.uk/bin/linux/ubuntu precise/

deb-src http://cran.ma.imperial.ac.uk/bin/linux/ubuntu precise/
- Update the package index using the below command
sudo apt-get update
- Now it is time to install R using apt-get as below. This command will install R with all its dependencies.
sudo apt-get install r-base
- Verify that R is installed properly by running a simple program in the R shell.
> ints = 1:100
> doubleInts = sapply(ints, function(x) 2*x)
> head(doubleInts)

[1]  2  4  6  8 10 12

- Now create a mapper.R file using the below code
#! /usr/bin/env Rscript
# mapper.R - Wordcount program in R
# script for Mapper (R-Hadoop integration)

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

con <- file("stdin", open = "r")

while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)

    for (w in words)
        cat(w, "\t1\n", sep="")
}
close(con)
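
The mapper's behavior can be cross-checked on the command line with standard Unix tools. This is just a rough local equivalent for sanity checking, not part of the job itself:

```shell
# Emit "<word>\t1" for every whitespace-separated word, mimicking mapper.R;
# awk splits each input line on whitespace by default.
echo "foo foo quux labs foo bar quux" |
awk '{ for (i = 1; i <= NF; i++) print $i "\t1" }'
```

Piping the same input through `Rscript mapper.R` should produce the same tab-separated word/count pairs.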

- Now create a reducer.R file using the below code
#! /usr/bin/env Rscript
# reducer.R - Wordcount program in R
# script for Reducer (R-Hadoop integration)

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)

splitLine <- function(line) {
    val <- unlist(strsplit(line, "\t"))
    list(word = val[1], count = as.integer(val[2]))
}

env <- new.env(hash = TRUE)
con <- file("stdin", open = "r")

while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    split <- splitLine(line)
    word <- split$word
    count <- split$count

    if (exists(word, envir = env, inherits = FALSE)) {
        oldcount <- get(word, envir = env)
        assign(word, oldcount + count, envir = env)
    }
    else assign(word, count, envir = env)
}
close(con)

for (w in ls(env, all = TRUE))
    cat(w, "\t", get(w, envir = env), "\n", sep = "")
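
For comparison, the aggregation that reducer.R performs with its environment hash (summing the counts per word from tab-separated `word\t1` lines) can be sketched with awk alone:

```shell
# Sum the counts per word from "<word>\t<count>" lines, the same aggregation
# reducer.R does; the final sort fixes awk's unspecified iteration order.
printf 'bar\t1\nfoo\t1\nfoo\t1\nquux\t1\n' |
awk -F'\t' '{ c[$1] += $2 } END { for (w in c) print w "\t" c[w] }' |
sort
```

Note that reducer.R only assumes the input is grouped by key, which is exactly what Hadoop's sort phase guarantees between the map and reduce stages.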

- The streaming scripts work independently of Hadoop, because they read/write from STDIO and have no Hadoop bindings. So, the scripts can be tested without Hadoop as below (the `sort -k1,1` between the mapper and the reducer emulates Hadoop's shuffle-and-sort phase).
vm4learning@vm4learning:~/Code/Streaming$ echo "foo foo quux labs foo bar quux" | Rscript mapper.R  | sort -k1,1 | Rscript reducer.R
bar    1
foo    3
labs    1
quux    2
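
As a further sanity check, the same word count can be reproduced end-to-end with coreutils alone; the counts should match the Rscript pipeline above:

```shell
# Wordcount with standard Unix tools: split into one word per line,
# sort, count duplicates, then reformat as "<word>\t<count>" to match
# the reducer's output format.
echo "foo foo quux labs foo bar quux" |
tr -s '[:space:]' '\n' | sort | uniq -c |
awk '{ print $2 "\t" $1 }'
```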
- Create a simple input file and then upload it into HDFS before running the MR job.
bin/hadoop fs -put /home/vm4learning/Desktop/input.txt input3/input.txt

- Run the MR job as below
bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -file  /home/vm4learning/Code/Streaming/mapper.R  -mapper /home/vm4learning/Code/Streaming/mapper.R -file /home/vm4learning/Code/Streaming/reducer.R  -reducer /home/vm4learning/Code/Streaming/reducer.R -input input3/ -output output3/

 - The output of the MR job should be in the output3/ folder in HDFS; it can be viewed with `bin/hadoop fs -cat output3/part-*`.

In upcoming blogs, we will see how to use 3rd-party libraries like the ones from Revolution Analytics to write much simpler MR programs in R.

Here is a blog from Hortonworks on the same topic.

2 comments:

  1. Thank you so much Praveen, that was very useful.

  2. Praveen,

    I am getting below error while running streaming job

    java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)
    Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.ja
