Friday, May 16, 2014

User recommendations using Hadoop, Flume, HBase and Log4J - Part 2

Thanks to Srinivas Kummarapu for this post on how to show appropriate recommendations to a web user based on the user's past activity.

In the previous post we saw how to use Flume to bring the user activities into the Hadoop cluster. Analysis can be run on top of these activities to figure out what a particular user is interested in.

For example, if a user wants to buy a mobile from a shopping site but ends up buying none, all of his activities are still captured in the Hadoop cluster, where they can be analyzed to figure out what type of phones that user is interested in. Those phones can then be recommended when the user visits the site again.

The user activities in HBase contain only the mobile name and no further details. More details about the mobile phone can be maintained in an RDBMS. We need to join the RDBMS data (mobile details) with the HBase data and send the result to the recommendations tables of the RDBMS in order to make recommendations to the user.
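
As a rough illustration of the join described above, here is a minimal Python sketch. The sample data, the field names and the `recommend` helper are all made up for this example; in the real pipeline the activities live in HBase and the phone details in the MySQL ITEMS table, and the join would run in the cluster or in the RDBMS rather than in a script like this.

```python
# Toy model of joining user activity (phone names only) with phone details.
# All data and names below are hypothetical sample values.

# What HBase holds per user: only the names of the phones the user looked at.
activities = {
    "user1": ["LUMIA_920", "HTC_ONE", "LUMIA_920"],
}

# What the RDBMS holds: full details keyed by model name.
item_details = {
    "LUMIA_920": {"phone_name": "NOKIA", "price": 278.00, "color": "BLACK"},
    "HTC_ONE": {"phone_name": "HTC", "price": 450.00, "color": "SILVER"},
}


def recommend(user):
    """Join the user's viewed models with their details, most viewed first."""
    counts = {}
    for model in activities.get(user, []):
        counts[model] = counts.get(model, 0) + 1
    # Sort by view count (descending), then by model name for a stable order.
    ranked = sorted(counts, key=lambda m: (-counts[m], m))
    # Inner join: models without details in the RDBMS are skipped.
    return [dict(model_name=m, views=counts[m], **item_details[m])
            for m in ranked if m in item_details]


recommendations = recommend("user1")
for rec in recommendations:
    print(rec["model_name"], rec["views"], rec["price"])
```

Ranking by view count is only one possible choice here; the point is that the activity side carries just the name, and everything else comes from the joined details.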

Here we have two options to perform the join.

1) Send the result from the Hadoop cluster to the RDBMS and do the join there.
2) Get the RDBMS data into HBase and perform the join in a parallel, distributed fashion.

Both can be done with Sqoop (SQL to Hadoop), a tool that runs its transfers as map-only jobs.
In this article we will see how to Sqoop the RDBMS table into HBase in an incremental fashion.
Hadoop is batch oriented, so how can it get the most recently updated data into the cluster in near real time? For example, if the mobile vendor increases the price of a mobile in the RDBMS, the change has to reach the Hadoop cluster within 10 minutes to keep the recommendations realistic. We will answer this question in this post.

Step 1) Prerequisites for RDBMS and NoSQL databases.

Open the RDBMS (MySQL) and look at the table ITEMS, which is tied to the web application and owned by the product team.
Create the database dbsit and, in this database, create the table ITEMS as below.
create database dbsit;
use dbsit;
create table ITEMS(id int not null primary key auto_increment, phone_name varchar(64) not null, model_name varchar(10) not null, price decimal(10,2), memory_card_size varchar(8), mega_pixel varchar(10), color varchar(10), Date_Item DATE);
Open the HBase shell and create a table PRODUCT with column family PHONE (create 'PRODUCT', 'PHONE'). Run count 'PRODUCT' to find the number of records.

Step 2) Run a normal Sqoop import to get the ITEMS data from the RDBMS into the HBase PRODUCT table.
bin/sqoop import --connect jdbc:mysql://localhost/dbsit --table ITEMS --hbase-table PRODUCT --column-family PHONE

Now go to the HBase shell and scan the PRODUCT table to see the records, with their columns stored under the PHONE column family.
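
To make the layout of the imported data more concrete, the following Python sketch models how one ITEMS row could end up in HBase: the primary key becomes the row key, and every other non-null column becomes a cell in the PHONE column family, with the value stored as a string. This is a simplified model of Sqoop's documented default behaviour, not Sqoop's actual code. The `to_hbase_cells` helper is made up for illustration, and the sample row mirrors the insert used in Step 3.

```python
def to_hbase_cells(row, row_key_column="id", column_family="PHONE"):
    """Return (row_key, cells) for one RDBMS row.

    cells maps "family:qualifier" to the string form of the value.
    """
    row_key = str(row[row_key_column])
    cells = {}
    for column, value in row.items():
        if column == row_key_column:
            continue  # the key is used as the row key, not stored as a cell
        if value is None:
            continue  # null columns produce no cell
        cells[f"{column_family}:{column}"] = str(value)
    return row_key, cells


sample_row = {
    "id": 9,
    "phone_name": "NOKIA",
    "model_name": "LUMIA_920",
    "price": "278.00",
    "memory_card_size": "32GB",
    "mega_pixel": "8MP",
    "color": "BLACK",
    "Date_Item": "2014-05-11",
}

row_key, cells = to_hbase_cells(sample_row)
for name in sorted(cells):
    print(row_key, name, cells[name])
```

Each printed line corresponds roughly to one line of a scan in the HBase shell: row key, column (family:qualifier) and value.
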
Step 3) Get a newly inserted record into HBase from RDBMS.

Let's say that the product team has added another record (i.e. a new phone model) to the ITEMS table in the RDBMS. Normally this insert would come from a web page; for the sake of convenience, let's insert a row directly into the database.

Go to MySQL and run the below command.
insert into ITEMS values (9,'NOKIA','LUMIA_920',278,'32GB','8MP','BLACK','2014-05-11');
Once the new record is inserted into the RDBMS, the question is how to get the newly added records into the HBase table. Running the Step 2 Sqoop command again would fetch all the records, not only the one newly added to the RDBMS.

Sqoop has a concept called incremental import, which comes in two flavors: 1) append (for newly inserted rows) and 2) lastmodified (for updated rows). Incremental import in append mode copies only the newly created rows from source to target. This saves a considerable amount of resources compared with doing a full import every time the data needs to be in sync.

One disadvantage is the need to know the value of the last imported row so that Sqoop can pick up where it left off. When running in incremental mode, Sqoop prints the value of the last imported row at the end of the job, and that value can be used to start the next incremental load.

Before issuing the incremental append command, we need the last imported value from the HBase table, which is 8 here. Later we will see how the Sqoop metastore removes the need to keep track of this value ourselves.
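
The idea behind append mode can be sketched in a few lines of Python. This is a toy model and not how Sqoop is implemented: the `incremental_append` function and the sample rows are invented for illustration, and only the check column `id` and the last value 8 come from this post.

```python
def incremental_append(source_rows, check_column, last_value):
    """Return the rows to import and the last value to use next time."""
    new_rows = [r for r in source_rows if r[check_column] > last_value]
    if not new_rows:
        return [], last_value  # nothing new, keep the old bookmark
    next_last_value = max(r[check_column] for r in new_rows)
    return new_rows, next_last_value


# Hypothetical source table: ids 1..9, where 9 is the newly inserted phone.
items = [{"id": i, "model_name": f"MODEL_{i}"} for i in range(1, 9)]
items.append({"id": 9, "model_name": "LUMIA_920"})

imported, next_last_value = incremental_append(items, "id", last_value=8)
print(imported)          # only the row with id 9
print(next_last_value)   # the value to pass as --last-value on the next run
```

The returned bookmark is exactly the piece of state that has to be carried from one run to the next, which is the disadvantage mentioned above.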

Execute the below Sqoop command.
bin/sqoop import --connect jdbc:mysql://localhost/dbsit --table ITEMS --hbase-table PRODUCT --column-family PHONE --incremental append --check-column id --last-value 8

Scan the PRODUCT table from the HBase shell to observe the extra record, which appears as the 9th record.
Step 4) Get a recently updated record into HBase from RDBMS.

Let's say HTC_ONE mobiles are successful in the market and the vendor wants to increase the price of the mobile from $450 to $480. The product team updates the ITEMS table on May 12, 2014.
Now, in order to sync the RDBMS and HBase, only a single record has to be updated in HBase. This is done using an incremental import in lastmodified mode.

To get the updated record into HBase, one condition must be met: the table needs a date or time column that the application updates whenever a row changes. Go to HBase and get the record of HTC_ONE before the update.
Then run the below Sqoop command.
bin/sqoop import --connect jdbc:mysql://localhost/dbsit --table ITEMS --hbase-table PRODUCT --column-family PHONE --incremental lastmodified --check-column Date_Item --last-value "2014-05-11"
After the job finishes, go to the HBase shell and get the record again to see the updated price.
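
The lastmodified mode can be modelled in the same spirit. The Python sketch below is only an illustration under stated assumptions: the `incremental_lastmodified` function, the row ids and the older dates are hypothetical, and the sketch treats "modified" as strictly later than the last value, which may differ from the exact boundary comparison Sqoop applies.

```python
from datetime import date


def incremental_lastmodified(source_rows, target, check_column, last_value,
                             key_column="id"):
    """Upsert rows modified after last_value into target; return their keys."""
    changed = [r for r in source_rows if r[check_column] > last_value]
    for row in changed:
        # Writing under the same key overwrites the old version of the row.
        target[row[key_column]] = dict(row)
    return [r[key_column] for r in changed]


# Hypothetical state: the target was synced before the price change.
target = {
    1: {"id": 1, "model_name": "HTC_ONE", "price": 450,
        "Date_Item": date(2014, 5, 1)},
    9: {"id": 9, "model_name": "LUMIA_920", "price": 278,
        "Date_Item": date(2014, 5, 11)},
}
source = [
    {"id": 1, "model_name": "HTC_ONE", "price": 480,
     "Date_Item": date(2014, 5, 12)},
    {"id": 9, "model_name": "LUMIA_920", "price": 278,
     "Date_Item": date(2014, 5, 11)},
]

updated_keys = incremental_lastmodified(source, target, "Date_Item",
                                        date(2014, 5, 11))
print(updated_keys, target[1]["price"])
```

Because the row key stays the same, the new version simply replaces the old one in the target, which matches how a put on an existing HBase row behaves. It also shows why the date column matters: a row whose price changed but whose Date_Item was left untouched would not be picked up.
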
Step 5) How to avoid having to remember the last value in the incremental import.

Steps 3 and 4 brought the recently inserted/updated data into HBase, but both require us to remember the last value or the last modified value. That is hard to manage when the data is huge and production has many tables. To overcome this, a saved Sqoop job can be used: Sqoop keeps the job in its metastore and automatically stores the last inserted/updated value in the job's metadata.

So, we will create a saved job for Step 3. Before creating it, insert a new record into the RDBMS: add the NOKIA LUMIA_820 mobile to the ITEMS table.
We can then get the recently added record without remembering the last value by creating the job in the metastore. Note the space between -- and import.
bin/sqoop job --create product_ms -- import --connect jdbc:mysql://localhost/dbsit --table ITEMS --hbase-table PRODUCT --column-family PHONE --incremental append --check-column id --last-value 0
After creating the saved job, execute it as below.
bin/sqoop job --exec product_ms
While executing, the saved job displays a query like the following on the console and automatically picks up the 10th record.
SELECT MIN(`id`), MAX(`id`) FROM `ITEMS` WHERE ( `id` > 0 AND `id` <= 9 )

After the job completes successfully, go to the HBase shell and notice that the new MySQL record has been inserted into HBase.
Now we will see how the metastore saves the last value, using the below command.
bin/sqoop job --show product_ms
In the output, incremental.last.value is set to 10. This way we no longer have to remember the last value ourselves for the incremental import.
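
What the saved job adds on top of a plain incremental import is persistent state. The Python class below is a toy stand-in to show that idea. `SavedIncrementalJob` and its `execute` method are hypothetical names, and the real metastore keeps this state in its own storage rather than in a Python object.

```python
class SavedIncrementalJob:
    """Toy stand-in for a saved Sqoop job that remembers its last value."""

    def __init__(self, check_column, last_value):
        self.check_column = check_column
        self.last_value = last_value  # plays the role of incremental.last.value

    def execute(self, source_rows):
        """Import rows above the stored last value, then advance it."""
        new_rows = [r for r in source_rows
                    if r[self.check_column] > self.last_value]
        if new_rows:
            self.last_value = max(r[self.check_column] for r in new_rows)
        return new_rows


items = [{"id": i} for i in range(1, 11)]   # ids 1..10 in the source table
job = SavedIncrementalJob("id", last_value=0)

first_run = job.execute(items)     # imports everything above 0
second_run = job.execute(items)    # nothing new since the first run
items.append({"id": 11})           # a new phone is added later
third_run = job.execute(items)     # only the new row is imported
print(len(first_run), len(second_run), len(third_run), job.last_value)
```

The caller never passes a last value after the job is created, which is what makes the job safe to run repeatedly from a scheduler.
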
Step 6) Schedule the saved job with the help of an Oozie coordinator.

We can use an Oozie coordinator as a scheduler to execute the saved Sqoop job every 10 minutes, which imports data from the RDBMS into HBase incrementally.
bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-clickstream-examples/apps/PRODUCT_SQOOP_CO/coordinator.properties -run
The coordinator.properties is as below.
nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=oozie-clickstream-examples
examplesRootDir=/user/${user.name}/${examplesRoot}
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/PRODUCT_SQOOP_CO
The coordinator.xml is as below.
<coordinator-app name="wf_scheduler" frequency="10" start="2013-10-25T07:40Z" end="2013-10-25T07:45Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
   <action>
      <workflow>
         <app-path>${nameNode}${examplesRootDir}/apps/PRODUCT_SQOOP</app-path>
      </workflow>
   </action>
</coordinator-app>
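
To see what the frequency, start and end attributes amount to, the run times can be worked out with a small Python sketch. It assumes that a plain integer frequency means minutes and that the end time is exclusive; the `nominal_times` helper is made up for this example, so check the Oozie coordinator documentation for the exact semantics.

```python
from datetime import datetime, timedelta


def nominal_times(start, end, frequency_minutes):
    """List the run times from start (inclusive) up to end (exclusive)."""
    times = []
    current = start
    while current < end:
        times.append(current)
        current += timedelta(minutes=frequency_minutes)
    return times


start = datetime(2013, 10, 25, 7, 40)
end = datetime(2013, 10, 25, 7, 45)

# With the 5-minute window from the coordinator.xml above.
runs = nominal_times(start, end, 10)

# A longer window, e.g. ending at 08:10, gives one run every 10 minutes.
longer = nominal_times(start, datetime(2013, 10, 25, 8, 10), 10)
for t in longer:
    print(t.strftime("%H:%M"))
```

Under these assumptions the window in the coordinator.xml above yields a single run, so the end attribute would need to be moved further out for the job to keep repeating every 10 minutes.
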
The above coordinator invokes the workflow application under apps/PRODUCT_SQOOP, whose job.properties is as below.
nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default

examplesRoot=oozie-clickstream-examples
examplesRootDir=/user/${user.name}/${examplesRoot}

oozie.use.system.libpath=true

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/PRODUCT_SQOOP

The workflow.xml of this application is as below.
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="sqoop-wf">
    <start to="sqoop-node"/>
    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}${examplesRootDir}/input-data/user"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>job --exec product_ms</command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

In upcoming posts we will look into some interesting things around recommendations using Hive.

1 comment:

  1. where are the other two parts "User recommendations using Hadoop, Flume, HBase and Log4J - Part 3" and "User recommendations using Hadoop, Flume, HBase and Log4J - Part 4"
