Anagram Grouping using MapReduce

Anagram Grouping: The input files contain an arbitrary number of strings; we need to identify the words that are anagrams of each other and group them together.

Problem Statement: Find the anagram words in the input files, group them together, and convert each word to upper case.

Solution: To solve this problem, I am going to use ChainMapper and ChainReducer. All configuration is done in the ChainRunner class. The flow of the program is shown below:

SortKeyMapper --> CombineKeyReducer --> UpperCaseMapper

SortKeyMapper: This reads the input file line by line, tokenizes each line, and sorts the characters of each token to form the key, emitting the original word as the value. Because all anagrams share the same sorted key, Hadoop groups them together for the reducer.
For example: aba and baa both sort to aab, so the reducer receives the key aab with the group {aba, baa}.

CombineKeyReducer: It reads each member of the group and combines them into a single tab-separated string.

UpperCaseMapper: It converts each word to upper case, and the final output is written to HDFS.
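
To make the idea concrete before diving into Hadoop, here is a small plain-Java sketch of the same three steps (group by sorted key, join with tabs, upper-case), run against the sample words listed below. It is only an illustration; the class name AnagramSketch is mine, and the printed order may differ from the job's output because HashMap does not sort its keys the way Hadoop does.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AnagramSketch {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("aba", "def", "eikdsla", "baa",
                "dslakei", "sample", "emaslp", "sampel");
        // Step 1: group by the sorted-character key, as SortKeyMapper does
        Map<String, List<String>> groups = words.stream()
                .collect(Collectors.groupingBy(w -> {
                    char[] c = w.toCharArray();
                    Arrays.sort(c);
                    return new String(c);
                }));
        // Steps 2 and 3: join each group with tabs and upper-case it,
        // as CombineKeyReducer and UpperCaseMapper do
        groups.values().forEach(g ->
                System.out.println(String.join("\t", g).toUpperCase()));
    }
}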


Sample Input :
aba
def
eikdsla
baa
dslakei
sample
emaslp
sampel

Expected Output (words within a group are separated by tabs):
ABA BAA
EIKDSLA DSLAKEI
SAMPLE EMASLP SAMPEL
DEF

Here is the code:

ChainRunner Class:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class ChainRunner {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        System.out.println("Execution Started");
        conf.setJobName("Chain Example");
        conf.setJarByClass(ChainRunner.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // Mapper: emit <sortedKey, originalWord> pairs
        JobConf sortMapConf = new JobConf(false);
        ChainMapper.addMapper(conf, SortKeyMapper.class, LongWritable.class,
                Text.class, Text.class, Text.class, true, sortMapConf);

        // Reducer: combine each anagram group into one tab-separated string
        JobConf combineRedConf = new JobConf(false);
        ChainReducer.setReducer(conf, CombineKeyReducer.class, Text.class,
                Text.class, Text.class, IntWritable.class, true, combineRedConf);

        // Post-reduce mapper: upper-case the combined string
        JobConf upperMapConf = new JobConf(false);
        ChainReducer.addMapper(conf, UpperCaseMapper.class, Text.class,
                IntWritable.class, Text.class, NullWritable.class, true, upperMapConf);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(NullWritable.class);
        conf.setOutputFormat(TextOutputFormat.class);
        JobClient.runJob(conf);
    }
}
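
With the three classes packaged into a jar (the jar name and HDFS paths below are only examples), the job can be launched with the standard hadoop jar command, passing the input and output directories as arguments:

hadoop jar anagram-grouping.jar ChainRunner /user/hadoop/anagram/input /user/hadoop/anagram/output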

SortKeyMapper Class:

import java.io.IOException;
import java.util.Arrays;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SortKeyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens()) {
            String originalInput = tokenizer.nextToken();
            // The sorted characters form the key, so all anagrams of a word
            // share the same key and are grouped together by the framework
            String sortedKey = sortStringChar(originalInput);
            output.collect(new Text(sortedKey), new Text(originalInput));
        }
    }

    // Sorts the characters of a word, e.g. "baa" -> "aab"
    private String sortStringChar(String string) {
        char[] chars = string.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
    }
}
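
If you want to verify the mapper in isolation, a minimal sketch with MRUnit's old-API MapDriver (this assumes the mrunit and JUnit test dependencies are on the classpath; the test class name is mine) could look like this:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.MapDriver;
import org.junit.Test;

public class SortKeyMapperTest {
    @Test
    public void emitsSortedKeyWithOriginalWord() throws Exception {
        // "baa" should be emitted under the sorted key "aab"
        MapDriver.newMapDriver(new SortKeyMapper())
                .withInput(new LongWritable(0), new Text("baa"))
                .withOutput(new Text("aab"), new Text("baa"))
                .runTest();
    }
}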

CombineKeyReducer Class:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class CombineKeyReducer extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        // Join every word of the anagram group with a tab; the length check
        // avoids emitting a leading tab before the first word
        StringBuilder similarWords = new StringBuilder();
        while (values.hasNext()) {
            if (similarWords.length() > 0) {
                similarWords.append('\t');
            }
            similarWords.append(values.next().toString());
        }
        output.collect(new Text(similarWords.toString()), new IntWritable(1));
    }
}
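
The reducer can be checked the same way with MRUnit's ReduceDriver (again assuming the mrunit and JUnit test dependencies; the test class name is mine), verifying that a group comes out as one tab-separated string with no leading tab:

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.ReduceDriver;
import org.junit.Test;

public class CombineKeyReducerTest {
    @Test
    public void joinsGroupMembersWithTabs() throws Exception {
        // The group {aba, baa} under key "aab" becomes "aba\tbaa"
        ReduceDriver.newReduceDriver(new CombineKeyReducer())
                .withInput(new Text("aab"), Arrays.asList(new Text("aba"), new Text("baa")))
                .withOutput(new Text("aba\tbaa"), new IntWritable(1))
                .runTest();
    }
}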

UpperCaseMapper Class:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UpperCaseMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, NullWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, NullWritable> output,
            Reporter reporter) throws IOException {
        // Upper-case the tab-separated anagram group; the count value is
        // dropped so only the words appear in the final output
        output.collect(new Text(key.toString().toUpperCase()), NullWritable.get());
    }
}
