Monday, April 18, 2016

PageRank on the English wiki data using MapReduce

PageRank is an iterative algorithm for finding the relevance of a web page in the world-wide-web. PageRank is one of the signals used by search engines to figure out what to show at the top and what at the bottom of the search results. The book `Data-Intensive Text Processing with MapReduce` has a very good description of what PageRank is and how to solve it in a MapReduce way.

As mentioned above, PageRank is an iterative algorithm, and the MapReduce model is not a good fit for iterative processing, as the input and output of every MapReduce job go to disk (HDFS), which is really slow when compared to reading the data from memory as in the case of Spark.

Anyway, working MapReduce code was available for the PageRank algorithm to process the Wiki data, so I thought of giving it a shot. Here are the code and the description for the same. While iterating, if the difference between the PageRank values of two successive iterations is small enough, the iterations can be terminated. This convergence check is not implemented in the code.
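Just to make the iteration and the convergence check concrete, below is a minimal single-machine sketch of the PageRank update in Python. It only illustrates the logic described above and is not the MapReduce implementation; the in-memory graph layout, the damping factor of 0.85 and the convergence threshold are assumptions made for the sketch.

# A toy PageRank iteration with a convergence check (illustration only).
# The graph layout (page -> list of outgoing links), the damping factor
# and the threshold are assumptions, not taken from the MR code.
def page_rank(links, damping=0.85, threshold=0.0001, max_iterations=5):
    pages = list(links.keys())
    n = len(pages)
    ranks = {page: 1.0 / n for page in pages}

    for _ in range(max_iterations):
        new_ranks = {page: (1.0 - damping) / n for page in pages}

        # every page distributes its current rank evenly over its outgoing links
        for page, outgoing in links.items():
            if outgoing:
                share = ranks[page] / len(outgoing)
                for target in outgoing:
                    if target in new_ranks:
                        new_ranks[target] += damping * share

        # convergence check: stop when the ranks barely change between iterations
        diff = sum(abs(new_ranks[page] - ranks[page]) for page in pages)
        ranks = new_ranks
        if diff < threshold:
            break

    return ranks

if __name__ == "__main__":
    sample = {"A": ["B", "C"], "B": ["C"], "C": ["A"]}
    print(page_rank(sample))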

Here is the data (English wiki) for the same. For those who don't have enough resources, smaller wiki dumps are also available for other languages.

It took 7 minutes short of 3 hours to process 52.58 GB of English wiki data. The processing involved a total of 7 MR programs: the first one for parsing the wiki XML data, 5 iterations of MR to calculate the PageRank and a final MR program for ordering the wiki pages based on the rank.

Here is the configuration of the machine on which the wiki data has been processed.

Here is the folder structure in HDFS.


Here is the size of the input data.


Here is the size of the data after the first MR program (parsing the wiki). Note that the size of the data decreased significantly, as we only need the list of web pages, the pages each of them links to and the initial page rank to start the PageRank algorithm.
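For illustration only, a parsed record would then carry just the page, its initial rank and the pages it links to, something like the line below. The exact layout is an assumption here and is not taken from the actual output.

Page_A	1.0	Page_B,Page_C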


Here is the size of the final result, which is significantly smaller as it contains only the web page and the corresponding PageRank.


The file is a bit too huge to open in gedit, so head/tail is another way to look at the start/end of the file. Here is the screen for the same. Proud to see India in the top 10 list.

 
Here is the screenshot of the MapReduce console with the 7 MapReduce programs.


Here are the details of the first MapReduce program (xml parsing).


 Here are the details of the MapReduce program implementing a single MapReduce iteration for calculating the PageRank.


Here are the details of the MapReduce program implementing the sorting based on the PageRank.


The rationale behind publishing so many screenshots is for the readers to get the different metrics for the processing of the English wiki data. Your mileage might vary depending on the configuration of the machine and the size of the input wiki data used for the processing.

In the coming blogs, we will look into how to do the same thing with Spark.

Thursday, April 14, 2016

Analyzing the airline dataset with Pig

This time the Airline dataset is being analyzed to find the number of delayed flights per origin using Pig. The below Pig Latin code is self-explanatory for those who are familiar with SQL. There are a lot of similarities between Pig Latin and SQL, but there are a few quirks also. Here is a nice article from Hortonworks mapping the SQL syntax to the Pig Latin syntax.
ontime = load '/user/bigdata/airline/input' using PigStorage(',') as 
(
  year:int,
  month:int,
  dayofmonth:int,
  dayofweek:int,
  deptime:int,
  crsdeptime:int,
  arrtime:int,
  crsarrtime:int,
  uniquecarrier:chararray,
  flightnum:int,
  tailnum:chararray,
  actualelapsedtime:int,
  crselapsedtime:int,
  airtime:int,
  arrdelay:int,
  depdelay:int,
  origin:chararray,
  dest:chararray,
  distance:int,
  taxiin:int,
  taxiout:int,
  cancelled:int,
  cancellationcode:chararray,
  diverted:chararray,
  carrierdelay:int,
  weatherdelay:int,
  nasdelay:int,
  securitydelay:int,
  lateaircraftdelay:int
);
 
delayed_flights = filter ontime by deptime > crsdeptime; 
delayed_flights_grouped  = group delayed_flights by origin;
delayed_flights_count = foreach delayed_flights_grouped generate group, COUNT(delayed_flights); 
 
dump delayed_flights_count;
Here is the partial output of the Pig Latin script.


The above Pig Latin script has been converted into a DAG with a single MR program, as shown below, and took a total of 20 min and 26 sec.



Now that the Airline data in the CSV format has been analyzed, let's look into how to do the same thing with the Parquet data format.

Download the parquet-pig-bundle-1.6.0.jar from here. Once downloaded, the jar file has to be registered and the ParquetLoader used as shown below. The ParquetLoader will read the data in the Parquet format and then convert it into the Pig object model, so once the data is loaded, the rest of the commands are all the same.
REGISTER /home/bigdata/Installations/pig-0.15.0/lib/parquet-pig/parquet-pig-bundle-1.6.0.jar;

ontime = load '/user/bigdata/airline/input-parquet' using parquet.pig.ParquetLoader as
(
  year:int,
  month:int,
  dayofmonth:int,
  dayofweek:int,
  deptime:int,
  crsdeptime:int,
  arrtime:int,
  crsarrtime:int,
  uniquecarrier:chararray,
  flightnum:int,
  tailnum:chararray,
  actualelapsedtime:int,
  crselapsedtime:int,
  airtime:int,
  arrdelay:int,
  depdelay:int,
  origin:chararray,
  dest:chararray,
  distance:int,
  taxiin:int,
  taxiout:int,
  cancelled:int,
  cancellationcode:chararray,
  diverted:chararray,
  carrierdelay:int,
  weatherdelay:int,
  nasdelay:int,
  securitydelay:int,
  lateaircraftdelay:int
);

delayed_flights = filter ontime by deptime > crsdeptime; 
delayed_flights_grouped  = group delayed_flights by origin;
delayed_flights_count = foreach delayed_flights_grouped generate group, COUNT(delayed_flights); 
 
dump delayed_flights_count;
Looks like Pig loads the Parquet files into memory and then uncompresses them, so Pig and the underlying MR programs threw an error as shown below. For some reason the same doesn't happen with Hive while processing the same Parquet data. Not sure why!



`mapred.child.java.opts` is set to `-Xmx200m` in mapred-default.xml. -Xmx specifies the maximum memory allocation pool (heap) for a Java Virtual Machine (JVM). Increasing it to 1024 MB by adding the below property to mapred-site.xml fixed the problem. A lower value might also have been enough.
<property>
   <name>mapred.child.java.opts</name>
   <value>-Xmx1024m</value>
</property>
After changing the mapred-site.xml, below is the partial output from the Grunt shell.


As shown in the below MR console, the Pig script took 2 min 51 sec in total.

 

Here is the matrix which shows the execution times with different software and formats on the same dataset for the same problem.


If you have any tips to make it faster, let me know in the comments and I will write a new blog on the same or update the existing one.

Monday, April 11, 2016

Analyzing the airline dataset with Spark/Python

The airline dataset in the previous blogs has been analyzed with MR and Hive. In this blog we will see how to do the same analytics with Spark using Python. Programs in Spark can be implemented in Scala (Spark itself is built using Scala), Java, Python and the recently added R.

It took 5 min 30 sec for the processing, almost the same as the earlier MR program. Not sure why Spark is not faster than MR as claimed; I need to look into it a bit more. Here is the output of the Python program.


The Spark program is quite a bit more concise than the corresponding MR program. For those who are new to Spark, here is the Spark Python API.
from pyspark import SparkConf
from pyspark import SparkContext

import csv

def extractData(record) :

    #split the input record by comma
    splits = record.split(',')

    #extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    #1 delayed
    #0 don't know or not delayed
    delayed = 0

    # Check if the actual/scheduled departure time is a digit
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():

        #if the flight got delayed or not
        if int(actualDepTime) > int(scheduledDepTime) :
            delayed = 1

    #return the origin and delayed status as a tuple
    return origin, delayed

#create the SparkConf() instance on which the different configurations can be done
conf = SparkConf().setMaster("spark://bigdata-server:7077")

#establish a connection to Spark
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")

#load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

#map each record to an (origin, delayed) tuple and cache the data as it is processed twice later
delayedFlights = lines.map(extractData).cache()

#count the delayed flights per origin by summing the delayed flags
delayedFlights.reduceByKey(lambda a, b : a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

#count the total flights per origin
totalFlights = delayedFlights.countByKey()

#totalFlights is a dictionary. Iterate over it and write it to a file
w = csv.writer(open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w"))
for key, val in totalFlights.items():
    w.writerow([key, val])
As I mentioned in the previous blog, I do have a bit of a high-end machine for the data processing, and below is the resource usage in the Ubuntu system monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.


Now that we have seen how to convert CSV into the Parquet format using Hive in the previous blog, we will look into how to process the same Parquet data with Spark using the DataFrame feature. After processing the data, the output is stored in the JSON format so as to make it human readable.
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")
sqlContext = SQLContext(sc)

# load the parquet data
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")

# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()

# store the delayed count as JSON
delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
PyCharm has been used to develop the program in Python, which is then executed in Spark standalone mode using the spark-submit command. As shown below, it took 24 sec to get the delayed flights.

 
Here is the output back in HDFS.


Here is the matrix which shows the execution times with different software and formats on the same dataset for the same problem.


In the future blogs, we will look into how to do the same processing with other software and also how to optimize it.

Monday, April 4, 2016

Analyzing the airline dataset with Hive (without partitions)

One of the previous blogs compared Hive and Pig with MapReduce. Hive and Pig scripts are easier to write when compared to MapReduce as they are abstractions on top of MapReduce. It's not exactly the same, but working with MapReduce feels like writing in assembly.

The last blog discussed processing the airline data with MapReduce. Driver code, Mapper code, Reducer code blah blah blah ... with Hive it becomes pretty easy. Just create a table, map it to the data and then simply run queries on the same.

Below is the table definition for the airline dataset.
create external table ontime (
  Year INT,
  Month INT,
  DayofMonth INT,
  DayOfWeek INT,
  DepTime  INT,
  CRSDepTime INT,
  ArrTime INT,
  CRSArrTime INT,
  UniqueCarrier STRING,
  FlightNum INT,
  TailNum STRING,
  ActualElapsedTime INT,
  CRSElapsedTime INT,
  AirTime INT,
  ArrDelay INT,
  DepDelay INT,
  Origin STRING,
  Dest STRING,
  Distance INT,
  TaxiIn INT,
  TaxiOut INT,
  Cancelled INT,
  CancellationCode STRING,
  Diverted STRING,
  CarrierDelay INT,
  WeatherDelay INT,
  NASDelay INT,
  SecurityDelay INT,
  LateAircraftDelay INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/bigdata/airline/input';
Below is the query to find the number of flight delays for each origin as in the case of the earlier MapReduce program.
select Origin, count(*) from ontime where DepTime > CRSDepTime group by Origin;
Here is the partial output of the query. Just to make sure, I compared it with the output of the MapReduce program.


As shown in the below screen, Hive took 5 min 57 sec. Just to recollect, I used Hive on top of MapReduce (not Spark), so the below screen is from the MapReduce web console.


The hand-coded MapReduce program in the previous blog took 5 min 51 sec, as shown below, to process the same data. Hive is slower by 6 seconds when compared to the hand-coded MapReduce program. Not a significant difference, but the good thing is that Hive queries are easier to write.


Not sure why, and I need to look into it in a bit more detail, but the CPU usage in the System Monitor was going up and down during the Hive processing. This was not the case with the MapReduce program, where it was constant.


The original airline data is in the CSV format, and it took 5 min 57 sec to process 11.5 GB of it. With the CSV data (which is a row-oriented format) there is a lot of disk I/O, and so it takes a lot of time to process the data.

Parquet, based on the Google Dremel paper, stores the data in a columnar format. The data in the columnar format is smaller when compared to the row format, which means less disk I/O. We will look into how to convert the available CSV data into the Parquet format and see how the performance improves for the same query.

The first step is to create a Hive table with the same columns and specify the storage format as Parquet.
create external table ontime_parquet (
  Year INT,
  Month INT,
  DayofMonth INT,
  DayOfWeek INT,
  DepTime  INT,
  CRSDepTime INT,
  ArrTime INT,
  CRSArrTime INT,
  UniqueCarrier STRING,
  FlightNum INT,
  TailNum STRING,
  ActualElapsedTime INT,
  CRSElapsedTime INT,
  AirTime INT,
  ArrDelay INT,
  DepDelay INT,
  Origin STRING,
  Dest STRING,
  Distance INT,
  TaxiIn INT,
  TaxiOut INT,
  Cancelled INT,
  CancellationCode STRING,
  Diverted STRING,
  CarrierDelay INT,
  WeatherDelay INT,
  NASDelay INT,
  SecurityDelay INT,
  LateAircraftDelay INT
)
STORED AS PARQUET
LOCATION '/user/bigdata/airline/input-parquet';
Now the data has to be moved from the regular table to the table with the Parquet storage format using the below SQL command.
INSERT OVERWRITE TABLE ontime_parquet SELECT * FROM ontime;
The above query will copy the data from the regular table (ontime) to the table with Parquet as the storage format (ontime_parquet). Behind the scenes Hive uses a MapReduce program for the conversion, which took 9 min and 23 sec to go from CSV to Parquet. The size of the data decreased from 11.5 GB (CSV) to 1.7 GB (Parquet).


Now it is time to run the query to get the number of delayed flights on the table with the Parquet storage format.
select Origin, count(*) from ontime_parquet where DepTime > CRSDepTime group by Origin;
With the data in the Parquet format, the time to run the above query came down from 5 min 57 sec (CSV) to 58 sec (Parquet). Below is the MapReduce console for the same.


Here is a quick summary of processing the data (both CSV and Parquet) with different software. It helps the readers figure out how the different software and formats perform.


Now that the airline dataset is also available in the Parquet format, I will try to use both the CSV and the Parquet data with different software in the future blogs.

Thursday, March 31, 2016

Twitter analysis with Pig and elephant-bird

Twitter analysis has been one of the popular blogs on this site. Flume has been used to gather the data and then Hive has been used to do some basic analytics. Performing the same with Pig had been pending for quite some time, so here it is.

The JsonLoader which comes with Pig can be used to load the JSON data from Flume. But the problem with the JsonLoader is that the entire schema has to be specified as shown below. In the case of the Twitter data, the schema becomes really huge and complex.

students = LOAD 'students.json'  USING JsonLoader('name:chararray, school:chararray, age:int');

So, I started using elephant-bird for processing the JSON data. With the JsonLoader from elephant-bird, there is no need to specify the schema. The JsonLoader simply returns a Pig map datatype and the fields can be accessed using the JSON property names as shown below.
REGISTER '/home/bigdata/Installations/pig-0.15.0/lib/elephantbird/json-simple-1.1.1.jar';
REGISTER '/home/bigdata/Installations/pig-0.15.0/lib/elephantbird/elephant-bird-pig-4.3.jar';
REGISTER '/home/bigdata/Installations/pig-0.15.0/lib/elephantbird/elephant-bird-hadoop-compat-4.3.jar';

tweets = LOAD '/user/bigdata/tweetsJSON/' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') as (json:map[]);
user_details = FOREACH tweets GENERATE json#'user' As tweetUser;
user_followers = FOREACH user_details GENERATE (chararray)tweetUser#'screen_name' As screenName, (int)tweetUser#'followers_count' As followersCount;
user_followers_distinct = DISTINCT user_followers;
user_followers_sorted = order user_followers_distinct by followersCount desc;

DUMP user_followers_sorted;
The above program got converted into a DAG of 3 MapReduce programs and took 1 min 6 sec to complete, which is not really that efficient; it should be possible to implement the same using two MapReduce programs. I am not sure if there is any way to optimize the above Pig script. If you have any feedback, please let me know in the comments and I will try it out and update the blog.


The same data was processed using Hive with the JSONSerDe provided by Cloudera as mentioned in the original blog. Only a single MapReduce program was generated and it took 21 seconds to process the data, a drastic improvement over Pig with the elephant-bird library.


In the coming blogs, we will look at a few other ways of processing the JSON data, which is a very common format.

Tuesday, March 29, 2016

Analyzing the airline dataset with MR/Java

In the previous blog I introduced the Airline dataset. I plan to get the results (total and delayed flights from different airports) using different Big Data software like Hadoop (MR), Hive, Pig, Spark, Impala etc. and also with different data formats like Avro and Parquet. This will help the followers of the blog appreciate the different distributed computing software and models.

As usual, there can be more than one way of performing a particular task, and often a more efficient one. I have written the programs in a short span of time and will try to optimize them over time. Also, I will include as many comments as possible in the code, so as to make it self-explanatory.

Recently I assembled a high-configuration computer, which I like to call a server :) with the below configuration. Ubuntu 14.04 64-bit has been installed on it, and all the processing would be done on this server. It was fun looking for the right parts and assembling the computer. Initially I planned to buy an SSD instead of a hard disk, but the SSDs were too expensive, beyond the budget I planned.

Processor - AMD FX-6300 (FD6300WMHKBOX)
Mother Board - GIGABYTE GA-990XA-UD3
RAM - G.skill 2 * GB F3-12800CL10D-16GBXL
Hard Disk - Seagate 3.5 Inches Desktop 2 TB Hard Drive (ST2000DX001)


So, here is the MR program for calculating the total flight departures and the number of flight delays over the 20 years for each of the airports. Initially I wrote two MR programs, one for calculating the total flights and another for calculating the total delayed flights. This approach is not really efficient as the input data is parsed twice, so I revamped them into a single MR program.

Here is the MR Driver Code.
package AirlineDelaySinglePass;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class AirlineDelayByStartTime {

 public static void main(String[] args) throws IOException {

  // Check if the number of parameters passed to program is 2
  if (args.length != 2) {
   System.err
     .println("Usage: AirlineDelayByStartTime <input path> <output path>");
   System.exit(-1);
  }

  // Create the JobConf instance and specify the job name
  JobConf conf = new JobConf(AirlineDelayByStartTime.class);
  conf.setJobName("Airline Delay");

  // First and second arguments are input and output folder
  FileInputFormat.addInputPath(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  // Specify the mapper and the reducer class
  conf.setMapperClass(AirlineDelayByStartTimeMapper.class);
  conf.setReducerClass(AirlineDelayByStartTimeReducer.class);

  // Specify the output key and value of the entire job
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);

  // Specify the number of reduce tasks for the job (defaults to one)
  conf.setNumReduceTasks(4);

  // Trigger the mapreduce program
  JobClient.runJob(conf);
 }
}

Here is the Mapper Code.
package AirlineDelaySinglePass;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class AirlineDelayByStartTimeMapper extends MapReduceBase implements
  Mapper<LongWritable, Text, Text, IntWritable> {

 public void map(LongWritable key, Text value,
   OutputCollector<Text, IntWritable> output, Reporter reporter)
   throws IOException {

  // Split the input line based on comma
  String[] pieces = value.toString().split(",");

  // Delayed 0 for ontime or NA, 1 for delayed
  int delayed = 0;

  // Get the origin, which is the 17th field in the input line
  String origin = pieces[16];

  if (StringUtils.isNumeric(pieces[4])
    && StringUtils.isNumeric(pieces[5])) {

   // 5 DepTime actual departure time (local, hhmm)
   // 6 CRSDepTime scheduled departure time (local, hhmm)
   int actualDepTime = Integer.parseInt(pieces[4]);
   int scheduledDepTime = Integer.parseInt(pieces[5]);

   // if the flight has been delayed
   if (actualDepTime > scheduledDepTime) {
    delayed = 1;
   }

  }

  // Send the Origin and the delayed status to the reducer for aggregation
  // ex., (ORD, 1)
  output.collect(new Text(origin), new IntWritable(delayed));

 }
}

Here is the Reducer Code
package AirlineDelaySinglePass;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class AirlineDelayByStartTimeReducer extends MapReduceBase implements
  Reducer<Text, IntWritable, Text, IntWritable> {

 private IntWritable value = new IntWritable();

 public void reduce(Text key, Iterator<IntWritable> values,
   OutputCollector<Text, IntWritable> output, Reporter reporter)
   throws IOException {

  // initialize the total flights and delayed flights
  int totalFlights = 0;
  int delayedFlights = 0;

  Text newKey = new Text();

  while (values.hasNext()) {

   // Delayed 0 for ontime or NA, 1 for delayed
   int delayed = values.next().get();

   // Increment the totalFlights by 1
   totalFlights = totalFlights + 1;

   // Calculate the number of delayed flights
   delayedFlights = delayedFlights + delayed;

  }

  // Create the key ex., ORD\t123
  newKey.set(key.toString() + "\t" + delayedFlights);

  // Create the value ex., 150
  value.set(totalFlights);

  // Pass the key and the value to Hadoop to write it to the final output
  output.collect(newKey, value);
 }
}

Here is the output of the MR program. This data can be joined with the airport details (latitude, longitude, name etc.) to make it more readable and interesting. I could have written another MR program to do the join, but the output is so small that it doesn't make sense to use another MR program; we can simply use a spreadsheet program or a database to join the datasets, depending on the level of comfort, as sketched below.
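For instance, here is a minimal sketch of such a join in Python using pandas, assuming the MR output has been copied locally as delays.tsv (origin, delayed flights, total flights, tab separated, as produced by the reducer above) and that an airports.csv with the airport code, name, latitude and longitude is available. The file names and the airports file layout are assumptions made for the illustration.

import pandas as pd

# MR output: origin \t delayedFlights \t totalFlights (the file name is an assumption)
delays = pd.read_csv("delays.tsv", sep="\t",
                     names=["origin", "delayed_flights", "total_flights"])

# airport metadata: code, name, lat, long (file name and layout are assumptions)
airports = pd.read_csv("airports.csv",
                       names=["code", "name", "lat", "long"])

# join the MR output with the airport details on the airport code
joined = delays.merge(airports, left_on="origin", right_on="code", how="left")

# percentage of delayed departures per airport, worst first
joined["delayed_pct"] = 100.0 * joined["delayed_flights"] / joined["total_flights"]
print(joined.sort_values("delayed_pct", ascending=False).head(10))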

Here is the time taken to run it on my single-node server. Note that the size of the airline dataset is close to 12 GB and the processing is simple.


In the next blog I will post the code for doing the same with Spark.

Friday, March 25, 2016

Flight departure delays

The airline dataset is one of the interesting datasets I came across recently. For 20 years (1987 to 2008) it has the actual/scheduled arrival/departure times, carrier code, flight number and a lot of other details.

From this data we can glean some interesting information, like the best airline to travel with or the best time to travel. We can also mash it up with a weather dataset to find out what sort of weather conditions cause flights to get delayed.

In this post I will publish the best and worst airports based on the number of delayed departures. Chicago O'Hare International has the maximum number of departure delays (3,121,184). Maybe the total number of flights from O'Hare is simply huge, so I calculated the percentage (49.52%) of departure delays (3,121,184) against the total number of flights (6,302,546) taking off from O'Hare. There are a lot of factors influencing flight delays, but delays are delays and O'Hare is one of the worst airports in this respect.
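For readers who want to reproduce the percentage, it is just the ratio of the two counts above, as the quick check below shows.

delayed = 3121184
total = 6302546
print(round(100.0 * delayed / total, 2))  # prints 49.52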



I have used MR programs for figuring this out, which I will be publishing in the next blog after cleaning them up. For those interested in finding out how the airport closest to them compares to other airports, here is the data.