A Reduce-side join example (a MapReduce example)
Using three files: Sales.csv, Customer.csv, and Product.csv. The data files for the three *.csv files can be downloaded here:
https://drive.google.com/file/d/0B3DvJosZ_aAlN1plekRRdHlnMXM/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlRm00MGdSNzdYcFk/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlSFYxeVVCY0p6NlE/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlaDE4WTJHd0JXT3c/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlbXFUWHVZNmZab28/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlal9SWUQtNEtTMWc/view?usp=sharing
________________________________________
Problem statement: Sales.csv has a customer-id and a product-id in each record.
Look up the customer name in Customer.csv (via customer-id) and the product name in Product.csv (via product-id) and add them to the sales records.
From Sales.csv, display the final result as: product name : customer name + quantity + date
My approach
Step 1: Put Customer.csv into the distributed cache.
Step 2: Read Sales.csv in a mapper; for every cust-id in a sales record, substitute the customer name (looked up from the cached Customer.csv) and emit the product-id as the key and (customer name + quantity + date) as the value.
Step 3: In another mapper, read Product.csv and emit the product-id as the key and the product name as the value.
Step 4: Product-id is therefore the key in both mappers, so the matching records meet in the same reduce call.
Step 5: In the main class, use MultipleInputs; since the map output is grouped on the product-id key, the reducer sees the product name together with the customers who bought it.
Step 6: Leave the key out as NullWritable and write only the values.
A small sketch of the assumed record layouts follows these steps.
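For reference, here is a minimal sketch of the record layouts the mappers assume, inferred from the array indices used in the corrected code further down; the sample values (C102, P205, the x filler fields) are invented, and the real *.csv files may contain additional columns.

// Hypothetical sample records illustrating the assumed column positions
// and the join performed on them; not taken from the real data files.
public class RecordLayoutSketch {
    public static void main(String[] args) {
        // Sales.csv    : customer-id, product-id, quantity, date
        String sale     = "C102,P205,7,2012-01-24";
        // Customer.csv : first-name, last-name, ..., ..., customer-id
        String customer = "Jansen,Gibbons,x,x,C102";
        // Product.csv  : ..., product-name, ..., ..., product-id
        String product  = "x,Motherboard H772 CPU,x,x,P205";

        String[] s = sale.split(",");
        String[] c = customer.split(",");
        String[] p = product.split(",");

        // s[1] and p[4] carry the product-id (the key emitted by both mappers);
        // s[0] and c[4] carry the customer-id (the lookup key into the cached file).
        if (s[1].equals(p[4]) && s[0].equals(c[4])) {
            // Desired output shape: product name : (customer name, quantity, date)
            System.out.println(p[1] + " : (" + c[0] + " " + c[1] + "," + s[2] + "," + s[3] + ")");
        }
    }
}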
____________________________________
package com.nandgama.company;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
//import org.apache.hadoop.util.StringUtils;
//import com.ibm.icu.util.StringTokenizer;
public class SalesRedJoin {

    private Set<String> custInfo = new HashSet<String>();
    private List<String> custInfo1 = new ArrayList<String>();
    private Configuration conf;
    private BufferedReader fis;

    public class SalesRedMapper extends Mapper<Object, Text, Text, Text> {

        protected void setup(Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // open setup function5
            // to read customer cache file from hdfs Customer.csv
            if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
                // if block checking of uri1
                conf = context.getConfiguration();
                URI[] fileURIs = Job.getInstance(conf).getCacheFiles();
                File custFile = null;
                for (URI pathURI : fileURIs) {
                    // Path fPath = new Path(pathURI.getPath());
                    custFile = new File(pathURI);
                } // for block closed
                try {
                    // try block open2
                    // Reading the cache file and storing in private identifier custInfo1, custInfo
                    fis = new BufferedReader(new FileReader(custFile));
                    String line = null;
                    while ((line = fis.readLine()) != null) {
                        // while open
                        custInfo1.add(line);
                        custInfo.add(line);
                        // while closed
                    }
                    fis.close();
                    // try block closed2
                } catch (IOException ioe) {
                    // catch block open3
                    System.err.println("Caught exception while parsing the cached file '");
                } // catch block closed3
                // if block closed-1
            }
            super.setup(context);
        } // Setup function closed5
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // map function to read Sales.csv
            String name1key = null;
            // String valueWr = null;
            StringTokenizer str1 = new StringTokenizer(value.toString(), ",");
            for (String s : custInfo) {
                // Reading the data in custInfo record by record
                String[] custDetail = s.split(",");
                String tcusId = custDetail[4];
                String tname = custDetail[0] + " " + custDetail[1];
                while (str1.hasMoreTokens()) {
                    // checking the Sales.csv record for the above custId to pick up the name
                    if (tcusId.equals(str1.nextToken())) {
                        name1key = tname.toString();
                    }
                } // close of while loop
            } // close of for loop
            // forming the output key and value pairs with the substituted name
            String val1[] = value.toString().split(",");
            key = new Text(val1[1].toString());
            String val2 = "(" + name1key.toString() + "," + val1[2].toString() + "," + val1[3].toString() + ")";
            // setting product id as key and the remaining fields (name, quantity, date) as value
            context.write((Text) key, new Text(val2.toString()));
        } // end of map function
    } // end of SalesRedMapper class
    public class ProdRedMapper extends Mapper<Object, Text, Text, Text> {

        // This map function to read Product.csv
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String val2[] = value.toString().split(",");
            key = new Text(val2[5].toString());
            String val3 = val2[0].toString();
            // writing our product number as key and product name as value
            context.write((Text) key, new Text(val3.toString()));
        }
    }

    public static class ProdRed extends Reducer<Text, Text, NullWritable, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Writing out only values leaving the product number
            context.write(NullWritable.get(), (Text) values);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] programArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (programArgs.length != 3) {
            System.err.println("Usage: SalesRedJoin <in> <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Reduce side Join");
        job.setJarByClass(SalesRedJoin.class);
        job.setReducerClass(ProdRed.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // setting multiple input paths to Sales.csv and Product.csv with map classes
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, SalesRedMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ProdRedMapper.class);
        Path outputPath = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Adding cache file to each node present in hdfs
        job.addCacheFile(new URI("/user/biadmin/TempData/Customer.csv#Customers"));
        // Submit the job and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } // end of main method
} // end of SalesRedJoin class
____________________________________________
Changed code so the program runs without errors. The main fixes compared to the version above: the mapper and reducer classes are declared as static nested classes, the cached Customer.csv is loaded into a HashMap in setup() via DistributedCache.getLocalCacheFiles() instead of new File(uri), the driver sets the map output key/value classes explicitly, the Product.csv column indices are corrected, and the reducer iterates over the values instead of casting the Iterable to Text:
package com.nandgama;
import java.io.BufferedReader;
//previous1 import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
//previous1 import java.util.ArrayList;
//import java.util.Collections;
import java.util.HashMap;
//previous1 import java.util.HashSet;
//previous1 import java.util.List;
import java.util.Map;
//previous1 import java.util.Set;
//previous1 import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
//import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;
//import org.apache.hadoop.util.StringUtils;
//import com.ibm.icu.util.StringTokenizer;
public class SalesRedJoin {

    //previous1
    //private Set<String> custInfo= new HashSet<String>();
    //previous1 private List<String> custInfo1 = new ArrayList<String>();
    //previous1 private Configuration conf1 ;
    //previous1 private BufferedReader fis;

    public static class SalesRedMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Map<String, String> abMap = new HashMap<String, String>();

        //previous1//protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context)
        //throws IOException, InterruptedException {
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // open setup function5
            // to read customer cache file from hdfs Customer.csv
            //previous1
/*
if (context.getCacheFiles() !=null && context.getCacheFiles().length>0)
{
//if block checking of uri1
//
conf1 =context.getConfiguration();
URI[] fileURIs= Job.getInstance(conf1).getCacheFiles();
File custFile = null;
for(URI pathURI :fileURIs){
//Path fPath = new Path(pathURI.getPath());
custFile =new File(pathURI);
}//for block closed
try{
//try block open2
//Reading the cache file and storing in private identifier custInfo1,custInfo
fis =new BufferedReader(new FileReader(custFile));
String line =null;
while((line = fis.readLine())!= null) {
//while open
//previous1
//custInfo1.add(line);
String[] tokens = line.split(",");
String custid1 = tokens[4];
String custname1 = tokens[0]+" "+tokens[1];
abMap.put(custid1, custname1);
// custInfo.add(line);
//while closed
}
fis.close();
//try block closed2
}catch(IOException ioe){
//catch block open3
System.err.println("Caught exception while parsing the cached file '"
);
}//catch block closed3
//if block closed-1
}
super.setup(context);
*/
            @SuppressWarnings("deprecation")
            Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            for (Path p : files) {
                if (p.getName().equals("Customer.csv")) {
                    BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
                    String line = reader.readLine();
                    while (line != null) {
                        String[] tokens = line.split(",");
                        String custid1 = tokens[4];
                        String custname1 = tokens[0] + " " + tokens[1];
                        abMap.put(custid1, custname1);
                        line = reader.readLine();
                    }
                    reader.close();
                }
            }
            if (abMap.isEmpty()) {
                throw new IOException("Unable to load the customer data into the cache.");
            }
        } // Setup function closed5
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // map function to read Sales.csv
            //String name1key = null;
            //String valueWr = null;
/*previous1
StringTokenizer str1= new StringTokenizer(value.toString(),",");
for(String s : custInfo1){
//Reading the data in custInfo record by record
String[] custDetail = s.split(",");
String tcusId = custDetail[4];
String tname =custDetail[0]+" "+custDetail[1];
while(str1.hasMoreTokens())
{//start of while loop
//checking if the Sales.csv file for above custId and get the name
if (tcusId.equals(str1.nextToken())){
name1key=tname.toString();
}
}// close of while loop
}//close of for loop
//forming the output key and value pairs with substitute name
*/
            String val1[] = value.toString().split(",");
            //Text keyValue=null;
            Text keyValue = new Text(val1[1].toString());
            String custid2 = val1[0];
            String custname1 = abMap.get(custid2);
            String val2 = "(" + custname1.toString() + "," + val1[2].toString() + "," + val1[3].toString() + ")";
            // setting product id as key and remaining value with name and other important parameters
            context.write((Text) keyValue, new Text(val2.toString()));
        } // end of map function
    } // end of SalesRedMapper class
    public static class ProdRedMapper extends Mapper<Object, Text, Text, Text> {

        // This map function to read Product.csv
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String val2[] = value.toString().split(",");
            //Text keyValue=null;
            Text keyValue = new Text(val2[4].toString());
            String val3 = val2[1].toString();
            // writing our product number as key and product name as value
            context.write((Text) keyValue, new Text(val3.toString()));
        }
    }

    public static class ProdRed extends Reducer<Text, Text, NullWritable, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Writing out only values leaving the product number
            //String[] s =values.toString();
            String tempString = " ";
            for (Text s : values) {
                tempString += s.toString() + " ";
            }
            context.write(NullWritable.get(), new Text(tempString.toString()));
            //context.write(NullWritable.get(), new Tex(values));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Reduce side join");
        job.setJarByClass(SalesRedJoin.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // setting multiple input paths to Sales.csv and Product.csv with map classes
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, SalesRedMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ProdRedMapper.class);
        job.setReducerClass(ProdRed.class);
        Path outputPath = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Adding cache file to each node present in hdfs
        try {
            DistributedCache.addCacheFile(new URI("hdfs:/user/hadoop/Customer.csv"), job.getConfiguration());
            //job.addCacheFile(new URI());
        } catch (Exception e) {
            System.out.println(e);
        }
        // Submit the job and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } // end of main method
} // end of SalesRedJoin class
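To run the corrected version, package it into a jar and launch it with the usual hadoop jar command, for example (the jar name and paths are placeholders): hadoop jar SalesRedJoin.jar com.nandgama.SalesRedJoin <sales-csv-path> <product-csv-path> <output-dir>. Note that the Customer.csv cache path is hard-coded to hdfs:/user/hadoop/Customer.csv, so that file must already be in HDFS at that location before the job starts.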
______________________________
Output file of the above program in HDFS. (Because the reducer simply concatenates every value it receives for a product-id key, the product name emitted by ProdRedMapper can appear anywhere on a line among the (customer name, quantity, date) tuples.)
Motherboard H772 CPU (Jansen Gibbons,7,2012-01-24) (Arthur Balgne,2,2012-01-09) (Cliff Brandson,1,2012-01-24) (Banhent Bowden,2,2012-01-09)
Motherboard G822 CPU
Motherboard F991 CPU (Serghert Dexter,1,2012-01-09) (Jean Kulinski,2,2012-01-09)
(Franklin Melcic,2,2012-01-24) (Gerhart Serghert,10,2012-01-24) 500 GB HD Panther Brand
500 GB HD Tiger Brand (Roger Getnet,5,2012-01-24) (Gerhart Serghert,4,2012-01-24)
1 TB HD Jargon Brand
Computer Case Jargon Brand Style 1500 (John Jarkin,2,2012-01-24) (Panelo Wade,3,2012-01-09) (Arthur Panelo,1,2012-01-24) (Merdec Vickers,1,2012-01-09)
(Mello Reynolds,17,2012-01-09) Computer Case Jargon Brand Style 1501
DVD Tiger Brand Internal
DVD Tiger Brand External
16 GB Memory ECC (Lambert Givens,1,2012-01-09) (Lisa Hetzer,1,2012-01-09)
2 GB Memory ECC
4 GB Memory ECC (Jean Jansen,1,2012-01-24)
(Jansen Gibbons,31,2012-01-09) Computer Case Tiger Brand Style 4332
DVD Jargon Brand Internal (Panelo Wade,24,2012-01-09) (Hubert Banhent,7,2012-01-24) (Melcic Lexter,4,2012-01-09) (Panelo Wade,1,2012-01-09) (Lisa Hetzer,1,2012-01-24)
DVD Jargon Brand External
Power Supply Jargon Brand 500 Watts
Power Supply Jargon Brand 300 Watts (Getnet Scott,2,2012-01-09) (Cliff Mello,1,2012-01-24) (Cliff Brandson,15,2012-01-09) (Getnet Scott,3,2012-01-24) (Mello Reynolds,4,2012-01-24)
Power Supply Tiger Brand 300 Watts
Video Card Tiger Brand 84F1 (Allen Jaskobec,1,2012-01-24) (Mello Reynolds,1,2012-01-09) (Elizabeth Ilyenko,1,2012-01-24)
1 Core CPU Jargon Brand 2 GHZ
4 Core CPU Jargon Brand 3 GHZ
2 Core CPU Jargon Brand 2 GHZ
Video Card Jargon Brand 93G (Jean Kulinski,23,2012-01-24) (Ilyenko Merdec,1,2012-01-09) (Roscoe Morris,6,2012-01-09)
(Junielle Josephs,2,2012-01-09) (Banhent Bowden,1,2012-01-24) (Lisa Junielle,2,2012-01-24) Video Card Jargon Brand 84F1
Please let me know what is wrong with the problem statement