Monday, March 11, 2013

Analyse Tweets using Flume, Hadoop and Hive

Note: Also, don't forget to check another entry on how to get some interesting facts from Twitter using R here, and also this entry on how to use Oozie for automating the below workflow. Here is a new blog on how to do the same analytics with Pig (using elephant-bird).

It's not a hard rule, but roughly 80% of data is unstructured, while the remaining 20% is structured. RDBMS helps to store/process the structured data (20%), while Hadoop solves the problem of storing/processing both types of data. The good thing about Hadoop is that it scales incrementally with less CAPEX in terms of software and hardware.

With the ever increasing usage of smart devices and high-speed internet, unstructured data has been growing at a very fast rate. It's common to Tweet from a smart phone, or to take a picture and share it on Facebook.

In this blog we will try to get Tweets using Flume and save them into HDFS for later analysis. Twitter exposes an API (more here) to get the Tweets. The service is free, but requires the user to register. Cloudera wrote a three part series (1, 2, 3) on Twitter analysis using Hadoop; the code for the same is here. For the impatient, I will quickly summarize how to get data into HDFS using Flume and start doing some analytics using Hive.

Flume has the concept of agents. An agent wires together sources, channels and sinks: the sources push/pull the data and send it to the channels, which in turn deliver the data to the sinks. Flume thereby decouples the source (Twitter) from the sink (HDFS) in this case. The source and the sink can operate at different speeds, and it's also much easier to add new sources and sinks. Flume comes with a set of sources, channels and sinks, and new ones can be implemented by extending the Flume base classes.


1) The first step is to create an application at https://dev.twitter.com/apps/ and then generate the corresponding keys.


2) Assuming that Hadoop has already been installed and configured, the next step is to download Flume and extract it to any folder.
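For example, assuming the tarball has been downloaded to the current directory and /home/training/Installations is the install location used later in this post, the extraction would look something like
tar -xzf apache-flume-1.3.1-bin.tar.gz -C /home/training/Installations/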

3) Download the flume-sources-1.0-SNAPSHOT.jar and add it to the Flume classpath as shown below in the conf/flume-env.sh file
FLUME_CLASSPATH="/home/training/Installations/apache-flume-1.3.1-bin/flume-sources-1.0-SNAPSHOT.jar" 

The jar contains the java classes to pull the Tweets and save them into HDFS.

4) The conf/flume.conf should have the Twitter source, memory channel and HDFS sink defined for the agent as below
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <consumerKey>
TwitterAgent.sources.Twitter.consumerSecret = <consumerSecret>
TwitterAgent.sources.Twitter.accessToken = <accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret = <accessTokenSecret>

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

The consumerKey, consumerSecret, accessToken and accessTokenSecret have to be replaced with those obtained from https://dev.twitter.com/apps, and TwitterAgent.sinks.HDFS.hdfs.path should point to the NameNode and the location in HDFS where the tweets will go.

The TwitterAgent.sources.Twitter.keywords value can be modified to get the tweets for some other topic like football, movies etc.
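For example, to track tweets about football and movies instead, the keywords line could look something like this (the list below is purely illustrative):
TwitterAgent.sources.Twitter.keywords = football, fifa, premier league, movies, oscars, bollywood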

5) Start Flume using the below command
bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent
After a couple of minutes the Tweets should appear in HDFS.
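To verify, the files can be listed from the command line (assuming the hdfs.path configured above and the default FlumeData file name prefix of the HDFS sink):
hadoop fs -ls /user/flume/tweets/
hadoop fs -cat /user/flume/tweets/FlumeData.* | head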


6) Next download and extract Hive. Modify the conf/hive-site.xml to include the locations of the NameNode and the JobTracker as below
<configuration>
     <property>
         <name>fs.default.name</name>
         <value>hdfs://localhost:9000</value>
     </property>
     <property>
         <name>mapred.job.tracker</name>
         <value>localhost:9001</value>
     </property>
</configuration>
7) Download hive-serdes-1.0-SNAPSHOT.jar to the lib directory in Hive. Twitter returns Tweets in the JSON format and this library will help Hive understand the JSON format.
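For example, assuming the jar was downloaded to the current directory and the Hive install path used in the next step:
cp hive-serdes-1.0-SNAPSHOT.jar /home/training/Installations/hive-0.9.0/lib/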

8) Start the Hive shell using the hive command and register the hive-serdes-1.0-SNAPSHOT.jar file downloaded earlier.
ADD JAR /home/training/Installations/hive-0.9.0/lib/hive-serdes-1.0-SNAPSHOT.jar;
9) Now, create the tweets table in Hive
CREATE EXTERNAL TABLE tweets (
   id BIGINT,
   created_at STRING,
   source STRING,
   favorited BOOLEAN,
   retweet_count INT,
   retweeted_status STRUCT<
      text:STRING,
      user:STRUCT<screen_name:STRING,name:STRING>>,
   entities STRUCT<
      urls:ARRAY<STRUCT<expanded_url:STRING>>,
      user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
      hashtags:ARRAY<STRUCT<text:STRING>>>,
   text STRING,
   user STRUCT<
      screen_name:STRING,
      name:STRING,
      friends_count:INT,
      followers_count:INT,
      statuses_count:INT,
      verified:BOOLEAN,
      utc_offset:INT,
      time_zone:STRING>,
   in_reply_to_screen_name STRING
) 
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';
Now that we have the data in HDFS and the table created in Hive, let's run some queries in Hive.
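A quick sanity check to confirm that the SerDe is parsing the JSON (assuming a few tweets have already landed in /user/flume/tweets):
SELECT id, text FROM tweets LIMIT 5;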

One way to determine who is the most influential person in a particular field is to figure out whose tweets are re-tweeted the most. Give Flume enough time to collect Tweets from Twitter into HDFS and then run the below query in Hive to determine the most influential person.
SELECT t.retweeted_screen_name, sum(retweets) AS total_retweets, count(*) AS tweet_count
FROM (SELECT retweeted_status.user.screen_name AS retweeted_screen_name,
             retweeted_status.text,
             max(retweet_count) AS retweets
      FROM tweets
      GROUP BY retweeted_status.user.screen_name, retweeted_status.text) t
GROUP BY t.retweeted_screen_name
ORDER BY total_retweets DESC
LIMIT 10;
Similarly, to know which user has the most followers, the below query helps.
select user.screen_name, user.followers_count c from tweets order by c desc; 
For the sake of simplicity, partitions have not been created in Hive. Partitions can be created at regular intervals using Oozie to make the queries run faster when a particular time period is queried. Creating partitions will be covered in another blog, but a rough sketch is shown below.
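Just as an illustration, a partitioned version of the table could look something like the below, with a hypothetical datehour partition column. This assumes the Flume HDFS sink writes into hourly sub-directories (for example by using time escape sequences such as %Y/%m/%d/%H in hdfs.path) and that something like Oozie adds a partition for each new directory:
CREATE EXTERNAL TABLE tweets_partitioned (
   id BIGINT,
   created_at STRING,
   text STRING,
   user STRUCT<screen_name:STRING, followers_count:INT>
)
PARTITIONED BY (datehour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';
ALTER TABLE tweets_partitioned ADD PARTITION (datehour = 2013031101) LOCATION '/user/flume/tweets/2013/03/11/01';
Queries restricted to a time period can then prune to the relevant partitions instead of scanning the whole dataset.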

Happy Hadooping !!!

Edit (21st March, 2013) :  Hortonworks blogged a two part series (1 and 2) on Twitter data processing using Hive.

Edit (30th March, 2016) : With the latest version of Flume, the following error is thrown because of the conflicts in the libraries

java.lang.NoSuchMethodError: twitter4j.FilterQuery.setIncludeEntities(Z)Ltwitter4j/FilterQuery;
at com.cloudera.flume.source.TwitterSource.start(TwitterSource.java:139)

One solution is to remove the below libraries from the Flume lib folder. There are a couple more solutions in this StackOverflow article.

lib/twitter4j-core-3.0.3.jar
lib/twitter4j-media-support-3.0.3.jar
lib/twitter4j-stream-3.0.3.jar
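
For example, from the Flume installation directory (adjust the version numbers if the bundled twitter4j jars differ):
rm lib/twitter4j-core-3.0.3.jar lib/twitter4j-media-support-3.0.3.jar lib/twitter4j-stream-3.0.3.jar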

Tuesday, March 5, 2013

ClickStream analysis using Pig and Hive

E-commerce sites have been making quite an impact on the overall economy in many countries for some time now, and recently e-commerce has picked up quite well in India too. Flipkart is one of the top portals in India. Some time back I bought an HP 430 Notebook from Flipkart.

All the e-commerce portals store user activity on their sites as clickstream data, which they later analyze to identify what the user has browsed and to show appropriate recommendations when the user visits the site again.
In this blog entry, we will see how to analyze the clickstream and the user data together using Pig and Hive. The challenge is to find the top 3 URLs visited by users whose age is less than 16 years.

To get a quick overview of Pig and Hive, Hadoop - The Definitive Guide is the best resource, but to dive deeper, Programming Hive and Programming Pig are the best bets. Some time back I compared Pig and Hive in a blog entry.

Data from external systems can be pushed into HDFS using Sqoop, Flume and in many other ways. For now, let's assume that the user data and the clickstream data are already in HDFS as shown below. For the sake of simplicity only a few columns have been included, but the schema can be made much more complex.
/user/training/user/part-m-00000

1,Praveen,20,India,M
2,Prajval,5,India,M
3,Prathibha,15,India,F
/user/training/clickstream.txt

1,www.bbc.com
1,www.abc.com
1,www.gmail.com
2,www.cnn.com
2,www.eenadu.net
2,www.stackoverflow.com
2,www.businessweek.com
3,www.eenadu.net
3,www.stackoverflow.com
3,www.businessweek.com
Let's create the 'user' table in Hive and map the user data in HDFS to the table
CREATE TABLE user (
   user_id INT,
   name STRING,
   age INT,
   country STRING,
   gender STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA INPATH '/user/training/user/part-m-00000' OVERWRITE INTO TABLE user;

Similarly, let's create the 'clickstream' table in Hive and map the clickstream data in HDFS to the table
CREATE TABLE clickstream (
   userid INT,
   url STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA INPATH '/user/training/clickstream.txt' OVERWRITE INTO TABLE clickstream;
Here is the HiveQL query to get the top 3 URLs visited by users whose age is less than 16. The query looks very similar to SQL, which makes it easy to get started with Hive. Hive automatically creates a plan for the below query and submits it to the Hadoop cluster. SQL interfaces are being added to the Big Data frameworks to make it easier for those who are familiar with SQL to get started with them. Here is an interesting article from GigaOM on the same.
SELECT url,count(url) c FROM user u JOIN clickstream c ON (u.user_id=c.userid) where u.age<16 group by url order by c DESC LIMIT 3;
Here is the Pig Latin code for the same.
Users1 = load '/user/training/user/part-m-00000' using PigStorage(',') as (user_id, name, age:int, country, gender);
Fltrd = filter Users1 by age < 16;
Pages  = load '/user/training/clickstream.txt' using PigStorage(',') as (userid, url);
Jnd = join Fltrd by user_id, Pages by userid;
Grpd = group Jnd by url;
Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;
Srtd = order Smmd by clicks desc;
Top3 = limit Srtd 3;
dump Top3;
Here is the output after running the HiveQL query.
MapReduce Jobs Launched:
Job 0: Map: 2  Reduce: 1   Cumulative CPU: 2.4 sec   HDFS Read: 686 HDFS Write: 238 SUCCESS
Job 1: Map: 1  Reduce: 1   Cumulative CPU: 1.35 sec   HDFS Read: 694 HDFS Write: 238 SUCCESS
Job 2: Map: 1  Reduce: 1   Cumulative CPU: 1.32 sec   HDFS Read: 694 HDFS Write: 64 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 70 msec
OK
www.businessweek.com    2
www.eenadu.net    2
www.stackoverflow.com    2
Time taken: 105.076 seconds
Here is the output after running the Pig Latin script.
2013-03-04 21:06:57,654 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2013-03-04 21:06:57,656 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2013-03-04 21:06:57,667 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-03-04 21:06:57,667 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(www.eenadu.net,2)
(www.businessweek.com,2)
(www.stackoverflow.com,2) 
Just a quick comparison: Hive converted the query into 3 MR jobs and took around 105 seconds to complete, while Pig converted the Pig Latin script into 5 MR jobs and took around 210 seconds.

I didn't spend much time looking at why Pig is twice as slow compared to Hive. Partitions could have been used in Hive to make the Hive query run faster (a rough sketch is below). If anyone is really interested in making the above Pig and Hive queries run faster, please send me an email at praveensripati@gmail.com and I will post a follow-up blog with the consolidated suggestions after trying them out.
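
Purely as an illustration of the partitioning idea (the dt partition column and the daily load below are hypothetical, assuming the clickstream files arrive once a day), the clickstream table could be created and loaded along these lines:
CREATE TABLE clickstream_partitioned (
   userid INT,
   url STRING
) PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA INPATH '/user/training/clickstream.txt' OVERWRITE INTO TABLE clickstream_partitioned PARTITION (dt = '2013-03-04');
Queries that filter on dt would then scan only the relevant day's data.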

Also, if anyone is interested in any POC (Proof Of Concept) ideas around Big Data / Hadoop send me an email at praveensripati@gmail.com. I will attribute it to them :)

Happy hadooping !!!!