Converting Image File to Sequence File Format and Finding Duplicates


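The two MapReduce programs below do this in two steps: ConvImageToSequenceFile reads a text file listing HDFS image paths and packs each image into a SequenceFile as a (path, raw bytes) record, and ImageDuplicateFinder then reads that SequenceFile, computes the MD5 digest of each image's bytes, and groups paths by digest so that only one path per unique image is kept.
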
package com.nandgama;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;



public class ConvImageToSequenceFile {

    /**
     * @param args
     */

    public static class ConvImageToSequenceFileMapper extends
    Mapper<Object, Text, Text, BytesWritable> {
       
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //Each input line holds the HDFS path of one image file
            String pathToRead = value.toString();

            Configuration conf = context.getConfiguration();
            Path path = new Path(pathToRead);
            //Get a FileSystem handle for the path above
            FileSystem fileToRead = FileSystem.get(URI.create(pathToRead), conf);

            //DataInputStream reads the image file as a raw byte stream
            DataInputStream dis = null;
            try {
                dis = fileToRead.open(path);

                /*Since the image size is unknown in advance, read it in 1 MB
                 chunks into a buffer instead of one byte at a time, to save
                 read time and resources.*/
                byte[] tempBuffer = new byte[1024 * 1024];
                ByteArrayOutputStream bout = new ByteArrayOutputStream();

                /*read() returns the number of bytes actually read, or -1 at
                 end of file. Only that many bytes may be copied into bout;
                 writing the whole buffer every pass would append stale bytes
                 from earlier iterations whenever a read comes up short.*/
                int bytesRead;
                while ((bytesRead = dis.read(tempBuffer, 0, tempBuffer.length)) >= 0) {
                    bout.write(tempBuffer, 0, bytesRead);
                }

                //Emit (image path, raw image bytes) into the SequenceFile
                context.write(value, new BytesWritable(bout.toByteArray()));
            } finally {
                /*IOUtils.closeStream() is a null-safe close that also
                 swallows the IOException a plain dis.close() could throw,
                 so this single call is enough here.*/
                IOUtils.closeStream(dis);
            }
        }//map function close
    }
    //ConvImageToSequenceFileMapper close
   
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ConvImageToSequenceFile");
        job.setJarByClass(ConvImageToSequenceFile.class);

        job.setMapperClass(ConvImageToSequenceFileMapper.class);
        //These must match the mapper's output types: Text key, BytesWritable value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //Submit the job and wait for it to finish; without this call the
        //job is configured but never actually run
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }//close of driver
//close of ConvImageToSequenceFile
}
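
To sanity-check the conversion before running the second job, a small reader can walk the generated SequenceFile and print each key with the size of its stored value. This is a minimal sketch assuming the Hadoop 2.x SequenceFile.Reader API; SequenceFileCheck is a hypothetical helper name, not part of the original programs.

package com.nandgama;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

//Hypothetical helper: iterate over the generated SequenceFile and print
//each image path with its stored byte count, to confirm the conversion.
public class SequenceFileCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //e.g. /user/mr/sequencefileout/part-r-00000
        Path path = new Path(args[0]);

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + " : " + value.getLength() + " bytes");
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
____________________________________________________________________________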

package com.nandgama;

import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
_____________________________________________________________________
public class ImageDuplicateFinder {

    /**
     * @param args
     */
    public static class ImageDuplicateFinderMapper  extends
    Mapper<Text, BytesWritable, Text, Text> {
       
        @Override
        public void map(Text key, BytesWritable value, Context context)
                throws IOException, InterruptedException {

            /*The input is the SequenceFile written by the first job.
             getBytes() returns the backing buffer, which may be longer than
             the actual data, so trim it to getLength() before hashing.*/
            byte[] tempImageData = Arrays.copyOf(value.getBytes(), value.getLength());
            String md5Str;
            try {
                //getInstance() throws if the requested digest algorithm is missing
                md5Str = convertToStr(tempImageData);
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
                context.setStatus("Internal error: can't find the specified " +
                        "digest algorithm MD5");
                /*If the algorithm is missing, leave map() without executing
                 context.write(new Text(md5Str), key);*/
                return;
            }
            /*The MD5 hex digest of the image bytes becomes the key, so all
             identical images are grouped together in the reducer with their
             paths as values.*/
            context.write(new Text(md5Str), key);
        }//close of map function
       
       
        public String convertToStr(byte[] passImageData) throws NoSuchAlgorithmException {

            /*MessageDigest is used here to obtain the "MD5" algorithm.
             Security.getProviders() lists the available providers and their
             algorithms.*/
            MessageDigest md = MessageDigest.getInstance("MD5");

            //Updates the digest using the specified array of bytes
            md.update(passImageData);
            /*Completes the hash computation by performing final operations
             such as padding. The digest is reset after this call is made.
             Returns the resulting hash value as an array of bytes.*/
            byte[] tempHash = md.digest();

            //Convert the byte array to hex; identical images yield the
            //same hexString
            StringBuilder hexString = new StringBuilder();
            for (int i = 0; i < tempHash.length; i++) {
                hexString.append(Integer.toString((tempHash[i] & 0xff) + 0x100, 16).substring(1));
            }
            return hexString.toString();
        }//close of convertToStr function
       
        //close of ImageDuplicateFinderMapper
    }
   
    public static class ImageDuplicateFinderReducer extends
    Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text imagePath = null;
            for (Text tempPath : values) {
                /*Only the first value for the given key (the path of one
                 image) is kept; all remaining paths in values are duplicates.
                 This must be a break, not a return: a return would leave
                 reduce() before the context.write() below ever runs.*/
                imagePath = tempPath;
                break;
            }
            context.write(new Text(imagePath), key);
        }//close of reduce function

    }//close of ImageDuplicateFinderReducer class
   
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        String[] programArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (programArgs.length != 2) {
            System.err.println("Usage: ImageDuplicateFinder <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "ImageDuplicateFinder");

        job.setJarByClass(ImageDuplicateFinder.class);
        job.setMapperClass(ImageDuplicateFinderMapper.class);
        job.setReducerClass(ImageDuplicateFinderReducer.class);

        /*These two are necessary even though the reducer is defined: the
         job's defaults are LongWritable/Text, so it must be told that both
         the key and the value are Text.*/
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        /*SequenceFileInputFormat is required because the input is the
         SequenceFile produced by ConvImageToSequenceFile; TextOutputFormat
         writes the final (path, hash) pairs as plain text.*/
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(programArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(programArgs[1]));

        //Submit the job and wait for it to finish; without this call the
        //job is configured but never actually run
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }//close of driver
//close of ImageDuplicateFinder
}
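
If the goal is to list the duplicate paths themselves rather than keep one representative per image, a small variant of the reducer can emit every path after the first. This is a hedged sketch of an alternative, not part of the original post: DuplicateListingReducer is a hypothetical name, and it would be wired in with job.setReducerClass(DuplicateListingReducer.class) in place of the reducer above.

package com.nandgama;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//Hypothetical alternative reducer: instead of keeping one path per image,
//emit every duplicate path, tagged with the first copy that was kept.
public class DuplicateListingReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Text firstPath = null;
        for (Text tempPath : values) {
            if (firstPath == null) {
                //first occurrence under this MD5 key: the copy that is kept
                firstPath = new Text(tempPath);
            } else {
                //every further path with the same digest is a duplicate
                context.write(new Text(tempPath),
                        new Text("duplicate of " + firstPath));
            }
        }
    }
}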
___________________________
This is where I stored the images in HDFS:
/user/mr/Images/Ballons.jpeg
/user/mr/Images/Bird.jpeg
/user/mr/Images/BirdWings.jpeg
/user/mr/Images/BlueBird.jpeg
/user/mr/Images/Flower.jpeg
/user/mr/Images/Giraffe.jpeg
/user/mr/Images/HangingBridge.jpeg
/user/mr/Images/RiverSwan.jpeg
/user/mr/Images/Sunflower.jpeg
/user/mr/Images/Sunset.jpeg
/user/mr/Images/Swan.jpeg
/user/mr/Images/Tortoise.jpeg
/user/mr/Images/Wildtiger.jpeg
/user/mr/Images/bridge.jpeg
/user/mr/Images/tiger.jpeg
____________________________________________________________________________
This is where I stored the image paths, in a file named imagepath.txt on HDFS.

imagepath.txt file content:

/user/mr/Images/Ballons.jpeg
/user/mr/Images/Bird.jpeg
/user/mr/Images/BirdWings.jpeg
/user/mr/Images/BlueBird.jpeg
/user/mr/Images/Flower.jpeg
/user/mr/Images/Giraffe.jpeg
/user/mr/Images/HangingBridge.jpeg
/user/mr/Images/RiverSwan.jpeg
/user/mr/Images/Sunflower.jpeg
/user/mr/Images/Sunset.jpeg
/user/mr/Images/Swan.jpeg
/user/mr/Images/Tortoise.jpeg
/user/mr/Images/Wildtiger.jpeg
/user/mr/Images/bridge.jpeg
/user/mr/Images/tiger.jpeg
______________
Hadoop command for converting the images to SequenceFile format:

hadoop jar ConvImageToSequenceFile.jar ConvImageToSequenceFile  /user/mr/imagepath.txt  /user/mr/sequencefileout
________________________________________
Hadoop command for finding duplicates:

hadoop jar ImageDuplicateFinder.jar ImageDuplicateFinder /user/mr/sequencefileout/part-r-00000 /user/mr/nonimageduplicates
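
With TextOutputFormat, each line of the resulting part-r-00000 file should hold one kept image path, a tab, and the MD5 hex digest of that image's bytes; any path whose image shared a digest with an earlier one is dropped as a duplicate.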

1 comment:

  1. This example worked fine; just add System.exit(job.waitForCompletion(true) ? 0 : 1); after the lines
     FileInputFormat.addInputPath(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     ______________________________
     for (Text tempPath : values)
     {
         /*interested only in the first value for the given key (i.e. the path of the image)
          * where all the remaining paths in values are duplicates*/
         imagePath = tempPath;
         return;
     }

     In the above logic, replace the return with a break statement.
     __________________
     The input to the second program, ImageDuplicateFinder, should be the output of the first program, ConvImageToSequenceFile, i.e. its part-r-00000 file in HDFS.