Friday, December 30, 2011

WhatsWrong: Accumulating values sent to the mapper

The program below accumulates the values sent to the mapper in valueBuff, an ArrayList<Text>, by calling valueBuff.add(value) in map(). It then prints the contents of that list in the AccumulateKVMapper#cleanup method.

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AccumulateKV {
    static class AccumulateKVMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {

        public ArrayList<Text> valueBuff = new ArrayList<Text>();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valueBuff.add(value);
            context.write(key, value);
        }

        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (Text o : valueBuff) {
                System.out.println("value = " + o.toString());
            }
        }

    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err
                    .println("Usage: AccumulateKV <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(AccumulateKV.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(AccumulateKVMapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The input to the job is:

Berlin-Germany
New Delhi-India
Kuala Lumpur-Malaysia
Pretoria-South Africa

The expected output is:

value = Berlin-Germany
value = New Delhi-India
value = Kuala Lumpur-Malaysia
value = Pretoria-South Africa

But the actual output of the program is:

value =
value =
value = 
value =

What's wrong with the program above? I have tried it against the Hadoop 0.22 release in Local (Standalone) Mode, but it should behave the same with other releases and in Pseudo-Distributed and Fully-Distributed Mode.
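If you want to experiment without a Hadoop installation, here is a minimal plain-Java sketch of one hypothesis worth testing: that the framework hands map() the same mutable value object on every call and clears it once the input runs out. ReuseDemo and ReusingReader are hypothetical names, and StringBuilder stands in for Text; none of this is Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of a hypothesis (not Hadoop code): a reader that refills
// one mutable value object (StringBuilder standing in for Text) per record.
public class ReuseDemo {

    // Hypothetical stand-in for a record reader; not a Hadoop class.
    static class ReusingReader {
        private final Iterator<String> lines;
        private final StringBuilder value = new StringBuilder(); // one shared object

        ReusingReader(List<String> lines) {
            this.lines = lines.iterator();
        }

        // Clears the shared value, then refills it if another line exists.
        boolean nextKeyValue() {
            value.setLength(0);
            if (!lines.hasNext()) {
                return false; // the shared value stays empty after the last record
            }
            value.append(lines.next());
            return true;
        }

        StringBuilder getCurrentValue() {
            return value;
        }
    }

    // Runs a map-like loop that buffers either the value reference or a copy,
    // and returns what the buffer holds once input is exhausted ("cleanup" time).
    static List<String> collect(List<String> input, boolean copyValues) {
        ReusingReader reader = new ReusingReader(input);
        List<CharSequence> valueBuff = new ArrayList<>();
        while (reader.nextKeyValue()) {
            StringBuilder value = reader.getCurrentValue();
            if (copyValues) {
                valueBuff.add(value.toString()); // independent snapshot of current contents
            } else {
                valueBuff.add(value); // same object the reader overwrites next time
            }
        }
        List<String> seenAtCleanup = new ArrayList<>();
        for (CharSequence v : valueBuff) {
            seenAtCleanup.add(v.toString());
        }
        return seenAtCleanup;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("Berlin-Germany", "New Delhi-India");
        for (String v : collect(input, false)) {
            System.out.println("buffered reference: value = " + v);
        }
        for (String v : collect(input, true)) {
            System.out.println("buffered copy: value = " + v);
        }
    }
}
```

Running main() prints two "buffered reference" lines with empty values, followed by the two input lines for the "buffered copy" variant. Whether Hadoop's record reader really behaves like ReusingReader is exactly what to check against the job above.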

Reply in the comments, and I will give a detailed explanation once I get a proper answer.
