Friday, May 9, 2014

User recommendations using Hadoop, Flume, HBase and Log4J - Part 1

Thanks to Srinivas Kummarapu for this post on how to show appropriate recommendations to a web user based on the user's past activity.

This is the first of a four-part series and assumes that Hadoop, Flume, HBase and Log4J have already been installed. In this article we will see how to track user activities and dump them into HDFS and HBase. In the future articles, we will do some basket analysis on the data in HDFS/HBase and project the results to the transaction database for recommendations. Also, refer to this article on how to Flume the data into HDFS.
Step-1) Log into HBase shell
bin/hbase shell
Create a user table with activities as column family.
create 'user', 'activities'
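Optionally, verify the table definition from the same shell:
describe 'user'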
Step-2) Create a Flume configuration file (conf/flume-HBaseAgent.conf) with Avro as source and HBase as sink as below.
# Name the sources, sinks and channels of hbaseagent
# and tell it which ones we want to activate.
hbaseagent.channels = ch1
hbaseagent.sources = avro-source1
hbaseagent.sinks = sink1

# Define an Avro source called avro-source1 on hbaseagent and tell it
# to bind to 0.0.0.0:41415. Connect it to channel ch1.
hbaseagent.sources.avro-source1.type = avro
hbaseagent.sources.avro-source1.bind = 0.0.0.0
hbaseagent.sources.avro-source1.port = 41415

# Define a memory channel called ch1 on hbaseagent
hbaseagent.channels.ch1.type = memory

# Define an HBase sink called sink1 on hbaseagent and tell it the HBase table and column family to write to
hbaseagent.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
hbaseagent.sinks.sink1.table = user
hbaseagent.sinks.sink1.columnFamily = activities
hbaseagent.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
hbaseagent.sinks.sink1.serializer.payloadColumn = col1
hbaseagent.sinks.sink1.serializer.keyType = timestamp
hbaseagent.sinks.sink1.serializer.rowPrefix = 1
hbaseagent.sinks.sink1.serializer.suffix = timestamp

# Connect the source and the sink through channel ch1
hbaseagent.sources.avro-source1.channels = ch1
hbaseagent.sinks.sink1.channel = ch1
Step-3) Start Flume with the above config file as:
bin/flume-ng agent --conf ./conf/ -f conf/flume-HBaseAgent.conf -Dflume.root.logger=DEBUG,console -n hbaseagent
The above will start Flume, and the Avro source will start listening on port 41415. Run the command below to make sure that Avro is listening on port 41415; the contents of the /etc/hosts file should then appear in the user table in HBase.
bin/flume-ng avro-client --conf conf -H localhost -p 41415 -F /etc/hosts -Dflume.root.logger=DEBUG,console

Now that there are some records in the user table of HBase, let's Flume the data into HBase from a web page. For this, write a Java program in Eclipse which logs the user-activity data using Log4J. Similar code can be used in JSP to create dynamic web pages.

Step-4) Create a project in Eclipse and include all the jars in the Flume lib folder as dependencies. Create a src/log4j.properties file with the below content in the project. The configuration file specifies two appenders (File and Avro Flume), so the Log4J events will be sent to both the file and Avro.
# Define the root logger with the FILE and flume appenders

log = /home/vm4learning/WorkSpace/BigData/Log4J-Example/log
log4j.rootLogger = INFO, FILE, flume


# Define the file appender

log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File=${log}/log.out
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%m%n


# Define the flume appender
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 41415
log4j.appender.flume.UnsafeMode = false
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
The Java program below logs the user-activity information, such as the phone she/he searched for, to both the File appender and the Flume Avro appender. Flume will then write the events to the user table in HBase.
import java.io.IOException;

import org.apache.log4j.Logger;

public class log4jExample {

    static Logger log = Logger.getRootLogger();

    public static void main(String[] args) throws IOException {

        // Each log.info() call is sent to both the FILE and the flume appenders
        for (int i = 1; i <= 10; i++) {
            log.info("Apple is:" + i * 2);
            log.info("Samsung is:" + i);
        }
    }
}
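For anyone who prefers to run the example outside Eclipse, a minimal sketch from the command line, assuming Flume is installed under /usr/local/flume and log4j.properties sits in the current directory (both paths are assumptions):
javac -cp "/usr/local/flume/lib/*" log4jExample.java
java -cp ".:/usr/local/flume/lib/*" log4jExample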
Step-5) Scan the user table in HBase to find the Java program output stored under the activities column family.
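A minimal check from the HBase shell; the row keys will be the rowPrefix/timestamp combination produced by the serializer:
scan 'user'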

The same results can be achieved by tailing the log file generated by the Java program and streaming the events to Flume through an exec source. Specify the log.out file in the tail command of the Flume configuration file, as below.
hbase-agent.sources = tail
hbase-agent.sinks = sink1
hbase-agent.channels = ch1

hbase-agent.sources.tail.type = exec
hbase-agent.sources.tail.command = tail -F /home/vm4learning/WorkSpace/BigData/Log4J-Example/log/log.out
hbase-agent.sources.tail.channels = ch1

hbase-agent.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
hbase-agent.sinks.sink1.channel = ch1
hbase-agent.sinks.sink1.table = user
hbase-agent.sinks.sink1.columnFamily = activities
hbase-agent.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
hbase-agent.sinks.sink1.serializer.payloadColumn = col1
hbase-agent.sinks.sink1.serializer.keyType = timestamp
hbase-agent.sinks.sink1.serializer.rowPrefix = 1
hbase-agent.sinks.sink1.serializer.suffix = timestamp

hbase-agent.channels.ch1.type=memory
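Start this agent the same way as before. Note that the agent name passed with -n must match the hbase-agent prefix used in the configuration; the file name flume-tail-HBaseAgent.conf below is only an assumption:
bin/flume-ng agent --conf ./conf/ -f conf/flume-tail-HBaseAgent.conf -Dflume.root.logger=DEBUG,console -n hbase-agent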
In the next article we will see how to Sqoop the RDBMS table ITEMS into an HBase table called ITEMS. We will also perform an incremental load using Sqoop to get the rows that were most recently changed/inserted in the ITEMS table of the RDBMS, which is linked to the real-time application through a web page. This incremental load script can be scheduled to run daily using the Oozie coordinator. Later, the USER table can be joined with the ITEMS table of HBase in Hive with the help of the HBase storage handlers, or the joins can be done using Phoenix.
