Monday, October 31, 2016

Maximum temperature for year using Spark SQL

In the previous blog, we looked at how find out the maximum temperature of each year from the weather dataset. Below is the code for the same using Spark SQL which is a layer on top of Spark. SQL on Spark was supported using Shark which is being replaced by Spark SQL. Here is a nice blog from DataBricks on the future of SQL on Spark.
import re
import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

#function to extract the data from the line
#based on position and filter out the invalid records
def extractData(line):
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        return [(year, temp)]
    else:
        return []

logFile = "hdfs://localhost:9000/user/bigdatavm/input"

#Create Spark Context and SQL Context with the master details and the application name
sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
sqlContext = SQLContext(sc)

#Create an RDD from the input data in HDFS
weatherData = sc.textFile(logFile)

#Transform the data to extract/filter and then map it to a row
temperature_data = weatherData.flatMap(extractData).map(lambda p: Row(year=p[0], temperature=int(p[1])))

#Infer the schema, and register the SchemaRDD as a table.
temperature_data = sqlContext.inferSchema(temperature_data)
temperature_data.registerTempTable("temperature_data")

#SQL can be run over SchemaRDDs that have been registered as a table.
#Filtering can be done in the SQL using a where clause or in a py function as done in the extractData()
max_temperature_per_year = sqlContext.sql("SELECT year, MAX(temperature) FROM temperature_data GROUP BY year")

#Save the RDD back into HDFS
max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")

No comments:

Post a Comment