Tuesday, October 22, 2013

Integrating R and Hadoop using rmr2/rhdfs packages from Revolution Analytics

As mentioned in an earlier post, Hadoop MR programs can be developed in non-Java languages using Hadoop Streaming.

R is a language with a lot of libraries for statistics, so data processing involving heavy statistics can be done easily in R. R can be integrated with Hadoop by writing R programs that read from STDIN, do some processing, and then write the results to STDOUT. An easier approach is to use libraries like RHIPE or rmr2/rhdfs from Revolution Analytics.
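As a minimal illustration of the plain-streaming approach (without any 3rd party packages), a mapper can be an ordinary R script that reads lines from STDIN and writes tab-separated key/value pairs to STDOUT. The script below is a hedged sketch of a word-count mapper; the file name and tokenization rule are just illustrative choices:

```r
#!/usr/bin/env Rscript
# wordcount_mapper.R - a minimal Hadoop Streaming mapper written in R (sketch).
# Reads text lines from STDIN and emits "word<TAB>1" pairs on STDOUT.

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1)) > 0) {
  # Split the line into lowercase words on any non-alphanumeric run.
  words <- unlist(strsplit(tolower(line), "[^a-z0-9]+"))
  words <- words[nchar(words) > 0]
  for (w in words) {
    cat(w, "\t1\n", sep = "")
  }
}
close(con)
```

Such a script would be passed to Hadoop Streaming with the -mapper option, exactly as with any other non-Java language.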

With the assumption that R has been installed and integrated with Hadoop as mentioned in the previous blog, this blog will look into how to integrate R with Hadoop using 3rd party packages like rmr2/rhdfs from Revolution Analytics. As mentioned here, it is much easier to write R programs with rmr2/rhdfs than without.

Here are the instructions on how to install rmr2/rhdfs and then write and run MR programs using those packages.

- Install the dependencies for rmr2/rhdfs. Note that the dependencies have to be installed in the system library and not in a custom folder. This can be done by running R as sudo and then installing the below-mentioned packages. Check this query on StackOverflow for the reason.
install.packages(c("codetools", "Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2", "rJava"))
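Whether the dependencies actually landed in the system library can be checked from the same sudo R session. This is just a sanity-check sketch, not part of the official installation steps:

```r
# Verify that the rmr2/rhdfs dependencies are visible to R.
deps <- c("codetools", "Rcpp", "RJSONIO", "bitops", "digest",
          "functional", "stringr", "plyr", "reshape2", "rJava")
missing <- deps[!deps %in% rownames(installed.packages())]
if (length(missing) > 0)
  stop("Missing packages: ", paste(missing, collapse = ", "))
```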
- Download the latest rmr2*tar.gz and rhdfs*tar.gz from here.

- Install rmr2.
sudo R CMD INSTALL rmr2_2.3.0.tar.gz
- Install rhdfs.
sudo JAVA_HOME=/usr/lib/jvm/jdk1.6.0_45 R CMD javareconf
sudo HADOOP_CMD=/home/vm4learning/Installations/hadoop-1.2.1/bin/hadoop R CMD INSTALL rhdfs_*.tar.gz
- Execute the below commands from the R shell to kick off the MR job.
Sys.setenv(HADOOP_HOME="/home/vm4learning/Installations/hadoop-1.2.1")
Sys.setenv(HADOOP_CMD="/home/vm4learning/Installations/hadoop-1.2.1/bin/hadoop")

library(rmr2)
library(rhdfs)
hdfs.init()

ints = to.dfs(1:100)
calc = mapreduce(input = ints, map = function(k, v) cbind(v, 2*v))

from.dfs(calc)
- Verify the output of the MR job to make sure that Hadoop and rmr2/rhdfs have been integrated properly.
$val
         v    
  [1,]   1   2
  [2,]   2   4
  ............
  ............
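Going one step beyond the map-only sanity check above, the same mapreduce() call also accepts a reduce function. The sketch below assumes the same rmr2/rhdfs setup; the grouping rule (parity of the input) is arbitrary, chosen only to exercise the reduce phase:

```r
library(rmr2)
library(rhdfs)
hdfs.init()

# Map each integer to key = parity (0 or 1), value = its square;
# the reducer then sums the squares within each parity group.
ints <- to.dfs(1:10)
sums <- mapreduce(
  input  = ints,
  map    = function(k, v) keyval(v %% 2, v^2),
  reduce = function(k, vv) keyval(k, sum(vv)))
from.dfs(sums)
```

The result should contain two key/value pairs: the sum of squares of the even numbers under key 0 and of the odd numbers under key 1.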

In the upcoming blogs, we will see how to write complex MR programs using rmr2/rhdfs.

5 comments:

  1. Hi there, nice tutorial!
When you paste the code for installing rhdfs, you used the jar rhbase_1.2.0.tar.gz; I think you meant rhdfs*tar.gz. I used that file and everything works great.

    Cheers!

    ReplyDelete
  2. Hi, Thanks for the nice article. I got the following error at the end of the execution. Can you please help us:

    14/03/25 10:52:37 INFO streaming.StreamJob: Running job: job_1395719087611_0009
    14/03/25 10:52:37 INFO streaming.StreamJob: Job running in-process (local Hadoop)
    14/03/25 10:52:38 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:54:04 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:54:06 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:54:47 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:54:49 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:55:35 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:55:36 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:56:18 INFO streaming.StreamJob: map 50% reduce 0%
    14/03/25 10:56:20 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:56:25 INFO streaming.StreamJob: Job running in-process (local Hadoop)
    14/03/25 10:56:25 ERROR streaming.StreamJob: Job not successful. Error: Task failed task_1395719087611_0009_m_000000
    Job failed as tasks failed. failedMaps:1 failedReduces:0

    14/03/25 10:56:25 INFO streaming.StreamJob: killJob...
    14/03/25 10:56:25 INFO impl.YarnClientImpl: Killing application application_1395719087611_0009
    Streaming Command Failed!
    Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, :
    hadoop streaming failed with error code 1
    14/03/25 10:57:09 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
    Deleted /tmp/file81d5d54392b

    ReplyDelete
  3. I got the following error at the end of the execution. Can you please help us:

    14/03/25 10:52:37 INFO streaming.StreamJob: Running job: job_1395719087611_0009
    14/03/25 10:52:37 INFO streaming.StreamJob: Job running in-process (local Hadoop)
    14/03/25 10:52:38 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:54:04 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:54:06 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:54:47 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:54:49 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:55:35 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:55:36 INFO streaming.StreamJob: map 0% reduce 0%
    14/03/25 10:56:18 INFO streaming.StreamJob: map 50% reduce 0%
    14/03/25 10:56:20 INFO streaming.StreamJob: map 100% reduce 0%
    14/03/25 10:56:25 INFO streaming.StreamJob: Job running in-process (local Hadoop)
    14/03/25 10:56:25 ERROR streaming.StreamJob: Job not successful. Error: Task failed task_1395719087611_0009_m_000000
    Job failed as tasks failed. failedMaps:1 failedReduces:0

    14/03/25 10:56:25 INFO streaming.StreamJob: killJob...
    14/03/25 10:56:25 INFO impl.YarnClientImpl: Killing application application_1395719087611_0009
    Streaming Command Failed!
    Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, :
    hadoop streaming failed with error code 1
    14/03/25 10:57:09 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
    Deleted /tmp/file81d5d54392b

    Kindly give me any solution

    ReplyDelete
I have integrated R with Hadoop using RHadoop and am running the below program from R.

    library(rmr2)
    library(rhdfs)
    hdfs.init()
    ints = to.dfs(1:100)
    calc = mapreduce(input = ints, map = function(k, v) cbind(v, 2*v))
    from.dfs(calc)

The MapReduce job runs successfully but it's giving NULL output, as below.

    $key
    NULL
    $val
    NULL

However, if I use the option rmr.options(backend="local"), then it gives the proper output.

It would be a great help if you could look into this and provide a solution.

    ReplyDelete
  5. hdfs.init()
    sh: 1: classpath: not found
    Error in system(command, intern = TRUE) : error in running command

    ReplyDelete