A Reduce side join example.

A Reduce side join example (  A MapReduce example )

Using three file Sales.csv, Customer.csv, Product.csv
A data files of above three *.csv files
 https://drive.google.com/file/d/0B3DvJosZ_aAlN1plekRRdHlnMXM/view?usp=sharing
 https://drive.google.com/file/d/0B3DvJosZ_aAlRm00MGdSNzdYcFk/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlSFYxeVVCY0p6NlE/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlaDE4WTJHd0JXT3c/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlbXFUWHVZNmZab28/view?usp=sharing
https://drive.google.com/file/d/0B3DvJosZ_aAlal9SWUQtNEtTMWc/view?usp=sharing
________________________________________
Problem Statement is : Sales.csv has customer-id and product-id
Take the customer-id from Customer.csv and product-id from Product.csv and add it to Sales
From Sales.csv display the final result as Product name : customer  name   +quantity+date

My approach 
Step1:I took Customer.csv file in cache
Step2:I read Sales.csv into mapper , and for every cust-id in sales i replaced with customer name(read from Customer.csv in cache) with output as (product id as key) and remaining fields of    (customername+quantity+date as values)
Step3: In another mapper i read product id as key and product name as value(here if you notice product-id is key in both mappers) from file Product.csv
Step4:In both mappers  product-id is  key
Step5: In Main class i used MultipleInputs and as the output is sorted on keys on Product-id combining product name and who bought them
step6: left the key out as NullWritable and only wrote the values out
____________________________________
package com.nandgama.company;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
//import org.apache.hadoop.util.StringUtils;
//import com.ibm.icu.util.StringTokenizer;

public class SalesRedJoin {
   
    private Set<String> custInfo= new HashSet<String>();
    private List<String> custInfo1 = new ArrayList<String>();
   
    private Configuration conf ;
    private BufferedReader fis;
   
    public class SalesRedMapper extends
    Mapper<Object, Text, Text, Text> {

        protected void setup(Mapper<Object,Text,Text,Text>.Context context)
                throws IOException, InterruptedException {
                    //open setup function5
                    //to read customer cache file from hdfs Customer.csv
            if (context.getCacheFiles() !=null && context.getCacheFiles().length>0)
                { 
                    //if block checking of uri1
                    //
                        conf =context.getConfiguration();
                        URI[] fileURIs= Job.getInstance(conf).getCacheFiles();
                        File custFile = null;
                        for(URI pathURI :fileURIs){
                        //Path fPath = new Path(pathURI.getPath());
                        custFile =new File(pathURI);        
                            }//for block closed       
                    try{   
                        //try block open2
                        //Reading the cache file and storing in private identifier custInfo1,custInfo
                        fis =new BufferedReader(new FileReader(custFile));
                        String line =null;
                        while((line = fis.readLine())!= null) {
                            //while open
                            custInfo1.add(line);
                            custInfo.add(line);
                            //while closed
                        }   
                            fis.close();
                        //try block closed2
                        }catch(IOException ioe){
                            //catch block open3
                            System.err.println("Caught exception while parsing the cached file '"
                            );
               
                        }//catch block closed3
                 //if block closed-1
                }
            super.setup(context);
        }//Setup function closed5      
       
            @Override
            public void map(Object key, Text value, Context context)
   
                    throws IOException, InterruptedException {
                //map function begins       
                // map function to read Sales.csv
                String name1key = null;
                //String valueWr = null;
                StringTokenizer str1= new StringTokenizer(value.toString(),",");
                for(String s : custInfo){
                    //Reading the data in custInfo record by record
                    String[] custDetail = s.split(",");
                    String tcusId = custDetail[4];
                    String tname =custDetail[0]+" "+custDetail[1];
                    while(str1.hasMoreTokens())
                     {//start of while loop
                         //checking if the Sales.csv file for above custId and get the name
                         if (tcusId.equals(str1.nextToken())){                           
                             name1key=tname.toString();                               
                         }
                     }// close of while loop
                   
                    }//close of for loop
                    //forming the output key and value pairs with substitute name
                String val1[] = value.toString().split(",");
                 key = new Text(val1[1].toString());
                 String val2 = "("+name1key.toString()+"," + val1[2].toString()+","+val1[3].toString()+")";
                 //setting product id as key and remaining value with name and other important parameters
                 context.write((Text) key,new Text(val2.toString()));               
                }//end of map function
}//end of SalesRedMapper class
   
            public class ProdRedMapper extends
                        Mapper<Object, Text, Text, Text> {
                //This map function to read Product.csv
                @Override
                public void map(Object key, Text value, Context context)
       
                        throws IOException, InterruptedException {
                    String val2[] = value.toString().split(",");
                    key = new Text(val2[5].toString());
                    String val3=val2[0].toString();
                    //writing our product number as key and product name as value
                    context.write((Text)key, new Text(val3.toString()));                   
                }               
            }

            public static class ProdRed extends Reducer<Text,Text,NullWritable,Text> {
                public void reduce(Text key, Iterable<Text> values,Context context) throws
                IOException ,InterruptedException{                   
                    //Writing out only values leaving the product number
                    context.write(NullWritable.get(), (Text) values);
                    }           
            }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
           
            String[] programArgs = new GenericOptionsParser(conf, args)
                    .getRemainingArgs();
            if (programArgs.length != 3) {
                   System.err.println("Usage: SalesRedJoin <in> <in> <out>");
                   System.exit(2);
                }
            Job job = new Job(conf, "Reduce side Join");
            job.setJarByClass(SalesRedJoin.class);
            job.setReducerClass(ProdRed.class);       
           
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
           
            //setting multiple input paths to Sales.csv and Product.csv with map classes
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,SalesRedMapper.class);
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,ProdRedMapper.class);
            Path outputPath = new Path(args[2]);
                   
            FileOutputFormat.setOutputPath(job, outputPath);
            //Adding cache file to each node present in hdfs
            job.addCacheFile(new URI("/user/biadmin/TempData/Customer.csv#Customers"));
            // Submit the job and wait for it to finish.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            }//end of main method

    }// end of SalesRedJoin class
____________________________________________
Changed code so the program can run without errors:

package com.nandgama;


import java.io.BufferedReader;
//previous1 import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
//previous1 import java.util.ArrayList;
//import java.util.Collections;
import java.util.HashMap;
//previous1 import java.util.HashSet;
//previous1 import java.util.List;
import java.util.Map;
//previous1 import java.util.Set;
//previous1 import java.util.StringTokenizer;



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
//import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;
//import org.apache.hadoop.util.StringUtils;

//import com.ibm.icu.util.StringTokenizer;



public class SalesRedJoin {
    //previous1
    //private Set<String> custInfo= new HashSet<String>();
    //previous1 private List<String> custInfo1 = new ArrayList<String>();
   
   
    //previous1 private Configuration conf1 ;
    //previous1 private BufferedReader fis;
   
    public static class SalesRedMapper extends
    Mapper<LongWritable, Text,Text,Text> {
       
       
        private Map<String, String> abMap = new HashMap<String, String>();
        //previous1//protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context)
                //throws IOException, InterruptedException {
           
            protected void setup(Context context)
            throws IOException, InterruptedException {
                    //open setup function5
                    //to read customer cache file from hdfs Customer.csv
           
               
            //previous1   
            /*   
                if (context.getCacheFiles() !=null && context.getCacheFiles().length>0)
                { 
                    //if block checking of uri1
                    //
                        conf1 =context.getConfiguration();
                        URI[] fileURIs= Job.getInstance(conf1).getCacheFiles();
                        File custFile = null;
                        for(URI pathURI :fileURIs){
                        //Path fPath = new Path(pathURI.getPath());
                        custFile =new File(pathURI);        
                            }//for block closed       
                    try{   
                        //try block open2
                        //Reading the cache file and storing in private identifier custInfo1,custInfo
                        fis =new BufferedReader(new FileReader(custFile));
                        String line =null;
                        while((line = fis.readLine())!= null) {
                            //while open
                           
                            //previous1
                            //custInfo1.add(line);
                           
                            String[] tokens = line.split(",");
                            String custid1 = tokens[4];
                            String custname1 = tokens[0]+" "+tokens[1];
                            abMap.put(custid1, custname1);
                        //    custInfo.add(line);
                            //while closed
                        }   
                            fis.close();
                        //try block closed2
                        }catch(IOException ioe){
                            //catch block open3
                            System.err.println("Caught exception while parsing the cached file '"
                            );
               
                        }//catch block closed3
                 //if block closed-1
                }
           
            super.setup(context);
           
            */
                @SuppressWarnings("deprecation")
                Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
               
               
                for (Path p : files) {
                    if (p.getName().equals("Customer.csv")) {
                        BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
                        String line = reader.readLine();
                   
                        while(line != null) {
                            String[] tokens = line.split(",");
                            String custid1 = tokens[4];
                            String custname1 = tokens[0]+" "+tokens[1];
                            abMap.put(custid1, custname1);
                            line = reader.readLine();
                        }
                        reader.close();
                    }
                }
                if (abMap.isEmpty()) {
                    throw new IOException("Unable to load Abbrevation data in cache.");
                }
            //
        }//Setup function closed5
       
       
            @Override
            public void map(LongWritable key, Text value, Context context)
   
                    throws IOException, InterruptedException {
                //map function begins       
                // map function to read Sales.csv
                //String name1key = null;
                //String valueWr = null;
               
                /*previous1
                StringTokenizer str1= new StringTokenizer(value.toString(),",");
                for(String s : custInfo1){
                    //Reading the data in custInfo record by record
                    String[] custDetail = s.split(",");
                    String tcusId = custDetail[4];
                    String tname =custDetail[0]+" "+custDetail[1];
                    while(str1.hasMoreTokens())
                     {//start of while loop
                         //checking if the Sales.csv file for above custId and get the name
                         if (tcusId.equals(str1.nextToken())){                           
                             name1key=tname.toString();                               
                         }
                     }// close of while loop
                   
                    }//close of for loop
                    //forming the output key and value pairs with substitute name
                     */
                String val1[] = value.toString().split(",");
                //Text keyValue=null;
                 Text keyValue = new Text(val1[1].toString());
                  String custid2=val1[0];
                 String custname1 = abMap.get(custid2);
                  String val2 = "("+custname1.toString()+"," + val1[2].toString()+","+val1[3].toString()+")";
                 //setting product id as key and remaining value with name and other important parameters
                 context.write((Text) keyValue,new Text(val2.toString()));               
                }//end of map function
}//end of SalesRedMapper class
   
            public static class ProdRedMapper extends
                        Mapper<Object, Text, Text, Text> {
                //This map function to read Product.csv
                @Override
                public void map(Object key, Text value, Context context)
       
                        throws IOException, InterruptedException {
                    String val2[] = value.toString().split(",");
                    //Text keyValue=null;
                     Text keyValue = new Text(val2[4].toString());
                    String val3=val2[1].toString();
                    //writing our product number as key and product name as value
                    context.write((Text)keyValue, new Text(val3.toString()));                   
                }               
            }

            public static class ProdRed extends Reducer<Text,Text,NullWritable,Text> {
               
                public void reduce(Text key, Iterable<Text> values,Context context) throws
                IOException ,InterruptedException{                   
                    //Writing out only values leaving the product number
                    //String[] s =values.toString();
                    String tempString=" ";
                    for(Text s: values){
                       
                        tempString+=s.toString()+" ";
                    }
                   
                    context.write(NullWritable.get(), new Text(tempString.toString()));
                    //context.write(NullWritable.get(), new Tex(values));
                    }           
            }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
           
            Job job =Job.getInstance(conf,"Reduce side join" );
            job.setJarByClass(SalesRedJoin.class);
               
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
           
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
           
            //setting multiple input paths to Sales.csv and Product.csv with map classes
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,SalesRedMapper.class);
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,ProdRedMapper.class);
            job.setReducerClass(ProdRed.class);   
            Path outputPath = new Path(args[2]);
                   
            FileOutputFormat.setOutputPath(job, outputPath);
            //Adding cache file to each node present in hdfs
            try{
                DistributedCache.addCacheFile(new URI("hdfs:/user/hadoop/Customer.csv"), job.getConfiguration());
                //job.addCacheFile(new URI());
            }catch(Exception e){
                System.out.println(e);
            }
           
            // Submit the job and wait for it to finish.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            }//end of main method

    }// end of SalesRedJoin class
______________________________
Output file of above program in hdfs

 Motherboard H772 CPU (Jansen Gibbons,7,2012-01-24) (Arthur Balgne,2,2012-01-09) (Cliff Brandson,1,2012-01-24) (Banhent Bowden,2,2012-01-09)
 Motherboard G822 CPU
 Motherboard F991 CPU (Serghert Dexter,1,2012-01-09) (Jean Kulinski,2,2012-01-09)
 (Franklin Melcic,2,2012-01-24) (Gerhart Serghert,10,2012-01-24) 500 GB HD Panther Brand
 500 GB HD Tiger Brand (Roger Getnet,5,2012-01-24) (Gerhart Serghert,4,2012-01-24)
 1 TB HD Jargon Brand
 Computer Case Jargon Brand Style 1500 (John Jarkin,2,2012-01-24) (Panelo Wade,3,2012-01-09) (Arthur Panelo,1,2012-01-24) (Merdec Vickers,1,2012-01-09)
 (Mello Reynolds,17,2012-01-09) Computer Case Jargon Brand Style 1501
 DVD Tiger Brand Internal
 DVD Tiger Brand External
 16 GB Memory ECC (Lambert Givens,1,2012-01-09) (Lisa Hetzer,1,2012-01-09)
 2 GB Memory ECC
 4 GB Memory ECC (Jean Jansen,1,2012-01-24)
 (Jansen Gibbons,31,2012-01-09) Computer Case Tiger Brand Style 4332
 DVD Jargon Brand Internal (Panelo Wade,24,2012-01-09) (Hubert Banhent,7,2012-01-24) (Melcic Lexter,4,2012-01-09) (Panelo Wade,1,2012-01-09) (Lisa Hetzer,1,2012-01-24)
 DVD Jargon Brand External
 Power Supply Jargon Brand 500 Watts
 Power Supply Jargon Brand 300 Watts (Getnet Scott,2,2012-01-09) (Cliff Mello,1,2012-01-24) (Cliff Brandson,15,2012-01-09) (Getnet Scott,3,2012-01-24) (Mello Reynolds,4,2012-01-24)
 Power Supply Tiger Brand 300 Watts
 Video Card Tiger Brand 84F1 (Allen Jaskobec,1,2012-01-24) (Mello Reynolds,1,2012-01-09) (Elizabeth Ilyenko,1,2012-01-24)
 1 Core CPU Jargon Brand 2  GHZ 
 4 Core CPU Jargon Brand 3 GHZ 
 2 Core CPU Jargon Brand 2  GHZ 
 Video Card Jargon Brand 93G (Jean Kulinski,23,2012-01-24) (Ilyenko Merdec,1,2012-01-09) (Roscoe Morris,6,2012-01-09)
 (Junielle Josephs,2,2012-01-09) (Banhent Bowden,1,2012-01-24) (Lisa Junielle,2,2012-01-24) Video Card Jargon Brand 84F1

2 comments:

  1. Please let me know what is wrong with the problem statement

    ReplyDelete
  2. program is running fine

    ReplyDelete