Monday, April 11, 2016

Analyzing the airline dataset with Spark/Python

In the previous blogs the airline dataset was analyzed with MR and Hive. In this blog we will see how to do the same analytics with Spark using Python. Programs in Spark can be implemented in Scala (Spark itself is built with Scala), Java, Python and the recently added R.

It took 5 min 30 sec for the processing, almost the same as the earlier MR program. I am not sure why Spark is not faster than MR as claimed, so this needs a bit more looking into. Here is the output of the Python program.


The Spark program is a bit more concise when compared to the corresponding MR program. For those who are new to Spark, here is the Spark Python API.
from pyspark import SparkConf
from pyspark import SparkContext

import csv

def extractData(record):

    #split the input record by comma
    splits = record.split(',')

    #extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    #1 delayed
    #0 don't know or not delayed
    delayed = 0

    # Check if the actual/scheduled departure time is a digit
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():

        #check whether the flight got delayed
        if int(actualDepTime) > int(scheduledDepTime):
            delayed = 1

    #return the origin and delayed status as a tuple
    return origin, delayed

#create the SparkConf instance on which the different configurations can be set
conf = SparkConf().setMaster("spark://bigdata-server:7077")

#establish a connection to Spark
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")

#load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

#map each record to an (origin, delayed) tuple and cache it, as the data is processed twice below
delayedFlights = lines.map(extractData).cache()

#count the delayed flights per origin and save the result to HDFS
delayedFlights.reduceByKey(lambda a, b : a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

#count the total number of flights per origin
totalFlights = delayedFlights.countByKey()

#totalFlights is a dictionary. Iterate over it and write the counts to a CSV file
with open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w") as f:
    w = csv.writer(f)
    for key, val in totalFlights.items():
        w.writerow([key, val])
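
For those new to the RDD API, a quick way to sanity check the parsing before writing anything out is to pull a few records back to the driver with take(). The below snippet is only an illustrative sketch and not part of the above program; it reuses the sc, the input path and the extractData function from the program.

#sanity check sketch - reuses sc and extractData from the above program
sample = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input").map(extractData).take(5)

#each element is an (origin, delayed) tuple, for example ('JFK', 1)
for origin, delayed in sample:
    print("%s,%d" % (origin, delayed))
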
As I mentioned in the previous blog, I have a fairly high-end machine for the data processing, and below is the resource usage shown in the Ubuntu System Monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.


In the previous blog we saw how to convert the CSV data into the Parquet format using Hive. Now we will look at how to process the same Parquet data with Spark using the DataFrame feature. After processing, the output is stored in the JSON format to make it human readable.
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")
sqlContext = SQLContext(sc)

# load the parquet data (parquet is the default format for sqlContext.read.load)
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")

# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()

# store the delayed counts as JSON
delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
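
To verify the result without leaving Spark, the JSON output can be read back into a DataFrame and printed. The below snippet is just a small verification sketch, assuming the same sqlContext and output path as in the above program.

# verification sketch - read the JSON output back and show a few rows
check = sqlContext.read.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
check.printSchema()
check.show(5)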
PyCharm has been used to develop the program in Python, which was then executed in the Spark Standalone mode using the spark-submit command. As shown below, it took 24 seconds to get the delayed flights.

 
Here is the output back in HDFS.


Here is a matrix showing the execution times for different software and formats on the same dataset and for the same problem.


In future blogs, we will look into how to do the same processing with other software and also how to optimize it.
