Hadoop - Custom Partitioner implementation using word count

Here is a simple implementation of a custom Partitioner class, built on the standard word count example. In the word count job below, the custom partitioner sends numeric keys to the first reducer (partition 0, written to part-00000) and text keys to the second reducer (partition 1, written to part-00001).
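For comparison, Hadoop's default HashPartitioner spreads keys across reducers purely by hash code, roughly like this (a minimal sketch of the default behaviour, not part of this example):

// Default HashPartitioner logic: a key can land on any reducer
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;

The custom partitioner below replaces this with a deterministic rule based on the key's content.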

Sample Input to Program:

1 2 3 text sample file
3 2 4 timepass testing
text file sample 1

Expected Output:

part-00000
1 2
2 2
3 2
4 1

part-00001
file 2
sample 2
testing 1
text 2
timepass 1

Runner Class:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class wc_runner {
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(wc_runner.class);
        conf.setJobName("WordCount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Two reducers: partition 0 (numeric keys) and partition 1 (words)
        conf.setNumReduceTasks(2);

        conf.setMapperClass(wc_mapper.class);
        conf.setPartitionerClass(wc_partitioner.class);
        conf.setReducerClass(wc_reducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
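
As a side note, word count also benefits from map-side aggregation. If you want that, you could additionally register the reducer as a combiner in the runner above; this is optional and not required for the custom partitioner to work:

conf.setCombinerClass(wc_reducer.class);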

Mapper Class:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class wc_mapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        // Emit (token, 1) for every whitespace-separated token in the line
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

Partitioner Class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class wc_partitioner implements Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Numeric keys go to the first reducer (partition 0),
        // everything else to the second (partition 1).
        if (isNumeric(key.toString())) {
            return 0;
        }
        return 1 % numPartitions; // modulo keeps this safe if only one reducer is configured
    }

    public static boolean isNumeric(String str) {
        try {
            Double.parseDouble(str);
        } catch (NumberFormatException nfe) {
            return false;
        }
        return true;
    }

    @Override
    public void configure(JobConf conf) {
        // No configuration needed for this partitioner
    }
}
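
This post uses the older org.apache.hadoop.mapred API throughout. For reference only, a minimal sketch of the same routing rule against the newer org.apache.hadoop.mapreduce API could look like the following (the class name wc_partitioner_new is hypothetical, and this variant is not used by the runner above):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical new-API equivalent of the partitioner above
public class wc_partitioner_new extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Same rule: numeric keys to partition 0, everything else to partition 1
        return wc_partitioner.isNumeric(key.toString()) ? 0 : 1 % numPartitions;
    }
}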

Reducer Class:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class wc_reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        // Sum all the counts emitted for this key
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

Create a jar file and execute it using the following command:

hadoop jar WordCount.jar wc_runner /Practice/Input.txt /Practice/PracticeOutput

Here I have placed Input.txt in my HDFS directory. Substitute your own input and output paths when you run the command.
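
Once the job completes, you can verify the split output directly from HDFS (paths assume the example command above):

hadoop fs -ls /Practice/PracticeOutput
hadoop fs -cat /Practice/PracticeOutput/part-00000
hadoop fs -cat /Practice/PracticeOutput/part-00001

part-00000 should contain the numeric keys and part-00001 the word keys, matching the expected output shown earlier.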

