Java — Hadoop WordCount with output sorted by word occurrences
I need to run WordCount so that it gives me words and their occurrences, sorted by the number of occurrences rather than alphabetically.
I understand that I need to create two jobs and run one after the other. I used the following mapper and reducer to produce a sorted word count using Hadoop MapReduce:
package org.myorg; import java.io.ioexception; import java.util.*; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapreduce.job; public class wordcount { public static class map extends mapreducebase implements mapper<longwritable, text, text, intwritable> { private final static intwritable 1 = new intwritable(1); private text word = new text(); public void map(longwritable key, text value, outputcollector<text, intwritable> output, reporter reporter) throws ioexception { string line = value.tostring(); stringtokenizer tokenizer = new stringtokenizer(line); while (tokenizer.hasmoretokens()) { word.set(tokenizer.nexttoken()); output.collect(word, one); } } } public static class reduce extends mapreducebase implements reducer<text, intwritable, text, intwritable> { public void reduce(text key, iterator<intwritable> values, outputcollector<text, intwritable> output, reporter reporter) throws ioexception { int sum = 0; while (values.hasnext()) { sum += values.next().get(); } output.collect(key, new intwritable(sum)); } } class map1 extends mapreducebase implements mapper<object, text, intwritable, text> { public void map(object key, text value, outputcollector<intwritable, text> collector, reporter arg3) throws ioexception { string line = value.tostring(); stringtokenizer stringtokenizer = new stringtokenizer(line); { int number = 999; string word = "empty"; if (stringtokenizer.hasmoretokens()) { string str0 = stringtokenizer.nexttoken(); word = str0.trim(); } if (stringtokenizer.hasmoreelements()) { string str1 = stringtokenizer.nexttoken(); number = integer.parseint(str1.trim()); } collector.collect(new intwritable(number), new text(word)); } } } class reduce1 extends mapreducebase implements reducer<intwritable, text, intwritable, text> { public void reduce(intwritable key, iterator<text> values, outputcollector<intwritable, text> arg2, reporter arg3) throws ioexception { while 
((values.hasnext())) { arg2.collect(key, values.next()); } } } public static void main(string[] args) throws exception { jobconf conf = new jobconf(wordcount.class); conf.setjobname("wordcount"); conf.setoutputkeyclass(text.class); conf.setoutputvalueclass(intwritable.class); conf.setmapperclass(map.class); conf.setcombinerclass(reduce.class); conf.setreducerclass(reduce.class); conf.setinputformat(textinputformat.class); conf.setoutputformat(textoutputformat.class); fileinputformat.setinputpaths(conf, new path(args[0])); fileoutputformat.setoutputpath(conf, new path("/tmp/temp")); //jobclient.runjob(conf); //------------------------------------------------------------------ jobconf conf2 = new jobconf(wordcount.class); conf2.setjobname("wordcount1"); conf2.setoutputkeyclass(text.class); conf2.setoutputvalueclass(intwritable.class); conf2.setmapperclass(map1.class); conf2.setcombinerclass(reduce1.class); conf2.setreducerclass(reduce1.class); conf2.setinputformat(textinputformat.class); conf2.setoutputformat(textoutputformat.class); fileinputformat.setinputpaths(conf2, new path("/tmp/temp/part-00000")); fileoutputformat.setoutputpath(conf2, new path(args[1])); job job1 = new job(conf); job job2 = new job(conf2); job1.submit(); if (job1.waitforcompletion(true)) { job2.submit(); job1.waitforcompletion(true); } } }
It's not working. What should I change here — or why is it not working?
if program runs until:
INFO input.FileInputFormat: Total input paths to process : 1
then the problem lies in the last line:
job2.submit();
The job has only been submitted; the program never waits for it to be processed. Try this:
// Submit job 1, and only after it completes successfully run job 2.
job1.submit();
if (job1.waitForCompletion(true)) {
    job2.submit();
    job2.waitForCompletion(true); // FIX: wait on job2, not job1 again
}
so that the sorting MR job actually runs. I've tried this code with the new MapReduce API and the flow works.
Just change that last line.
Comments
Post a Comment