In earlier blog entries, we looked at how to install Oozie and how to do Click Stream analysis using Hive and Pig.
This blog is about executing a simple work flow which imports the User data from a MySQL database using Sqoop, pre-processes the Click Stream data using Pig and finally does some basic analytics on the User and the Click Stream data using Hive.
Before running a Hive query, the tables, columns and column types have to be defined. Because of this, the data for Hive needs to have some structure. Pig is better than Hive at processing semi-structured data.
For this reason, one common use case is to use Pig for pre-processing the data (filtering out the invalid records, massaging the data, etc.) to make it ready for Hive to consume.
The DAG below was generated by Oozie. The fork will spawn a Pig action (which cleans the Click Stream data) and a Sqoop action (which imports the user data from a MySQL database) in parallel. Once the Pig and the Sqoop actions are done, the Hive action will be started to do the final analytics combining the Click Stream and the User data.
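The fork/join behaviour described above can be sketched in a few lines of Python. This is only an illustration of the control flow, not how Oozie is implemented: the three functions are hypothetical stand-ins for the Sqoop, Pig and Hive actions and simply return labels.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the three Oozie actions; each just returns a label.
def sqoop_import():
    return "user data imported"


def pig_filter():
    return "click stream cleaned"


def hive_analytics(inputs):
    return "analytics over: " + ", ".join(sorted(inputs))


def run_workflow():
    # fork: start both branches in parallel
    with ThreadPoolExecutor(max_workers=2) as pool:
        branches = [pool.submit(sqoop_import), pool.submit(pig_filter)]
        # join: block until every branch has finished
        results = [branch.result() for branch in branches]
    # only after the join does the Hive step run
    return hive_analytics(results)


if __name__ == "__main__":
    print(run_workflow())
```

The point of the sketch is the ordering guarantee: the Hive step never starts until both parallel branches have returned.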
Here are the steps to define the work flow and then execute it. They assume that MySQL, Oozie and Hadoop have been installed and configured and work properly.
Here are the instructions for installing and configuring Oozie.
- The work flow requires more than 2 map slots in the cluster, so if the work flow is executed on a single node cluster the following has to be included in the `mapred-site.xml`.
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>4</value>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>4</value>
</property>
- Create the file `oozie-clickstream-examples/input-data/clickstream/clickstream.txt` in HDFS with the below content. Note that the last record is an invalid record which is filtered out by Pig when the work flow is executed. The first field is the userId and the second field is the site visited by the user.
1,www.bbc.com
1,www.abc.com
1,www.gmail.com
2,www.cnn.com
2,www.eenadu.net
2,www.stackoverflow.com
2,www.businessweek.com
3,www.eenadu.net
3,www.stackoverflow.com
3,www.businessweek.com
A,www.thecloudavenue.com
- Create a user table in MySQL
CREATE TABLE user (
user_id INTEGER NOT NULL PRIMARY KEY,
name CHAR(32) NOT NULL,
age INTEGER,
country VARCHAR(32),
gender CHAR(1)
);
- And insert some data into it
insert into user values (1,"Tom",20,"India","M");
insert into user values (2,"Rick",5,"India","M");
insert into user values (3,"Rachel",15,"India","F");
- Extract the `oozie-4.0.0/oozie-sharelib-4.0.0.tar.gz` file from the Oozie installation folder and copy the `mysql-connector-java-*.jar` to the `share/lib/sqoop` folder. This jar is required for Sqoop to connect to the MySQL database and get the user data.
- Copy the above mentioned `share` folder into HDFS.
The sharelib holds the common libraries which are used across different actions in Oozie.
bin/hadoop fs -put /home/vm4learning/Code/share/ /user/vm4learning/share/
- Create the work flow file in HDFS (oozie-clickstream-examples/apps/cs/workflow.xml). Note that the JDBC connect string in the Sqoop command has to be modified appropriately.
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="cs-wf-fork-join">
<start to="fork-node"/>
<fork name="fork-node">
<path start="sqoop-node" />
<path start="pig-node" />
</fork>
<action name="sqoop-node">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}${examplesRootDir}/input-data/user"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<command>import --connect jdbc:mysql://localhost/clickstream --table user --target-dir ${examplesRootDir}/input-data/user -m 1</command>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<action name="pig-node">
<pig>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}${examplesRootDir}/intermediate"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<script>filter.pig</script>
<param>INPUT=${examplesRootDir}/input-data/clickstream</param>
<param>OUTPUT=${examplesRootDir}/intermediate</param>
</pig>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="hive-node"/>
<action name="hive-node">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}${examplesRootDir}/finaloutput"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<script>script.sql</script>
<param>CLICKSTREAM=${examplesRootDir}/intermediate/</param>
<param>USER=${examplesRootDir}/input-data/user/</param>
<param>OUTPUT=${examplesRootDir}/finaloutput</param>
</hive>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
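Since every `to` and `start` attribute in a work flow like the one above has to match the name of another node, a small sanity check before uploading the file can save a failed run. The following is a minimal sketch using Python's standard XML parser. The function name `check_transitions` and the trimmed `SKELETON` (the work flow above with the action bodies removed) are made up for illustration, and the check covers only the node types used in this work flow.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the work flow: only the nodes and their transitions.
SKELETON = """
<workflow-app xmlns="uri:oozie:workflow:0.2" name="cs-wf-fork-join">
  <start to="fork-node"/>
  <fork name="fork-node">
    <path start="sqoop-node"/>
    <path start="pig-node"/>
  </fork>
  <action name="sqoop-node"><ok to="joining"/><error to="fail"/></action>
  <action name="pig-node"><ok to="joining"/><error to="fail"/></action>
  <join name="joining" to="hive-node"/>
  <action name="hive-node"><ok to="end"/><error to="fail"/></action>
  <kill name="fail"><message>failed</message></kill>
  <end name="end"/>
</workflow-app>
"""


def local_name(element):
    # Drop the "{namespace}" prefix that ElementTree puts in front of tags.
    return element.tag.split("}")[-1]


def check_transitions(workflow_xml):
    """Return the transition targets that do not match any node name."""
    root = ET.fromstring(workflow_xml.strip())
    names, targets = set(), set()
    for node in root:
        if "name" in node.attrib:
            names.add(node.attrib["name"])
        tag = local_name(node)
        if tag in ("start", "join"):
            targets.add(node.attrib["to"])
        elif tag == "fork":
            targets.update(p.attrib["start"] for p in node if local_name(p) == "path")
        elif tag == "action":
            targets.update(c.attrib["to"] for c in node if local_name(c) in ("ok", "error"))
    return sorted(targets - names)


if __name__ == "__main__":
    print(check_transitions(SKELETON))
```

An empty list means every transition points at a node that exists. Oozie performs its own validation on submission, so this is only a quick local check.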
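- Create the job.properties file (oozie-clickstream-examples/apps/cs/job.properties). The Oozie client reads this file from the local file system when the job is submitted, so it does not have to be in HDFS.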
nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=oozie-clickstream-examples
examplesRootDir=/user/${user.name}/${examplesRoot}
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/cs
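To see what the `${...}` references in these properties expand to, here is a rough Python sketch of the substitution. It is not Oozie's actual resolver, which handles much more than this. The `resolve` helper is hypothetical, and the value `vm4learning` for `user.name` is an assumption taken from the local paths used in this post.

```python
import re

JOB_PROPERTIES = {
    "nameNode": "hdfs://localhost:9000",
    "jobTracker": "localhost:9001",
    "queueName": "default",
    "examplesRoot": "oozie-clickstream-examples",
    "examplesRootDir": "/user/${user.name}/${examplesRoot}",
    "oozie.use.system.libpath": "true",
    "oozie.wf.application.path": "${nameNode}/user/${user.name}/${examplesRoot}/apps/cs",
}

REFERENCE = re.compile(r"\$\{([^}]+)\}")


def resolve(props, extra):
    """Expand ${name} references until no known name is left to substitute."""
    values = dict(extra)
    values.update(props)

    def expand(text, depth=0):
        if depth > 10:  # guard against circular references
            raise ValueError("too many nested references in: " + text)
        # Unknown names are left untouched rather than replaced.
        replaced = REFERENCE.sub(lambda m: values.get(m.group(1), m.group(0)), text)
        return replaced if replaced == text else expand(replaced, depth + 1)

    return {key: expand(value) for key, value in props.items()}


if __name__ == "__main__":
    # Assumption: the job is submitted by the user "vm4learning".
    resolved = resolve(JOB_PROPERTIES, {"user.name": "vm4learning"})
    for key in ("examplesRootDir", "oozie.wf.application.path"):
        print(key, "=", resolved[key])
```

Under that assumption, `examplesRootDir` becomes `/user/vm4learning/oozie-clickstream-examples`, which is the HDFS folder that holds the input data, the intermediate output and the final output.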
- Create the Hive script file in HDFS (oozie-clickstream-examples/apps/cs/script.sql). The below mentioned query will find the top 3 URLs visited by users whose age is less than 16.
DROP TABLE clickstream;
CREATE EXTERNAL TABLE clickstream (userid INT, url STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '${CLICKSTREAM}';
DROP TABLE user;
CREATE EXTERNAL TABLE user (user_id INT, name STRING, age INT, country STRING, gender STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '${USER}';
INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT url,count(url) c FROM user u JOIN clickstream c ON (u.user_id=c.userid) where u.age<16 group by url order by c DESC LIMIT 3;
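As suggested later in this post, it helps to check the logic of a query before wiring it into Oozie. One cheap way, sketched below, is to run an equivalent query against the sample data in an in-memory SQLite database from Python. This is a stand-in and not Hive: the SQL dialects differ, the invalid Click Stream record is left out because Pig removes it, and the extra `c.url` sort key is added here only to make the order of tied rows deterministic.

```python
import sqlite3

USERS = [
    (1, "Tom", 20, "India", "M"),
    (2, "Rick", 5, "India", "M"),
    (3, "Rachel", 15, "India", "F"),
]

# The valid Click Stream records from the sample file.
CLICKS = [
    (1, "www.bbc.com"), (1, "www.abc.com"), (1, "www.gmail.com"),
    (2, "www.cnn.com"), (2, "www.eenadu.net"), (2, "www.stackoverflow.com"),
    (2, "www.businessweek.com"), (3, "www.eenadu.net"),
    (3, "www.stackoverflow.com"), (3, "www.businessweek.com"),
]


def top_urls_for_young_users():
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE user (user_id INT, name TEXT, age INT, country TEXT, gender TEXT)")
    db.execute("CREATE TABLE clickstream (userid INT, url TEXT)")
    db.executemany("INSERT INTO user VALUES (?, ?, ?, ?, ?)", USERS)
    db.executemany("INSERT INTO clickstream VALUES (?, ?)", CLICKS)
    rows = db.execute(
        "SELECT c.url, COUNT(c.url) AS cnt "
        "FROM user u JOIN clickstream c ON u.user_id = c.userid "
        "WHERE u.age < 16 GROUP BY c.url "
        "ORDER BY cnt DESC, c.url LIMIT 3"
    ).fetchall()
    db.close()
    return rows


if __name__ == "__main__":
    for url, count in top_urls_for_young_users():
        print(url, count)
```

Only Rick and Rachel are younger than 16, and the three sites they both visited each get a count of 2, while www.cnn.com gets a count of 1 and drops out of the top 3.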
- Create the Pig script file in HDFS (oozie-clickstream-examples/apps/cs/filter.pig).
clickstream = load '$INPUT' using PigStorage(',') as (userid:int, url:chararray);
SPLIT clickstream INTO good_records IF userid is not null, bad_records IF userid is null;
STORE good_records into '$OUTPUT';
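For readers who are new to Pig, the effect of this script can be approximated in plain Python. The sketch below is not what Pig executes and the function names are made up. It mirrors two behaviours: a `userid` that cannot be cast to an int becomes null, and the stored records are tab-delimited, which is why the Hive table for the Click Stream data is declared with `'\t'` as the field terminator.

```python
def parse_record(line):
    """Split a 'userId,url' line; an unparsable userId becomes None (null in Pig)."""
    user_id, url = line.split(",", 1)
    try:
        return int(user_id), url
    except ValueError:
        return None, url


def split_records(lines):
    """Mimic the SPLIT statement: separate records by whether userid is null."""
    good, bad = [], []
    for line in lines:
        user_id, url = parse_record(line)
        (good if user_id is not None else bad).append((user_id, url))
    return good, bad


def to_tsv(records):
    """Mimic STORE with the default storage: one tab-delimited line per record."""
    return ["%d\t%s" % record for record in records]


if __name__ == "__main__":
    sample = ["1,www.bbc.com", "2,www.cnn.com", "A,www.thecloudavenue.com"]
    good, bad = split_records(sample)
    print(to_tsv(good))
    print("filtered out:", bad)
```

With the sample file from this post, only the record with the userId `A` ends up in the bad set, and it is never stored.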
- Execute the Oozie workflow as below. Note that the `job.properties` file should be present in the local file system and not in HDFS.
bin/oozie job -oozie http://localhost:11000/oozie -config /home/vm4learning/Code/oozie-clickstream-examples/apps/cs/job.properties -run
- Initially the job will be in the `RUNNING` state and will finally reach the `SUCCEEDED` state. The progress of the work flow can be monitored from the Oozie console at http://localhost:11000/oozie/.
- The output should appear as below in the `oozie-clickstream-examples/finaloutput/000000_0` file in HDFS.
www.businessweek.com 2
www.eenadu.net 2
www.stackoverflow.com 2
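Instead of watching the console, the job status mentioned in the steps above can also be polled from a script. The sketch below assumes the Oozie web services endpoint `v1/job/<job-id>?show=info`, which returns JSON with a `status` field. Please check this against the documentation of the installed Oozie version. The helper names and the job id passed in are hypothetical.

```python
import json
import time
import urllib.request

# Work flow job states after which the status no longer changes.
TERMINAL_STATES = {"SUCCEEDED", "KILLED", "FAILED"}


def job_info_url(oozie_url, job_id):
    # Assumption: the v1 web services API is available under the Oozie URL.
    return "%s/v1/job/%s?show=info" % (oozie_url.rstrip("/"), job_id)


def fetch_status(oozie_url, job_id):
    """Ask the Oozie server for the job info and return its status field."""
    with urllib.request.urlopen(job_info_url(oozie_url, job_id)) as response:
        return json.load(response)["status"]


def wait_for_job(oozie_url, job_id, fetch=fetch_status, poll_seconds=10):
    """Poll until the job reaches a terminal state and return that state."""
    while True:
        status = fetch(oozie_url, job_id)
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_seconds)
```

The job id to pass in is the one printed by the `bin/oozie job ... -run` command. The `fetch` parameter exists so that the polling loop can be tried out without a running Oozie server.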
Here are some final thoughts on Oozie.
- It's better to test the individual actions like Hive, Pig and Sqoop independent of Oozie and later integrate them in the Oozie work flow.
- The Oozie error messages are very cryptic, and the MapReduce log files need to be looked at to figure out the actual error.
- The Web UI which comes with Oozie is very rudimentary and clumsy, so some of the alternatives are worth looking into.
- The XML for creating the work flows is very verbose and very error prone. Any UI for creating work flows for Oozie would be very helpful.
I will look into alternatives for some of the problems mentioned above in a future blog entry.