Distributed Cache in Hadoop

Side Data Distribution: Side data can be defined as extra read-only data needed by a job to process the main dataset. The challenge is to make side data available to all the map or reduce tasks (which are spread across the cluster) in a convenient and efficient fashion.

Distributed Cache: It provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run.
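The example program below uses the older org.apache.hadoop.filecache.DistributedCache class, which still works but is deprecated in newer releases. On Hadoop 2.x the same thing can be done directly on the Job object; here is a minimal sketch (the HDFS path is only a placeholder):

// Sketch only: registering a cache file with the newer Job API (Hadoop 2.x).
// The path /user/hadoop/cache/article.txt is an assumed placeholder.
Job job = Job.getInstance(getConf(), "wordcount-with-cache");
job.addCacheFile(new Path("/user/hadoop/cache/article.txt").toUri());
// In the mapper's setup(), context.getCacheFiles() returns the registered URIs.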

Let's take the standard word count example and use the distributed cache. I have a file, article.txt, placed in HDFS. While running the word count job, I will read the articles from this file and ignore them in the count.

Sample Cache File


The cache file contains three articles: 'a', 'an', and 'the'.
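Since the mapper's setup() method reads the cache file line by line, the file is expected to hold one article per line, like this:

a
an
the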

Input File for Word Count:

Here is the program:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistributedCacheExample extends Configured implements Tool {

static class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    static enum ArticleCounter { A_Counter, AN_Counter, THE_Counter };

    HashSet<String> articleList = new HashSet<String>();
    Text word = new Text();
    int number = 0;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path getPath = null;
        try {
            // Retrieve the URIs of the cache files registered for this job
            URI[] localFile = DistributedCache.getCacheFiles(context.getConfiguration());
            getPath = new Path(localFile[0].getPath());
        } catch (Exception e) {
            System.out.println("Error in setup method: " + e.toString());
        }

        FileSystem fs = FileSystem.get(context.getConfiguration());
        try {
            // Read the cache file line by line; each line holds one article
            BufferedReader fis = new BufferedReader(new InputStreamReader(fs.open(getPath)));
            String token = null;
            while ((token = fis.readLine()) != null) {
                articleList.add(token);
                System.out.println("Token added: " + token);
            }
            fis.close();
        } catch (IOException ioe) {
            System.err.println("Error while reading the cache file: " + ioe.toString());
        }
    }
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String tokenValue = "";
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            tokenValue = tokenizer.nextToken();
            if (articleList.contains(tokenValue.toLowerCase())) {
                // The token is an article ('a', 'an' or 'the') loaded from the cache file - skip it
            } else {
                context.write(new Text(tokenValue), new IntWritable(1));
            }
        }
    }
}

@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = new Job(conf);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.out.println("Adding Cache File");
    // Register the cache file (third command-line argument) with the job
    DistributedCache.addCacheFile(new Path(args[2]).toUri(), job.getConfiguration());
    job.setJarByClass(DistributedCacheExample.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(DistributedCacheMapper.class);
    job.setReducerClass(DistributedCacheReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
}
static class DistributedCacheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted for this word by the mappers
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new DistributedCacheExample(), args);
    System.exit(exitCode);
}

}
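The job takes three arguments: the input path, the output path, and the HDFS path of the cache file. A typical invocation (the jar name and paths are placeholders for your own) would look like:

hadoop jar distributed-cache-example.jar DistributedCacheExample /user/hadoop/input /user/hadoop/output /user/hadoop/cache/article.txt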


Here the output does not contain any articles.
Output:


