Monday, April 18, 2016

PageRank on the English wiki data using MapReduce

PageRank is an iterative algorithm for finding the relevance of a web page in the world-wide-web. PageRank is one of the signals used by search engines to figure out what to show at the top and what at the bottom of the search results. The book `Data-Intensive Text Processing with MapReduce` has a very good description of what PageRank is and how to solve it in a MapReduce way.
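
For reference, the rank of a page p is typically computed as PR(p) = (1 - d)/N + d * Σ PR(q)/L(q), where the sum runs over all pages q linking to p, d is the damping factor (commonly 0.85), N is the total number of pages and L(q) is the number of outgoing links on page q; the exact formulation used by a given implementation may differ slightly.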

As mentioned above, PageRank is an iterative algorithm, and the MapReduce model is not well suited for iterative processing because every MapReduce job reads its input from and writes its output to disk (HDFS), which is really slow compared to reading the data from memory as in the case of Spark.

Anyway, working MapReduce code was available for the PageRank algorithm to process the wiki data, so I thought of giving it a shot. Here is the code and the description for the same. Ideally, the iterations would be terminated once the difference in PageRank between two successive iterations becomes small (convergence); this logic is not implemented in the code.
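
To make the shape of a single iteration concrete, below is a minimal sketch in plain Python (no Hadoop) of what the map and reduce steps of one PageRank iteration look like, together with the convergence check mentioned above. The toy graph, the damping factor of 0.85 and the function names (map_page, reduce_page, iterate) are illustrative assumptions and not the actual MR code used for the wiki data.

DAMPING = 0.85

def map_page(page, rank, out_links):
    #mapper: re-emit the graph structure and distribute the page's rank
    yield page, ('STRUCTURE', out_links)
    if out_links:
        share = rank / len(out_links)
        for target in out_links:
            yield target, ('CONTRIB', share)

def reduce_page(page, values, num_pages):
    #reducer: sum the incoming contributions and apply the damping factor
    out_links, total = [], 0.0
    for kind, value in values:
        if kind == 'STRUCTURE':
            out_links = value
        else:
            total += value
    new_rank = (1.0 - DAMPING) / num_pages + DAMPING * total
    return new_rank, out_links

def iterate(graph):
    #one map + shuffle + reduce pass over a {page: (rank, out_links)} dict
    grouped = {}
    for page, (rank, links) in graph.items():
        for key, value in map_page(page, rank, links):
            grouped.setdefault(key, []).append(value)
    return dict((page, reduce_page(page, values, len(graph)))
                for page, values in grouped.items())

#toy 3-page graph with uniform initial ranks
graph = {'A': (1.0 / 3, ['B', 'C']), 'B': (1.0 / 3, ['C']), 'C': (1.0 / 3, ['A'])}

#keep iterating until the ranks stop changing (the convergence check)
delta = 1.0
while delta > 0.0001:
    updated = iterate(graph)
    delta = sum(abs(updated[p][0] - graph[p][0]) for p in graph)
    graph = updated
print(graph)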

Here is the data (English wiki) for the same. For those who don't have enough resources, smaller wiki dumps are also available for other languages.

It took 7 minutes short of 3 hours to process 52.58 GB of English wiki data. The processing included a total of 7 MR programs: the first one for parsing the wiki XML data, 5 MR iterations to calculate the PageRank, and a final MR program for ordering the wiki pages based on the rank.

Here is the configuration of the machine on which the wiki data has been processed.

Here is the folder structure in HDFS.


Here is the size of the input data.


Here is the size of the data after the first MR program (parsing the wiki). Note that the size of the data decreased significantly, as we only need the list of web pages, the pages each of them links to, and the initial page rank to start the PageRank algorithm.
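
Just for illustration, a single record after the parsing step might look something like the line below, i.e. the page, its current rank and the pages it links to (the actual delimiters and layout depend on the parsing MR program, so treat this as an assumed format):

PageA    1.0    PageB,PageC,PageD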


Here is the size of the final result, which is significantly smaller as it contains only the web page and the corresponding PageRank.


The file is a bit too big to open in gedit, so head/tail is another way to look at the beginning/end of the file. Here is the screenshot for the same. Proud to see India in the top 10 list.

 
Here is the screenshot of the MapReduce console with the 7 MapReduce programs.


Here are the details of the first MapReduce program (xml parsing).


 Here are the details of the MapReduce program implementing a single MapReduce iteration for calculating the PageRank.


Here are the details of the MapReduce program implementing the sorting based on the PageRank.


The rationale behind publishing so many screenshots is for the readers to get the different metrics for the processing of the English wiki data. Your mileage may vary depending on the configuration of the machine and the size of the input wiki data used for the processing.

In the coming blogs, we will look into how to do the same thing with Spark.

Monday, April 11, 2016

Analyzing the airline dataset with Spark/Python

The airline dataset in the previous blogs has been analyzed with MR and Hive. In this blog, we will see how to do the analytics with Spark using Python. Programs in Spark can be implemented in Scala (Spark itself is built using Scala), Java, Python and, more recently, R.

It took 5 min 30 sec for the processing, almost the same as the earlier MR program. Not sure why Spark is not faster than MR as claimed; need to look into it a bit more. Here is the output (12) of the Python program.


The Spark program is a bit more concise (shorter) when compared to the corresponding MR program. For those who are new to Spark, here is the Spark Python API.
from pyspark import SparkConf
from pyspark import SparkContext

import csv

def extractData(record):

    #split the input record by comma
    splits = record.split(',')

    #extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    #1 delayed
    #0 don't know or not delayed
    delayed = 0

    # Check if the actual/scheduled departure time is a digit
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():

        #check whether the flight got delayed
        if int(actualDepTime) > int(scheduledDepTime) :
            delayed = 1

    #return the origin and delayed status as a tuple
    return origin, delayed

#create the SparkConf() instance on which the different configurations can be done
conf = SparkConf().setMaster("spark://bigdata-server:7077")

#establish a connection to Spark
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")

#load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

#map each record to an (origin, delayed) tuple and cache it, as it is processed twice below
delayedFlights = lines.map(extractData).cache()

#count the delayed flights per origin and save the result to HDFS
delayedFlights.reduceByKey(lambda a, b : a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

#count the total number of flights per origin
totalFlights = delayedFlights.countByKey()

#totalFlights is a dictionary; iterate over it and write it to a CSV file
w = csv.writer(open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w"))
for key, val in totalFlights.items():
    w.writerow([key, val])
As I mentioned in the previous blog, I do have a bit of a high-end machine for the data processing, and below is the resource usage in the Ubuntu system monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.


In the previous blog we have seen how to convert CSV into the Parquet format using Hive. Now we will look into how to process the same Parquet data with Spark using the DataFrame feature. After processing the data, the output is stored in the JSON format so as to make it human readable.
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")
sqlContext = SQLContext(sc)

# load the parquet data
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")

# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()

# store the delayed count as JSON
delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
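As a quick, optional sanity check, the JSON output can be read back into a DataFrame and displayed, reusing the sqlContext and the output path from above:

# read the JSON output back and show a few rows
check = sqlContext.read.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
check.show(10)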
PyCharm has been used to develop the program in Python, which was then executed in Spark standalone mode using the spark-submit command. As shown below, it took 24 seconds to get the delayed flights.

 
Here is the output back in HDFS.


Here is the matrix, which shows the execution times with the different software and formats on the same dataset and for the same problem.


In future blogs, we will look into how to do the same processing with other software and also how to optimize it.