Distributed Cache in Hadoop
Side Data Distribution: Side data can be defined as extra read-only data needed by a job to process the main dataset. The challenge is to make side data available to all the map or reduce tasks
(which are spread across the cluster) in a convenient and efficient fashion.
Distributed Cache: It provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run.
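In the old-API program shown further below, the mechanism boils down to two calls: the driver registers a file with the job, and each task looks the file up in its setup() method. Abridged from the full program (variable names are only for illustration), it looks roughly like this:

// Driver side: register a file (here the HDFS path passed as args[2]) with the cache.
DistributedCache.addCacheFile(new Path(args[2]).toUri(), job.getConfiguration());

// Task side, in setup(): retrieve the URIs of all files cached for this job.
URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());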
Let's take a standard word count example with the distributed cache. I have a file, article.txt, placed in the HDFS file system. While running the word count job, I will read the articles from this file and ignore them in the word count.
Sample Cache File:
The cache file contains three articles: 'a', 'an' and 'the'.
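Because setup() reads the cache file line by line, article.txt simply lists one article per line:

a
an
the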
Input File for Word Count:
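For illustration, assume the input file contains the following two lines (any text would do):

Hadoop is a framework for processing data on a cluster
The cluster runs an application as a set of tasks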
Here is the program:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DistributedCacheExample extends Configured implements Tool {

    static class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        static enum ArticleCounter { A_Counter, AN_Counter, THE_Counter };

        // Holds the articles read from the cached file in setup().
        HashSet<String> articleList = new HashSet<String>();
        Text word = new Text();
        int number = 0;

        // setup() runs once per map task, before any call to map(),
        // and loads the cached article list into memory.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path getPath = null;
            try {
                // Look up the URIs of the files registered with the distributed cache.
                URI[] localFile = DistributedCache.getCacheFiles(context.getConfiguration());
                getPath = new Path(localFile[0].getPath());
            } catch (Exception e) {
                System.err.println("Error in setup method: " + e.toString());
            }
            FileSystem fs = FileSystem.get(context.getConfiguration());
            try {
                BufferedReader fis = new BufferedReader(new InputStreamReader(fs.open(getPath)));
                String token = null;
                while ((token = fis.readLine()) != null) {
                    articleList.add(token);
                    System.out.println("Token added: " + token);
                }
                fis.close();
            } catch (IOException ioe) {
                System.err.println("Error while reading the cache file: " + ioe.toString());
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String tokenValue = "";
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                tokenValue = tokenizer.nextToken();
                if (articleList.contains(tokenValue.toLowerCase())) {
                    // The token is one of the cached articles ('a', 'an' or 'the') -- skip it.
                } else {
                    context.write(new Text(tokenValue), new IntWritable(1));
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.out.println("Adding Cache File");
        // Register the cache file (third command-line argument) with the job configuration.
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), job.getConfiguration());
        job.setJarByClass(DistributedCacheExample.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(DistributedCacheMapper.class);
        job.setReducerClass(DistributedCacheReducer.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class DistributedCacheReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DistributedCacheExample(), args);
        System.exit(exitCode);
    }
}
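The job takes three arguments: the input path, the output path, and the HDFS path of the cache file. Assuming the classes are packaged into a jar named distcache-example.jar (the jar name and paths here are only placeholders), it could be run as:

hadoop jar distcache-example.jar DistributedCacheExample /user/hduser/input /user/hduser/output /user/hduser/article.txt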
Here the output does not contain any articles.
Output:
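With the illustrative input shown earlier, the output would look something like this (note that the articles 'a', 'an' and 'The' from the input are gone):

Hadoop	1
application	1
as	1
cluster	2
data	1
for	1
framework	1
is	1
of	1
on	1
processing	1
runs	1
set	1
tasks	1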