How to write a MapReduce program:
Step 1: Create a main (driver) class that holds both the Mapper and Reducer as inner classes. (Note: they do not have to be inner classes; they can also be defined as separate top-level classes and referenced from outside.)
Step 2: Inside the main class, create a class that extends Hadoop's Mapper class; this is where the mapper logic resides.
The Mapper class takes four generic type parameters: KEYIN, VALUEIN, KEYOUT, VALUEOUT. For word count, the input key is the position of the line in the file (declared Object here), the input value is the line of text, and the output is a (word, count) pair.
Ex:
public class WordCount_
{
    public static class WordMapper extends Mapper<Object, Text, Text, IntWritable>
    {
    }
}
Step 3: Then override the map() method to add the mapper logic.
Ex:
public class WordCount_
{
    public static class WordMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        // Reusable output objects: the current word and the constant count 1
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // Split the input line into whitespace-separated tokens
            // and emit (word, 1) for each token
            String line = value.toString();
            StringTokenizer tokens = new StringTokenizer(line);
            while (tokens.hasMoreTokens())
            {
                word.set(tokens.nextToken());
                context.write(word, one);
            }
        }
    }
}
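To see what this map logic does to one line of input, here is a tiny plain-Java sketch (no Hadoop required; the class name TokenizeDemo and the sample line are made up for illustration) that mimics the tokenize-and-emit loop:
Ex:
import java.util.StringTokenizer;

public class TokenizeDemo
{
    public static void main(String[] args)
    {
        // Stands in for one input value (one line of the file)
        String line = "the quick brown fox jumps over the lazy dog";
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens())
        {
            // The real mapper would call context.write(word, one) here
            System.out.println(tokens.nextToken() + "\t1");
        }
    }
}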
Step 4: Similarly, create another class inside the main class that extends Hadoop's Reducer class; this is where the aggregation logic resides.
The Reducer class also takes four generic type parameters: KEYIN, VALUEIN, KEYOUT, VALUEOUT.
Ex:
public class WordCount_
{
    public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
    }
}
Note: the reducer's input key/value types must match the mapper's output types, because the reducer's input is the mapper's output.
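Between map and reduce, the framework shuffles the mapper output and groups it by key. The following self-contained sketch (the class name ShuffleDemo and the sample words are made up for illustration) simulates that grouping with an ordinary Java Map, to show the shape of the data each reduce() call receives:
Ex:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleDemo
{
    public static void main(String[] args)
    {
        // Pretend mapper output: one (word, 1) pair per token, in arrival order
        String[] mapperOutput = {"hadoop", "spark", "hadoop", "hive", "spark", "hadoop"};

        // The shuffle groups the pairs by key, much like this Map does
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String word : mapperOutput)
        {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }

        // Prints {hadoop=[1, 1, 1], hive=[1], spark=[1, 1]} --
        // each entry corresponds to one reduce() call: (key, Iterable of values)
        System.out.println(grouped);
    }
}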
Step 5: Then override the reduce() method to add the reduce logic.
Ex:
public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    // Reduce phase: the framework has already grouped the mapper output by key,
    // so all the counts for one word arrive together and are summed here
    @Override
    public void reduce(Text key, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable count : counts)
        {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Step 6: Configure the Mapper and Reducer in the driver (the main method):
- Create an object of the Configuration class.
Ex:
Configuration conf = new Configuration();
- Create a Job object from that configuration.
Ex:
Job job = Job.getInstance(conf, "Word Count");
- Set the jar (by way of the main class), the mapper class, and the reducer class for the job.
Ex:
job.setJarByClass(WordCount_.class);
job.setMapperClass(WordMapper.class);
job.setReducerClass(WordReducer.class);
- Set the output key and output value classes for the job.
Ex:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
- Set the input and output paths.
Ex:
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
Example:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
{
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Word Count");
    job.setJarByClass(WordCount_.class);
    job.setMapperClass(WordMapper.class);
    job.setReducerClass(WordReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Block until the job finishes; exit 0 on success, 1 on failure
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
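An optional tweak, not part of the listing above but used in Hadoop's own WordCount example: because the summing in reduce() is associative and commutative, the same reducer class can also be registered as a combiner, so counts are pre-aggregated on the map side and less data crosses the network during the shuffle.
Ex:
job.setCombinerClass(WordReducer.class);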
Executing the program:
Step 1: Make sure the input words file is in the HDFS directory; otherwise copy it from the local file system to HDFS.
Command:
hadoop@Mano:~$ hadoop fs -put /home/hadoop/Mano/wordcount.dat /Mano/WordCount/
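To confirm the file landed in HDFS, it can be listed (same paths assumed as above):
Command:
hadoop@Mano:~$ hadoop fs -ls /Mano/WordCount/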
Step 2: Run the jar using the hadoop jar command. (Note: the output directory must not already exist; Hadoop fails the job rather than overwrite it.)
Command:
hadoop@Mano:~$ hadoop jar /home/hadoop/Mano/MapReduce1.jar mapreduce.WordCount_ /Mano/WordCount/wordcount.dat /Mano/WordCount/Output
Step 3: Verify the results
Command:
hadoop@Mano:~$ hadoop fs -cat /Mano/WordCount/Output/pa*
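For reference, the pieces above fit together into a single source file roughly as follows (a sketch: the package name mapreduce matches the fully qualified class name used in the jar command, and the imports are the standard Hadoop ones):
Ex:
package mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount_
{
    public static class WordMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens())
            {
                word.set(tokens.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable count : counts)
            {
                sum += count.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Count");
        job.setJarByClass(WordCount_.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}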