The airline dataset from the previous blogs has been analyzed with MR and Hive. In this blog we will see how to do the same analytics with Spark using Python. Programs in Spark can be implemented in Scala (Spark itself is built with Scala), Java, Python and, more recently, R.
The Spark program is a bit more concise (shorter) than the corresponding MR program. For those who are new to Spark, here is the Spark Python API.
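For readers completely new to the RDD API, below is a tiny sketch (not part of the actual program) showing the three operations the full program relies on: map, reduceByKey and countByKey. The local[*] master and the sample records are just placeholders for illustration.

from pyspark import SparkConf
from pyspark import SparkContext

# local[*] is used only so this sketch can run on a single machine;
# the real program below talks to the standalone cluster instead
conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf, appName="rddToyExample")

# a few made-up "origin,delayed" records
records = sc.parallelize(["ORD,1", "ORD,0", "JFK,1", "JFK,1"])

# map each record to an (origin, delayed) pair
pairs = records.map(lambda line: line.split(',')).map(lambda parts: (parts[0], int(parts[1])))

# number of delayed flights per origin, e.g. [('ORD', 1), ('JFK', 2)] (order not guaranteed)
print(pairs.reduceByKey(lambda a, b: a + b).collect())

# total number of records per origin, e.g. {'ORD': 2, 'JFK': 2}
print(dict(pairs.countByKey()))

The full program below applies exactly the same pattern to the airline records in HDFS.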
from pyspark import SparkConf
from pyspark import SparkContext
import csv

def extractData(record):
    # split the input record by comma
    splits = record.split(',')

    # extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    # 1 delayed, 0 don't know or not delayed
    delayed = 0

    # check that the actual/scheduled departure times are digits
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():
        # check if the flight got delayed or not
        if int(actualDepTime) > int(scheduledDepTime):
            delayed = 1

    # return the origin and the delayed status as a tuple
    return origin, delayed

# create the SparkConf() instance on which the different configurations can be done
conf = SparkConf().setMaster("spark://bigdata-server:7077")

# establish a connection to Spark
sc = SparkContext(conf=conf, appName="flightDataAnalysis")

# load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

# figure out the delayed flights and cache the data, as it is processed twice later
delayedFlights = lines.map(extractData).cache()

# get the number of delayed flights per origin
delayedFlights.reduceByKey(lambda a, b: a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

# get the total flights per origin
totalFlights = delayedFlights.countByKey()

# totalFlights is a dictionary; iterate over it and write to a file
w = csv.writer(open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w"))
for key, val in totalFlights.items():
    w.writerow([key, val])

As I mentioned in the previous blog, I have a fairly high-end machine for the data processing, and below is the resource usage in the Ubuntu System Monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.

It took 5 min 30 sec for the processing, almost the same as the earlier MR program. It is not clear why Spark is not faster than MR as claimed; this needs a bit more investigation. Here is the output (1, 2) of the Python program.

Now that we have seen how to convert CSV into Parquet format with Hive in the previous blog, we will look into how to process the same Parquet data with Spark using the DataFrame feature. After processing, the output is stored in JSON format so as to make it human readable.
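The column names used in the query below (deptime, crsdeptime and origin) come from the schema stored in the Parquet files themselves. When in doubt, the schema can be printed first with a few lines like the following minimal sketch, which reuses the same master URL and input path as the program that follows (the application name here is just a placeholder).

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf=conf, appName="parquetSchemaCheck")
sqlContext = SQLContext(sc)

# load the Parquet data and print the column names and types
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")
df.printSchema()

With the column names confirmed, the query itself is just a filter followed by a groupBy and a count, as in the program below.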
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf=conf, appName="flightDataAnalysis")
sqlContext = SQLContext(sc)

# load the Parquet data
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")

# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()

# store the delayed count as JSON
delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

PyCharm has been used to develop the program in Python, which was then executed in Spark Standalone mode using the spark-submit command. As shown below, it took 24 s to get the delayed flights.
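As an aside, since a SQLContext is already being used, the same count of delayed flights per origin can also be expressed as plain SQL by registering the DataFrame as a temporary table. The rough sketch below assumes the sqlContext and df created in the program above; the table name and column alias are arbitrary.

# assumes the sqlContext and df from the Parquet program above
df.registerTempTable("flights")

# the same aggregation as the DataFrame version: delayed flights per origin
delayedCount = sqlContext.sql(
    "SELECT origin, count(*) AS delayedCount "
    "FROM flights WHERE deptime > crsdeptime GROUP BY origin")
delayedCount.show()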
Here is the output back in HDFS.
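Because the result is plain JSON, it can also be loaded straight back into a DataFrame to eyeball it, for example with something like the following (again assuming the sqlContext from above and the same output path).

# read the JSON output written by the program above and display a few rows
result = sqlContext.read.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
result.show()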
Here is a matrix showing the execution times with the different software and formats on the same dataset and for the same problem.
In future blogs, we will look into how to do the same processing with other software and also how to optimize it.