Monday, October 31, 2016

Maximum temperature for year using Spark SQL

In the previous blog, we looked at how to find the maximum temperature for each year from the weather dataset. Below is the code for the same using Spark SQL, which is a layer on top of Spark. SQL on Spark was earlier supported through Shark, which has been replaced by Spark SQL. Here is a nice blog from DataBricks on the future of SQL on Spark.
import re
import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

#function to extract the data from the line
#based on position and filter out the invalid records
def extractData(line):
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        return [(year, temp)]
    else:
        return []

logFile = "hdfs://localhost:9000/user/bigdatavm/input"

#Create Spark Context and SQL Context with the master details and the application name
sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
sqlContext = SQLContext(sc)

#Create an RDD from the input data in HDFS
weatherData = sc.textFile(logFile)

#Transform the data to extract/filter and then map it to a row
temperature_data = weatherData.flatMap(extractData).map(lambda p: Row(year=p[0], temperature=int(p[1])))

#Infer the schema, and register the SchemaRDD as a table.
temperature_data = sqlContext.inferSchema(temperature_data)
temperature_data.registerTempTable("temperature_data")

#SQL can be run over SchemaRDDs that have been registered as a table.
#Filtering can be done in SQL using a WHERE clause or in a Python function, as done in extractData()
max_temperature_per_year = sqlContext.sql("SELECT year, MAX(temperature) FROM temperature_data GROUP BY year")
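#For illustration only (assuming the quality flag had also been kept as a column in the Row),
#the same filter could instead be pushed into the SQL with a WHERE clause, e.g.
#"SELECT year, MAX(temperature) FROM temperature_data WHERE quality IN ('0', '1', '4', '5', '9') GROUP BY year"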

#Save the RDD back into HDFS
max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
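Assuming the script above is saved as max_temperature_sql.py (a hypothetical file name), it could be submitted to the cluster from the Spark installation folder with spark-submit; the master URL is already set in the SparkContext.
bin/spark-submit max_temperature_sql.py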

Thursday, October 27, 2016

Maximum temperature for year using Spark/Python

Hadoop – The Definitive Guide revolves around the example of finding the maximum temperature for each year from the weather data set. The code for the same is here and the data here. Below is the equivalent Spark code implemented in Python.
import re
import sys

from pyspark import SparkContext

#function to extract the data from the line
#based on position and filter out the invalid records
def extractData(line):
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        return [(year, temp)]
    else:
        return []

logFile = "hdfs://localhost:9000/user/bigdatavm/input"

#Create Spark Context with the master details and the application name
sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")

#Create an RDD from the input data in HDFS
weatherData = sc.textFile(logFile)

#Transform the data to extract/filter and then find the max temperature
max_temperature_per_year = weatherData.flatMap(extractData).reduceByKey(lambda a,b : a if int(a) > int(b) else b)

#Save the RDD back into HDFS
max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
The map function used in the transformation is similar to the map function in the MR model, and the reduceByKey transformation is similar to the reduce function in the MR model.
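As a minimal sketch of what reduceByKey does (a made-up toy dataset for illustration, not the weather data):
pairs = sc.parallelize([("1949", 111), ("1950", 0), ("1950", 22)])
#for each key (year), the lambda is applied pairwise to keep the larger value
pairs.reduceByKey(lambda a, b: a if a > b else b).collect()
#returns [('1949', 111), ('1950', 22)] (ordering may vary)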

In future blogs, we will see how to perform more complex processing using the other transformations and actions provided by Spark.

Monday, October 24, 2016

Hadoop/MR vs Spark/RDD WordCount program

Apache Spark provides an efficient way of solving iterative algorithms by keeping the intermediate data in memory. This avoids the overhead of reading/writing the intermediate data to disk, as is the case with MR.

Also, when the same operation is run again and again, the data can be cached in memory and fetched from there without recomputing it (a small sketch follows below). MR is stateless: if a program/application in MR is executed 10 times, then the whole data set has to be scanned 10 times.
Also, as the name suggests, MR supports only Map and Reduce operations, and everything else (join, group by etc.) has to be fitted into the Map and Reduce model, which might not be the most efficient approach. Spark supports a number of other transformations and actions besides just Map and Reduce, as mentioned here and here.
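Going back to the caching point, here is a minimal sketch (illustrative only, reusing the HDFS input path from the examples in this blog, and assuming the data fits in memory):
lines = sc.textFile("hdfs://localhost:9000/user/bigdatavm/input")
lines.cache()                                              #mark the RDD to be kept in memory once computed
lineCount = lines.count()                                  #first action scans HDFS and populates the cache
nonEmpty = lines.filter(lambda l: len(l) > 0).count()      #reuses the cached data instead of re-reading HDFS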

Spark code is also more compact when compared to the MR code. Below is the program for performing WordCount using Python in Spark.
from pyspark import SparkContext

logFile = "hdfs://localhost:9000/user/bigdatavm/input"

sc = SparkContext("spark://bigdata-vm:7077", "WordCount")

textFile = sc.textFile(logFile)

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

wordCounts.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
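As a small follow-up (illustrative, not part of the original program), the ten most frequent words could be pulled out of the result with takeOrdered:
topTen = wordCounts.takeOrdered(10, key=lambda pair: -pair[1])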
The same program in the MR model using Hadoop is a bit more verbose, as shown below.
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends
        Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();

        Job job = new Job(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Here we looked at WordCount, which, in terms of simplicity, is the HelloWorld program for getting started with a new concept/technology/language. In future blogs, we will look into some of the more advanced features of Spark.

Thursday, October 20, 2016

Intersection of Big Data and IOT

Lately I have been blogging about IOT and Big Data. They go hand-in-hand, with the IOT devices feeding data to the Big Data platforms in the Cloud, which store/analyze the data and feed the results back to the IOT devices. One such example is the Nest device bought by Google. The Nest thermostat gathers the room temperature and sends it to the Cloud over Wi-Fi; the analytics are done in the Cloud and the results are fed back to the Nest.

This is not something new; it has been around for quite some time. But the conditions for wider adoption of Big Data and IOT are falling into place: cheap sensors, the Cloud, and cheaper and faster internet connections. Here is an article from ZDNet on ten practical examples of the intersection of Big Data and IOT. For those who are interested in learning more, here (1, 2) are a few more references from ZDNet.

I am yet to try it out, but here is a detailed article from IBM Developer Works on building a temperature sensor using an Arduino Uno, putting the data in the Cloud and finally visualizing the data in real time. Here is another article on how to integrate IOT with Azure.


Monday, October 17, 2016

Getting started with Apache Spark

With so much noise around Apache Spark, let's look into how to get started with Spark in local mode and execute a simple Scala program. A lot of complex combinations are possible, but we will look at the minimum steps required to get started with Spark.

Most Big Data software is developed with Linux as the platform, and porting to Windows has been an afterthought. It will be interesting to see how Big Data on Windows morphs in the future. Spark can run on both Windows and Linux, but here we will consider Linux (Ubuntu 14.04 64-bit Desktop). So, here are the steps:

1) Download and install Oracle VirtualBox as mentioned here.

2) Download and install Ubuntu as mentioned here as a guest OS.

3) Update the patches on Ubuntu from a terminal and reboot it.
sudo apt-get update; sudo apt-get dist-upgrade
4) Oracle Java doesn't come with Linux distributions, so it has to be installed manually on top of Ubuntu as mentioned here.

5) Spark has been developed in Scala, so we need to install Scala.
sudo apt-get install scala
6) Download Spark from here and extract it. A Spark build for Hadoop 1.x or 2.x will work, because HDFS is not being used in this context.

7) From the Spark installation folder, start the Spark shell.
bin/spark-shell
8) Execute the commands below in the shell to load README.md and count the number of lines in it.
val textFile = sc.textFile("README.md")

textFile.count()
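If Python is preferred over Scala, roughly the same check can be done from the Python shell as well, started with bin/pyspark from the same folder:
textFile = sc.textFile("README.md")
textFile.count()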
What we have done is install Ubuntu as a guest OS, install Spark on it, and finally run a simple Scala program in Spark local mode. There are much more advanced setups, like running a Spark program against data in HDFS, or running Spark in standalone, Mesos and YARN modes. We will look at them in future blogs.

Wednesday, October 12, 2016

ASF (Apache Software Foundation) as a standards body

What is ASF all about?

For many, Apache is synonymous with the Apache HTTP server, which is the backbone for serving web pages. But there is much more to Apache. It's a non-profit organization (ASF - Apache Software Foundation) which provides an environment and a platform in which different companies and individuals work in an open and collaborative fashion towards the common goal of developing a good piece of software. Open means that all the work (architecture, design, coding, testing, documentation etc.) happens in the open and there are no secrets. Anyone can also download the code, make some changes, compile it and contribute it back.

It's possible for different companies and individuals like you and me to improve the code and contribute it back to the ASF. To maintain the quality of the software, there is a process in place where the project committers check the quality of the contributed code and then add it to the code repository. The advantage of working in this model is that any improvement made by an individual or a company can be immediately absorbed by someone else. This is what working in a collaborative fashion means.

There are a lot of Big Data projects under the ASF, like Hadoop, Hive, Pig, HBase and Cassandra, and a lot of non-Big Data projects, like Tomcat, Log4J, Velocity and Struts. Projects usually start in the Incubator and some of them then graduate to TLP (Top Level Project) status. The code for the different projects can be accessed in a read-only way from here.


How is Apache promoting standards?

OK, now we know how the Apache process works: the different companies and individuals work towards the common goal of creating good software. Now, let's look into why standards are important in software and how Apache is promoting standards.

Those who travel internationally and carry at least one electronic item face the problem of different socket layouts and voltages in different countries. The plugs just don't fit into the sockets, hence the need to carry multiple adapters. If there were an international standard for the socket layout and the voltage, we wouldn't face this problem.

The same applies to software standards. Software standards allow interoperability across different software stacks. As an example, a program written against one piece of software can be easily ported to another if the standards are followed. One example is the JEE standards: an EJB written for JBoss can be ported to WebSphere with minimal or no changes.

In the case of Big Data stacks, the different Big Data companies take the software from the Apache Software Foundation and improve on it. Some of the improvements can be better documentation, better performance, bug fixes, or better usability in terms of installation/monitoring/alerting.

The Apache code is the common base for the different Big Data distributions. For this reason, the Apache code base and the different distributions like HDP, CDH etc. provide more or less the same API to program against. In this way Apache is acting as a standards body. For example, a MapReduce program written against the HDP distribution can be run against the other distributions with minimal or no changes.

Although Apache is not a standards body, it is still acting as one. Usually, a standards body is formed to develop standards and a reference implementation for them. This is a painstaking process, and some of those standards may never see the light of day. The advantage of working in the Apache fashion is that de facto standards are developed indirectly and very quickly.

Monday, October 10, 2016

What is Docker all about?

There has been a lot of noise about Docker, and a raft of announcements (1, 2, 3) from different companies about support for Docker in their products. In this blog, we will look at what Docker is all about, but before that we will look into what virtualization and LXC are. In fact, Docker-Big Data integration can also be done.

What is Virtualization?

Virtualization allows multiple operating systems to be run on a single machine. For this to happen, virtualization software like Xen, KVM, Hyper-V, VMware vSphere or VirtualBox has to be installed. Oracle VirtualBox is free and easy to set up. Once VirtualBox has been installed, multiple guest OSes can be run on top of it, as shown below.
On top of the guest OS, applications can be installed. The main advantage of the above configuration is that the applications are isolated from each other. Also, resources can be allocated to the guest OSes, so a single application can't dominate the underlying hardware completely and starve the other applications of resources. The main disadvantage is that each guest OS gets its own kernel and file system and hence consumes a lot of resources.

What is LXC?

LXC (Linux Containers) provides OS-level virtualization and doesn't need a complete OS to be installed, as in the case of virtualization.
The main advantage of LXC is that containers are lightweight: there is little overhead in running applications on top of LXC instead of directly on top of the host OS. LXC also provides isolation between the different applications deployed in the containers, and resources can be allocated to them as well.

LXC containers can be thought of as lightweight machines which consume fewer resources, are easy to start, and on which applications can be deployed.

How does Docker fit into the entire thing?

LXCs are interesting, but it's not that easy to migrate an application running on top of LXC from one environment to another (let's say from development to QA to production), as there are a lot of dependencies between the application and the underlying system.

Docker provides a platform so that applications can be built and packaged in a standardized way. The built application also includes all its dependencies, so there is less friction in moving from one environment to another. Also, Docker tries to abstract the resources required by the application, so that the application can run in different environments without any changes.

For those from a Java/JEE background, Docker applications can be considered similar to an EAR file, which can be migrated from one environment to another without any changes as long as the proper standards are followed while creating it. Java applications can make JNDI calls to discover services by name and so are not tied to a particular resource.

BTW, here is some nice documentation on Docker.