Example 1:
Problem statement:
ABC.com is an online music website where users listen to various tracks. The usage data is collected in log files that look as shown below.
UserId|TrackId|Shared|Radio|Skip
12345|521|0|1|0
34567|521|1|0|0
12345|222|0|1|1
23456|225|1|0|0
Write a MapReduce program to compute the following:
- Number of unique listeners per track
- Number of times the track was shared with others
- Number of times the track was listened to on the radio
- Number of times the track was listened to in total
- Number of times the track was skipped on the radio
First, we should understand the data in the logs: the first column is user_id, the second is track_id, and so on.
Let's implement the MapReduce jobs based on the above requirements.
Problem statement 1:
Number of unique listeners per track:
Our main focus is to write a mapper class that emits (track_id, user_id) as intermediate key-value pairs to the reducer phase.
To remember the column order in the log file, let's create a constants class to hold that information.
Constants class to organize the log data:
package mapreduce;
public class Music_website_constants
{
//This constants class records the column order in the music website log file.
//Each value is the array index of the corresponding field after splitting a record.
public static final int USER_ID=0;
public static final int TRACK_ID=1;
public static final int IS_SHARED=2;
public static final int RADIO=3;
public static final int IS_SKIPPED=4;
}
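To see how these constants line up with a record, here is a small illustrative snippet. The ConstantsDemo class is hypothetical, written only for this walkthrough; the log line is the first sample record from the data above:
package mapreduce;
public class ConstantsDemo
{
    public static void main(String[] args)
    {
        // The first record from the sample log above
        String line = "12345|521|0|1|0";
        String[] tokens = line.split("[|]");
        System.out.println("user_id    = " + tokens[Music_website_constants.USER_ID]);    // 12345
        System.out.println("track_id   = " + tokens[Music_website_constants.TRACK_ID]);   // 521
        System.out.println("is_shared  = " + tokens[Music_website_constants.IS_SHARED]);  // 0
        System.out.println("radio      = " + tokens[Music_website_constants.RADIO]);      // 1
        System.out.println("is_skipped = " + tokens[Music_website_constants.IS_SKIPPED]); // 0
    }
}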
MapReduce main class:
package mapreduce;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Music_website
{
// enum used to define a custom Hadoop counter for invalid records
private enum COUNTERS
{
INVALID_RECORD_COUNTS
}
// Mapper class to emit intermediate key-value pairs (track_id, user_id)
/**
* Mapper Phase: each input line is split into fields on the pipe
* delimiter to form a (track_id, user_id) key-value pair
*/
private static class MusicMapper extends
Mapper<Object, Text, IntWritable, IntWritable> {
// Output object variables
IntWritable user_id = new IntWritable();
IntWritable track_id = new IntWritable();
// Override the map function to add Mapper logic
@Override
public void map(Object key, Text value,
Mapper<Object, Text, IntWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
// Read the input line by line
String line = value.toString();
// Split the line into tokens on the pipe delimiter
String[] tokens = line.split("[|]");
// Validate the record length before parsing, so a malformed
// record is counted instead of crashing the mapper
if (tokens.length == 5) {
user_id.set(Integer
.parseInt(tokens[Music_website_constants.USER_ID]));
track_id.set(Integer
.parseInt(tokens[Music_website_constants.TRACK_ID]));
context.write(track_id, user_id);
} else {
// Increment the counter for invalid records
context.getCounter(COUNTERS.INVALID_RECORD_COUNTS)
.increment(1L);
}
}
}
// Reducer class: the aggregation phase for the keys generated by the
// map phase
/**
* Reducer Phase: the user ids for each track are grouped together and
* the number of distinct user ids is counted
*/
// Problem statement: Number of unique listeners per track
private static class MusicReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
// Override the reduce function to add Reducer logic
@Override
public void reduce(
IntWritable trackid,
Iterable<IntWritable> userids,
Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
// Use a Set so that each distinct user id is counted only once
Set<Integer> useridset = new HashSet<>();
for (IntWritable userID : userids) {
useridset.add(userID.get());
}
context.write(trackid, new IntWritable(useridset.size()));
}
}
// Main method: configures the job and controls the flow of execution
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Music_website");
// Set the main class, mapper, and reducer on the job
job.setJarByClass(Music_website.class);
job.setMapperClass(MusicMapper.class);
job.setReducerClass(MusicReducer.class);
// Set the output key class and output value class on the job
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
// Define the input path and output path
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
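Before running the job on a cluster, the core logic can be sanity-checked with plain Java. The UniqueListenersCheck class below is a hypothetical helper written only for this walkthrough, not part of the job; it replays the four sample records and prints the unique-listener count per track:
package mapreduce;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
public class UniqueListenersCheck
{
    public static void main(String[] args)
    {
        // The four sample records from the problem statement
        String[] log = {
            "12345|521|0|1|0",
            "34567|521|1|0|0",
            "12345|222|0|1|1",
            "23456|225|1|0|0"
        };
        // track_id -> set of distinct user_ids
        Map<Integer, Set<Integer>> listeners = new TreeMap<>();
        for (String line : log) {
            String[] tokens = line.split("[|]");
            int userId = Integer.parseInt(tokens[Music_website_constants.USER_ID]);
            int trackId = Integer.parseInt(tokens[Music_website_constants.TRACK_ID]);
            listeners.computeIfAbsent(trackId, k -> new HashSet<>()).add(userId);
        }
        // Expected: 222 -> 1, 225 -> 1, 521 -> 2
        for (Map.Entry<Integer, Set<Integer>> e : listeners.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue().size());
        }
    }
}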
Steps to execute the MapReduce job:
Step 1: Export the code as a JAR from the IDE and place it on the server.
Step 2: Use the hadoop command to run the JAR as a MapReduce job.
Syntax:
$HADOOP_HOME/bin/hadoop jar <Jar_location> package.class_name <Input_file_location> <Map_reduce_output>
Example:
hadoop-2.7.3/bin/hadoop jar /home/hadoop/MapReduce_jars/MapReduce_map.jar mapreduce.Music_website /Real_time_MR/input/Radio_log.txt /Real_time_MR/output1
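Note: the output path (/Real_time_MR/output1 here) must not already exist; Hadoop's FileOutputFormat fails the job if it does.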
Output:
hadoop@Manohar:~$ hadoop-2.7.3/bin/hadoop jar /home/hadoop/MapReduce_jars/MapReduce_map.jar mapreduce.Music_website /Real_time_MR/input/Radio_log.txt /Real_time_MR/output1
17/08/05 14:53:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/08/05 14:53:32 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/08/05 14:53:32 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/08/05 14:53:33 INFO input.FileInputFormat: Total input paths to process : 1
17/08/05 14:53:33 INFO mapreduce.JobSubmitter: number of splits:1
17/08/05 14:53:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501077058026_0045
17/08/05 14:53:33 INFO impl.YarnClientImpl: Submitted application application_1501077058026_0045
17/08/05 14:53:33 INFO mapreduce.Job: The url to track the job: http://Manohar:8088/proxy/application_1501077058026_0045/
17/08/05 14:53:33 INFO mapreduce.Job: Running job: job_1501077058026_0045
17/08/05 14:53:41 INFO mapreduce.Job: Job job_1501077058026_0045 running in uber mode : false
17/08/05 14:53:41 INFO mapreduce.Job: map 0% reduce 0%
17/08/05 14:53:46 INFO mapreduce.Job: map 100% reduce 0%
17/08/05 14:53:52 INFO mapreduce.Job: map 100% reduce 100%
17/08/05 14:53:53 INFO mapreduce.Job: Job job_1501077058026_0045 completed successfully
17/08/05 14:53:53 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=46
FILE: Number of bytes written=236657
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=183
HDFS: Number of bytes written=18
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3266
Total time spent by all reduces in occupied slots (ms)=3380
Total time spent by all map tasks (ms)=3266
Total time spent by all reduce tasks (ms)=3380
Total vcore-milliseconds taken by all map tasks=3266
Total vcore-milliseconds taken by all reduce tasks=3380
Total megabyte-milliseconds taken by all map tasks=3344384
Total megabyte-milliseconds taken by all reduce tasks=3461120
Map-Reduce Framework
Map input records=4
Map output records=4
Map output bytes=32
Map output materialized bytes=46
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=46
Reduce input records=4
Reduce output records=3
Spilled Records=8
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=145
CPU time spent (ms)=1280
Physical memory (bytes) snapshot=437940224
Virtual memory (bytes) snapshot=3799502848
Total committed heap usage (bytes)=355991552
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=64
File Output Format Counters
Bytes Written=18
Step 3: View the results of the MapReduce job (using the cat command)
Example:
hadoop-2.7.3/bin/hadoop fs -cat /Real_time_MR/output1/part-r-00000
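The console output of this command is not shown here, but based on the four sample records the counts per track should be (track 521 was played by two distinct users, tracks 222 and 225 by one each):
222 1
225 1
521 2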
Problem Statement 2:
Number of times the track was shared with others
Our main focus is to write a mapper class that emits (track_id, share_flag) pairs for shared records to the reducer phase.
We reuse the same Music_website_constants class defined above to remember the column order in the log file.
MapReduce main class:
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Music_website_sharecount {
// enum used to define a custom Hadoop counter for invalid records
private enum COUNTERS
{
INVALID_RECORD_COUNTS
}
// Mapper class to emit intermediate key-value pairs (track_id, share_flag)
/**
* Mapper Phase: each input line is split into fields on the pipe
* delimiter, and shared records are emitted as (track_id, 1) pairs
*/
private static class MusicMapper_share extends
Mapper<Object, Text, IntWritable, IntWritable> {
// Output object variables
IntWritable track_id = new IntWritable();
IntWritable share_id = new IntWritable();
// Override the map function to add Mapper logic
@Override
public void map(Object key, Text value,
Mapper<Object, Text, IntWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
// Read the input line by line
String line = value.toString();
// Split the line into tokens on the pipe delimiter
String[] tokens = line.split("[|]");
// Validate the record length before parsing, so a malformed
// record is counted instead of crashing the mapper
if (tokens.length == 5) {
track_id.set(Integer
.parseInt(tokens[Music_website_constants.TRACK_ID]));
share_id.set(Integer
.parseInt(tokens[Music_website_constants.IS_SHARED]));
// Emit (track_id, 1) only when the record was shared
if (share_id.get() == 1) {
context.write(track_id, share_id);
}
} else {
// Increment the counter for invalid records
context.getCounter(COUNTERS.INVALID_RECORD_COUNTS)
.increment(1L);
}
}
}
// Reducer class: the aggregation phase for the keys generated by the
// map phase
/**
* Reducer Phase: the share flags for each track are grouped together
* and summed up, just like the classic word-count pattern
*/
// Problem statement: Number of times the track was shared with others
private static class MusicReducer_share extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
// Override the reduce function to add Reducer logic
@Override
public void reduce(
IntWritable trackid,
Iterable<IntWritable> share_ids,
Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
// Every emitted value is 1, so summing the values gives the
// number of times the track was shared
int shareCount = 0;
for (IntWritable shareID : share_ids) {
shareCount += shareID.get();
}
context.write(trackid, new IntWritable(shareCount));
}
}
// Main method: configures the job and controls the flow of execution
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Music_website_sharecount");
// Set the main class, mapper, and reducer on the job
job.setJarByClass(Music_website_sharecount.class);
job.setMapperClass(MusicMapper_share.class);
job.setReducerClass(MusicReducer_share.class);
// Set the output key class and output value class on the job
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
// Define the input path and output path
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
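As before, the logic can be checked locally without a cluster. The ShareCountCheck class is a hypothetical helper for this walkthrough; on the sample records it should print the same counts as the job output shown below:
package mapreduce;
import java.util.Map;
import java.util.TreeMap;
public class ShareCountCheck
{
    public static void main(String[] args)
    {
        // The four sample records from the problem statement
        String[] log = {
            "12345|521|0|1|0",
            "34567|521|1|0|0",
            "12345|222|0|1|1",
            "23456|225|1|0|0"
        };
        // track_id -> number of shares
        Map<Integer, Integer> shares = new TreeMap<>();
        for (String line : log) {
            String[] tokens = line.split("[|]");
            int trackId = Integer.parseInt(tokens[Music_website_constants.TRACK_ID]);
            int shared = Integer.parseInt(tokens[Music_website_constants.IS_SHARED]);
            if (shared == 1) {
                shares.merge(trackId, 1, Integer::sum);
            }
        }
        // Expected: 225 -> 1, 521 -> 1 (matches the job output below)
        for (Map.Entry<Integer, Integer> e : shares.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}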
Steps to execute the MapReduce job:
Step 1: Export the code as a JAR from the IDE and place it on the server.
Step 2: Use the hadoop command to run the JAR as a MapReduce job.
Syntax:
$HADOOP_HOME/bin/hadoop jar <Jar_location> package.class_name <Input_file_location> <Map_reduce_output>
Example:
hadoop-2.7.3/bin/hadoop jar /home/hadoop/MapReduce_jars/MapReduce_map.jar mapreduce.Music_website_sharecount /Real_time_MR/input/Radio_log.txt /Real_time_MR/output2
Output:
hadoop@Manohar:~$ hadoop-2.7.3/bin/hadoop jar /home/hadoop/MapReduce_jars/MapReduce_map.jar mapreduce.Music_website_sharecount /Real_time_MR/input/Radio_log.txt /Real_time_MR/output2
17/08/05 15:24:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/08/05 15:24:43 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/08/05 15:24:43 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/08/05 15:24:43 INFO input.FileInputFormat: Total input paths to process : 1
17/08/05 15:24:44 INFO mapreduce.JobSubmitter: number of splits:1
17/08/05 15:24:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501077058026_0046
17/08/05 15:24:45 INFO impl.YarnClientImpl: Submitted application application_1501077058026_0046
17/08/05 15:24:45 INFO mapreduce.Job: The url to track the job: http://Manohar:8088/proxy/application_1501077058026_0046/
17/08/05 15:24:45 INFO mapreduce.Job: Running job: job_1501077058026_0046
17/08/05 15:24:53 INFO mapreduce.Job: Job job_1501077058026_0046 running in uber mode : false
17/08/05 15:24:53 INFO mapreduce.Job: map 0% reduce 0%
17/08/05 15:24:58 INFO mapreduce.Job: map 100% reduce 0%
17/08/05 15:25:04 INFO mapreduce.Job: map 100% reduce 100%
17/08/05 15:25:04 INFO mapreduce.Job: Job job_1501077058026_0046 completed successfully
17/08/05 15:25:04 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=26
FILE: Number of bytes written=236707
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=183
HDFS: Number of bytes written=12
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3269
Total time spent by all reduces in occupied slots (ms)=3410
Total time spent by all map tasks (ms)=3269
Total time spent by all reduce tasks (ms)=3410
Total vcore-milliseconds taken by all map tasks=3269
Total vcore-milliseconds taken by all reduce tasks=3410
Total megabyte-milliseconds taken by all map tasks=3347456
Total megabyte-milliseconds taken by all reduce tasks=3491840
Map-Reduce Framework
Map input records=4
Map output records=2
Map output bytes=16
Map output materialized bytes=26
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=26
Reduce input records=2
Reduce output records=2
Spilled Records=4
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=163
CPU time spent (ms)=1380
Physical memory (bytes) snapshot=438980608
Virtual memory (bytes) snapshot=3799109632
Total committed heap usage (bytes)=357040128
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=64
File Output Format Counters
Bytes Written=12
Step 3: View the results of the MapReduce job (using the cat command)
hadoop@Manohar:~$ hadoop-2.7.3/bin/hadoop fs -cat /Real_time_MR/output2/part-r-00000
17/08/05 15:25:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
225 1
521 1