Wednesday 2 August 2017

MapReduce Real-time Example 2:

Problem statement:
Call Data Record (CDR) Analytics:

Write a MapReduce program to find all phone numbers that make more than 60 minutes of STD calls. Here, if STDFlag is 1, the record represents an STD call.
By identifying such subscribers, the telecom company wants to offer them an STD (long-distance) pack, which would be cheaper for them than paying per call without a package.

The telecom company keeps records for its subscribers in a specific format.

Consider the following format:
FromPhoneNumber|ToPhoneNumber|CallStartTime|CallEndTime|STDFlag

The data in the log files looks like this:

FromPhoneNumber|ToPhoneNumber|CallStartTime|CallEndTime|STDFlag
9665128505|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|1
9665128505|8983006310|2015-03-01 07:08:10|2015-03-01 08:12:15|0
9665128505|8983006310|2015-03-01 09:08:10|2015-03-01 09:12:15|1
9665128505|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|0
9665128506|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|1
9665128507|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|1
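As a quick sanity check on the arithmetic: the first record's call runs from 09:08:10 to 10:12:15, which is 64 whole minutes. A small standalone sketch of that duration calculation (the class and method names here are just for illustration):

```java
import java.text.SimpleDateFormat;

public class DurationDemo {
    // Computes whole minutes between two timestamps in the log's format
    static long minutesBetween(String start, String end) throws Exception {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return (fmt.parse(end).getTime() - fmt.parse(start).getTime()) / (1000 * 60);
    }

    public static void main(String[] args) throws Exception {
        // First sample record: 09:08:10 -> 10:12:15 is 1h 4m 5s, truncated to 64
        System.out.println(minutesBetween("2015-03-01 09:08:10", "2015-03-01 10:12:15")); // prints 64
    }
}
```

The integer division truncates the seconds, which is exactly what the mapper below does as well.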

First, we should understand the data in the logs: the first column is FromPhoneNumber, the second is ToPhoneNumber, and so on.

Our main focus is to write a mapper class that emits FromPhoneNumber and the call duration as intermediate key-value pairs for the reduce phase.

To keep track of the field order in the log file, let's create a constants class to hold the column indexes.

Constants class describing the log record layout:

package mapreduce;

public class CDR_constants 
{
    public static final int FROM_PHONE_NO = 0;
    public static final int TO_PHONE_NO = 1;
    public static final int callStartTime = 2;
    public static final int callEndTime = 3;
    public static final int STDFlag = 4;
}

MapReduce main class:

package mapreduce;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CDR_Analytics
{
    /**
     * Mapper phase: each log line is split into fields, and for every STD call
     * the pair (FROM_PHONE_NO, call duration in minutes) is emitted.
     */
    private static class CDR_Mapper extends Mapper<Object, Text, Text, LongWritable>
    {
        // Output objects, reused across calls to map()
        private Text phone_no = new Text();
        private LongWritable call_time = new LongWritable();

        // Override the map function to add the mapper logic
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // Read the input one line at a time
            String line = value.toString();
            // Split the line into fields on the pipe delimiter
            String[] words = line.split("[|]");
            if (words[CDR_constants.STDFlag].equalsIgnoreCase("1"))
            {
                phone_no.set(words[CDR_constants.FROM_PHONE_NO]);
                String startTime = words[CDR_constants.callStartTime];
                String endTime = words[CDR_constants.callEndTime];
                long call_duration = toMilliseconds(endTime) - toMilliseconds(startTime);
                long call_minutes = call_duration / (1000 * 60);
                call_time.set(call_minutes);
                context.write(phone_no, call_time);
            }
        }

        // Parses a timestamp in the log's format and returns epoch milliseconds
        private long toMilliseconds(String time)
        {
            String pattern = "yyyy-MM-dd HH:mm:ss";
            SimpleDateFormat format = new SimpleDateFormat(pattern);
            Date date = null;
            try
            {
                date = format.parse(time);
            }
            catch (ParseException e)
            {
                e.printStackTrace();
            }
            return date.getTime();
        }
    }

    /**
     * Reducer phase: the durations for each phone number are summed, and only
     * numbers with more than 60 minutes of STD calls are written out.
     */
    private static class CDR_Reducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {
        private LongWritable result = new LongWritable();

        @Override
        public void reduce(Text phone_num, Iterable<LongWritable> durations, Context context) throws IOException, InterruptedException
        {
            long sum = 0;
            for (LongWritable duration : durations)
            {
                sum += duration.get();
            }
            if (sum > 60)
            {
                result.set(sum);
                context.write(phone_num, result);
            }
        }
    }

    // Main method: configures the job and wires up the mapper and reducer
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CDR_Analytics");
        // Set the main class, mapper and reducer on the job
        job.setJarByClass(CDR_Analytics.class);
        job.setMapperClass(CDR_Mapper.class);
        job.setReducerClass(CDR_Reducer.class);
        // Set the output key and value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Define the input path and output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
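Before packaging the jar, the filter-and-sum logic can be sanity-checked locally without a Hadoop cluster. This sketch (the class name is hypothetical, not part of the job) replays the six sample records in memory, applying the same STD filter as the mapper and the same sum-and-threshold step as the reducer:

```java
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;

public class CDRLocalCheck {
    // Whole minutes between two timestamps in the log's format
    static long minutes(String start, String end) throws Exception {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return (fmt.parse(end).getTime() - fmt.parse(start).getTime()) / (1000 * 60);
    }

    // Replays the records through the same filter-and-sum logic as the job
    public static Map<String, Long> run(String[] records) throws Exception {
        Map<String, Long> totals = new TreeMap<>();
        for (String line : records) {
            String[] f = line.split("[|]");
            if (f[4].equals("1")) {                      // STD calls only, like the mapper
                totals.merge(f[0], minutes(f[2], f[3]), Long::sum);
            }
        }
        totals.values().removeIf(sum -> sum <= 60);      // keep totals over 60 minutes, like the reducer
        return totals;
    }

    public static void main(String[] args) throws Exception {
        String[] records = {
            "9665128505|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|1",
            "9665128505|8983006310|2015-03-01 07:08:10|2015-03-01 08:12:15|0",
            "9665128505|8983006310|2015-03-01 09:08:10|2015-03-01 09:12:15|1",
            "9665128505|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|0",
            "9665128506|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|1",
            "9665128507|8983006310|2015-03-01 09:08:10|2015-03-01 10:12:15|1"
        };
        System.out.println(run(records)); // {9665128505=68, 9665128506=64, 9665128507=64}
    }
}
```

For 9665128505, only the two STD records count (64 + 4 = 68 minutes), which is how the totals in the job output further below arise.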

Steps to execute the MapReduce Job:

Step 1: Export the code as a jar from the IDE and place it on the server.


Step 2: Use the hadoop jar command to run the jar as a MapReduce job.

Syntax:
$HADOOP_HOME/bin/hadoop jar <Jar_location> package.class_name <Input_file_location> <Map_reduce_output>

Example:
hadoop jar /home/hadoop/MapReduce_jars/MapReduce_map.jar mapreduce.CDR_Analytics /Real_time_MR/CRD_Analytics/CDR_Analytics_Log.txt /Real_time_MR/CRD_Analytics/output3

Output:

hadoop@Manohar:~$ hadoop jar /home/hadoop/MapReduce_jars/MapReduce_map.jar mapreduce.CDR_Analytics /Real_time_MR/CRD_Analytics/CDR_Analytics_Log.txt /Real_time_MR/CRD_Analytics/output3
17/08/06 12:52:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/08/06 12:52:50 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/08/06 12:52:50 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/08/06 12:52:50 INFO input.FileInputFormat: Total input paths to process : 1
17/08/06 12:52:50 INFO mapreduce.JobSubmitter: number of splits:1
17/08/06 12:52:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501077058026_0050
17/08/06 12:52:51 INFO impl.YarnClientImpl: Submitted application application_1501077058026_0050
17/08/06 12:52:51 INFO mapreduce.Job: The url to track the job: http://Manohar:8088/proxy/application_1501077058026_0050/
17/08/06 12:52:51 INFO mapreduce.Job: Running job: job_1501077058026_0050
17/08/06 12:52:58 INFO mapreduce.Job: Job job_1501077058026_0050 running in uber mode : false
17/08/06 12:52:58 INFO mapreduce.Job:  map 0% reduce 0%
17/08/06 12:53:04 INFO mapreduce.Job:  map 100% reduce 0%
17/08/06 12:53:10 INFO mapreduce.Job:  map 100% reduce 100%
17/08/06 12:53:10 INFO mapreduce.Job: Job job_1501077058026_0050 completed successfully
17/08/06 12:53:10 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=90
                FILE: Number of bytes written=236789
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=525
                HDFS: Number of bytes written=42
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=3178
                Total time spent by all reduces in occupied slots (ms)=3294
                Total time spent by all map tasks (ms)=3178
                Total time spent by all reduce tasks (ms)=3294
                Total vcore-milliseconds taken by all map tasks=3178
                Total vcore-milliseconds taken by all reduce tasks=3294
                Total megabyte-milliseconds taken by all map tasks=3254272
                Total megabyte-milliseconds taken by all reduce tasks=3373056
        Map-Reduce Framework
                Map input records=6
                Map output records=4
                Map output bytes=76
                Map output materialized bytes=90
                Input split bytes=135
                Combine input records=0
                Combine output records=0
                Reduce input groups=3
                Reduce shuffle bytes=90
                Reduce input records=4
                Reduce output records=3
                Spilled Records=8
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=167
                CPU time spent (ms)=1330
                Physical memory (bytes) snapshot=437833728
                Virtual memory (bytes) snapshot=3799126016
                Total committed heap usage (bytes)=355991552
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=390
        File Output Format Counters
                Bytes Written=42

Step 3: View the results of the MapReduce job (using the cat command).

Example:

hadoop@Manohar:~$ hadoop fs -cat /Real_time_MR/CRD_Analytics/output3/part-r-00000
17/08/06 12:54:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
9665128505      68
9665128506      64
9665128507      64
