In the previous blog I introduced the Airline data set. I plan to get the results (total and delayed flights from different airports) using different Big Data software like Hadoop (MR), Hive, Pig, Spark, Impala, etc., and also with different data formats like Avro and Parquet. This will help followers of the blog appreciate the different distributed computing software and models.
As usual, there can be more than one way of performing a particular task, and some ways will be more efficient than others. I have written these programs in a short span of time and will try to optimize them over time. Also, I will include as many comments as possible in the code, so as to make it self-explanatory.
Recently I assembled a high-configuration computer, which I like to call a server :), with the configuration below. Ubuntu 14.04 64-bit is installed on it, and all the processing will be done on this machine. It was fun looking for the right parts and assembling the computer. I initially planned to buy an SSD instead of a hard disk, but SSDs were too expensive and beyond the budget I had planned.
Processor - AMD FX-6300 (FD6300WMHKBOX)
Motherboard - GIGABYTE GA-990XA-UD3
RAM - G.Skill 2 x 8 GB F3-12800CL10D-16GBXL
Hard Disk - Seagate 3.5-inch Desktop 2 TB hard drive (ST2000DX001)
So, here is the MR program for calculating the total flight departures and the number of delayed flights for each airport over the 20 years. Initially I wrote two MR programs, one for calculating the total flights and another for calculating the total delayed flights. That approach is not really efficient, as the input data is parsed twice, so I revamped the two into a single MR program.
Here is the MR Driver Code.
package AirlineDelaySinglePass;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class AirlineDelayByStartTime {

    public static void main(String[] args) throws IOException {

        // Check that exactly 2 parameters were passed to the program
        if (args.length != 2) {
            System.err.println("Usage: AirlineDelayByStartTime <input path> <output path>");
            System.exit(-1);
        }

        // Create the JobConf instance and specify the job name
        JobConf conf = new JobConf(AirlineDelayByStartTime.class);
        conf.setJobName("Airline Delay");

        // First and second arguments are the input and output folders
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // Specify the mapper and the reducer classes
        conf.setMapperClass(AirlineDelayByStartTimeMapper.class);
        conf.setReducerClass(AirlineDelayByStartTimeReducer.class);

        // Specify the output key and value types of the job
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Set the number of reduce tasks for the job (the default is one)
        conf.setNumReduceTasks(4);

        // Trigger the MapReduce job
        JobClient.runJob(conf);
    }
}
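For reference, once the driver, mapper, and reducer are compiled and packaged, the job can be launched with the standard hadoop jar command. The jar name and HDFS paths below are just placeholders for illustration, not the ones I used:

hadoop jar airline-delay.jar AirlineDelaySinglePass.AirlineDelayByStartTime /user/hadoop/airline/input /user/hadoop/airline/output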
Here is the Mapper Code.
package AirlineDelaySinglePass;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class AirlineDelayByStartTimeMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {

        // Split the input line on commas
        String[] pieces = value.toString().split(",");

        // Delayed flag: 0 for on-time or NA, 1 for delayed
        int delayed = 0;

        // Get the origin airport, which is the 17th field in the input line
        String origin = pieces[16];

        if (StringUtils.isNumeric(pieces[4]) && StringUtils.isNumeric(pieces[5])) {
            // Field 5: DepTime - actual departure time (local, hhmm)
            // Field 6: CRSDepTime - scheduled departure time (local, hhmm)
            int actualDepTime = Integer.parseInt(pieces[4]);
            int scheduledDepTime = Integer.parseInt(pieces[5]);

            // If the flight left later than scheduled, it has been delayed
            if (actualDepTime > scheduledDepTime) {
                delayed = 1;
            }
        }

        // Send the origin and the delayed status to the reducer for
        // aggregation, e.g., (ORD, 1)
        output.collect(new Text(origin), new IntWritable(delayed));
    }
}
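To make the mapper logic concrete, below is a minimal standalone sketch (an illustration, not part of the MR job) that applies the same field checks to one sample line in the dataexpo column order and prints what the mapper would emit. It uses String.matches in place of StringUtils.isNumeric so it runs without any extra libraries:

public class MapperLogicSketch {

    public static void main(String[] args) {
        // Sample record in the dataexpo layout: DepTime (field 5) = 741,
        // CRSDepTime (field 6) = 730, Origin (field 17) = SAN
        String line = "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,"
                + "SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA";
        String[] pieces = line.split(",");

        int delayed = 0;
        if (pieces[4].matches("\\d+") && pieces[5].matches("\\d+")) {
            // The flight left at 741 against a scheduled 730, so it is delayed
            if (Integer.parseInt(pieces[4]) > Integer.parseInt(pieces[5])) {
                delayed = 1;
            }
        }

        // Prints (SAN, 1), the key-value pair the mapper would emit
        System.out.println("(" + pieces[16] + ", " + delayed + ")");
    }
}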
Here is the Reducer Code.
package AirlineDelaySinglePass;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class AirlineDelayByStartTimeReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable value = new IntWritable();

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {

        // Initialize the total flights and delayed flights counters
        int totalFlights = 0;
        int delayedFlights = 0;
        Text newKey = new Text();

        while (values.hasNext()) {
            // Delayed flag: 0 for on-time or NA, 1 for delayed
            int delayed = values.next().get();

            // Increment totalFlights by 1 for every value
            totalFlights = totalFlights + 1;

            // Accumulate the number of delayed flights
            delayedFlights = delayedFlights + delayed;
        }

        // Create the composite key, e.g., ORD\t123
        newKey.set(key.toString() + "\t" + delayedFlights);

        // Create the value, e.g., 150
        value.set(totalFlights);

        // Pass the key and the value to Hadoop to write to the final output
        output.collect(newKey, value);
    }
}
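A frequent question with this pattern is how many times the reducer runs: Hadoop calls reduce() once per distinct key, i.e., once per origin airport, and the while loop then walks over every 0/1 flag the mappers emitted for that airport. Below is a minimal sketch of that loop (an illustration with three made-up flags for one airport, not part of the MR job):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReducerLogicSketch {

    public static void main(String[] args) {
        // Hypothetical flags for one origin, say ORD: delayed, on time, delayed
        List<Integer> values = Arrays.asList(1, 0, 1);

        int totalFlights = 0;
        int delayedFlights = 0;

        // One iteration per flight, exactly like the reducer's while loop
        Iterator<Integer> it = values.iterator();
        while (it.hasNext()) {
            int delayed = it.next();
            totalFlights = totalFlights + 1;           // every value is one flight
            delayedFlights = delayedFlights + delayed; // only the 1s are delays
        }

        // Prints ORD\t2\t3, mirroring one line of the job's final output
        System.out.println("ORD\t" + delayedFlights + "\t" + totalFlights);
    }
}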
Here is the output of the MR program. Each output line contains the origin airport, the number of delayed flights, and the total number of flights, separated by tabs (e.g., ORD\t123\t150, reusing the numbers from the code comments). This data can be joined with the airport details (latitude, longitude, name, etc.) to make it more readable and interesting. I could have written another MR program to do the join, but the data sets involved in the join are so small that it doesn't make sense to use another MR program. We can simply use a spreadsheet program or a database to join the data sets, depending on the level of comfort.
Here is the time taken to run it on my single-node server. Note that the airline data set is close to 12 GB in size and the processing is fairly simple.
In the next blog I will post the code for doing the same with Spark.
Comment: Hi, may I get the input data set for the above program?

Reply: If you go through the blog closely, I already mentioned it - http://stat-computing.org/dataexpo/2009/

Comment: Could you please explain how many times the reducer is executed (is it equal to the number of key-value pairs?) and also how the while loop is executed. If possible, use an example.