Converting Image Files to SequenceFile Format and Finding Duplicates
package com.nandgama;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class ConvImageToSequenceFile {

    public static class ConvImageToSequenceFileMapper extends
            Mapper<Object, Text, Text, BytesWritable> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line (value) is the HDFS path of one image file.
            String pathToRead = value.toString();
            Configuration conf = context.getConfiguration();
            Path path = new Path(pathToRead);
            // Get a FileSystem handle for the path above.
            FileSystem fileToRead = FileSystem.get(URI.create(pathToRead), conf);
            // DataInputStream reads the file and yields its bytes.
            DataInputStream dis = null;
            try {
                dis = fileToRead.open(path);
                /* The image size is not known in advance, so read in large chunks
                   into a growable ByteArrayOutputStream instead of byte by byte,
                   to save read time and resources. */
                byte[] tempBuffer = new byte[1024 * 1024];
                ByteArrayOutputStream bout = new ByteArrayOutputStream();
                int bytesRead;
                /* read() fills the buffer from index zero and returns the number of
                   bytes actually read, or -1 at end of file, which ends the loop.
                   Only the bytes actually read are copied into bout: writing the
                   whole buffer each time would append stale bytes from the previous
                   iteration on the final, partially filled read. */
                while ((bytesRead = dis.read(tempBuffer)) != -1) {
                    bout.write(tempBuffer, 0, bytesRead);
                }
                /* For the same reason, context.write(value, new BytesWritable(tempBuffer.clone()))
                   would be wrong even for images smaller than the buffer: the unused
                   tail of tempBuffer would become part of the value. */
                // Emit the image path as key and the complete image bytes as value.
                context.write(value, new BytesWritable(bout.toByteArray()));
            } finally {
                /* IOUtils.closeStream(dis) is a null-safe close that ignores the
                   IOException a plain dis.close() can throw, so a separate
                   dis.close() call is redundant. */
                IOUtils.closeStream(dis);
            }
        } // close of map()
    } // close of ConvImageToSequenceFileMapper
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ConvImageToSequenceFile");
        job.setJarByClass(ConvImageToSequenceFile.class);
        job.setMapperClass(ConvImageToSequenceFileMapper.class);
        job.setOutputKeyClass(Text.class);
        // Must be BytesWritable, not ByteWritable, to match the mapper's output value type.
        job.setOutputValueClass(BytesWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait; without this line the job never runs.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } // close of driver
} // close of ConvImageToSequenceFile
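As an aside, the manual read loop above can be replaced by Hadoop's own stream-copy helper, org.apache.hadoop.io.IOUtils.copyBytes(). A minimal sketch of the same copy step, assuming the dis, bout, value, and context variables from the mapper above:

// Copy the opened stream into bout in 4 KB chunks; the false flag leaves both
// streams open, so the finally block still handles cleanup as before.
IOUtils.copyBytes(dis, bout, 4096, false);
context.write(value, new BytesWritable(bout.toByteArray()));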
_____________________________________________________________________
package com.nandgama;

import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ImageDuplicateFinder {

    public static class ImageDuplicateFinderMapper extends
            Mapper<Text, BytesWritable, Text, Text> {

        @Override
        public void map(Text key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            /* The input is the SequenceFile written by ConvImageToSequenceFile:
               key = image path, value = image bytes. getBytes() returns the
               backing buffer, which can be longer than the valid data, so trim
               it to getLength() before hashing. */
            byte[] tempImageData = Arrays.copyOf(value.getBytes(), value.getLength());
            String md5Str;
            try {
                /* Kept in a try block because getInstance() throws a checked
                   exception if the requested digest algorithm does not exist. */
                md5Str = convertToStr(tempImageData);
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
                context.setStatus("Internal error: cannot find the MD5 digest algorithm");
                /* If the exception occurs, the return below ends this map() call
                   without executing context.write(new Text(md5Str), key). */
                return;
            }
            /* The hex digest is identical for identical image bytes, so emitting it
               as the key groups all copies of an image together, with their paths
               as the values. */
            context.write(new Text(md5Str), key);
        } // close of map()
        public String convertToStr(byte[] passImageData) throws NoSuchAlgorithmException {
            /* MessageDigest supplies the "MD5" algorithm here;
               Security.getProviders() lists all providers and their algorithms. */
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Update the digest with the image bytes.
            md.update(passImageData);
            /* digest() completes the hash computation (padding etc.), resets the
               digest, and returns the resulting hash bytes. */
            byte[] tempHash = md.digest();
            // Convert the hash bytes to hex; identical images yield the same string.
            StringBuilder hexString = new StringBuilder();
            for (int i = 0; i < tempHash.length; i++) {
                hexString.append(Integer.toString((tempHash[i] & 0xff) + 0x100, 16).substring(1));
            }
            return hexString.toString();
        } // close of convertToStr()
    } // close of ImageDuplicateFinderMapper
    public static class ImageDuplicateFinderReducer extends
            Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text imagePath = null;
            for (Text tempPath : values) {
                /* Only the first value (a path) for the given key is of interest;
                   all the remaining paths in values are duplicates. Note the break:
                   a return here would exit reduce() before the write below runs. */
                imagePath = tempPath;
                break;
            }
            context.write(new Text(imagePath), key);
        } // close of reduce()
    } // close of ImageDuplicateFinderReducer
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] programArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (programArgs.length != 2) {
            System.err.println("Usage: ImageDuplicateFinder <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "ImageDuplicateFinder");
        job.setJarByClass(ImageDuplicateFinder.class);
        job.setMapperClass(ImageDuplicateFinderMapper.class);
        job.setReducerClass(ImageDuplicateFinderReducer.class);
        /* These two calls are necessary: generic type parameters are erased at
           runtime, so Hadoop cannot infer the output types from the Reducer, and
           the defaults (LongWritable/Text) would not match. */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        /* SequenceFileInputFormat must be set because the input is the SequenceFile
           written by the first job; TextOutputFormat is the default and is spelled
           out here only for clarity. */
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(programArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(programArgs[1]));
        // Submit the job and wait for completion.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } // close of driver
} // close of ImageDuplicateFinder
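To see the grouping property in isolation, here is a minimal self-contained sketch; it runs locally with plain Java, no Hadoop required, and the class name Md5Check is just an illustration. Identical byte arrays always produce the same hex digest, which is exactly what the mapper relies on:

import java.security.MessageDigest;

public class Md5Check {
    public static void main(String[] args) throws Exception {
        byte[] a = "same image bytes".getBytes("UTF-8");
        byte[] b = "same image bytes".getBytes("UTF-8");
        // Both calls print the same 32-character hex string, so both arrays
        // would land in the same reduce group.
        System.out.println(toHex(MessageDigest.getInstance("MD5").digest(a)));
        System.out.println(toHex(MessageDigest.getInstance("MD5").digest(b)));
    }

    static String toHex(byte[] hash) {
        StringBuilder sb = new StringBuilder();
        for (byte x : hash) {
            sb.append(String.format("%02x", x));
        }
        return sb.toString();
    }
}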
___________________________
This is where I stored the images in HDFS:
/user/mr/Images/Ballons.jpeg
/user/mr/Images/Bird.jpeg
/user/mr/Images/BirdWings.jpeg
/user/mr/Images/BlueBird.jpeg
/user/mr/Images/Flower.jpeg
/user/mr/Images/Giraffe.jpeg
/user/mr/Images/HangingBridge.jpeg
/user/mr/Images/RiverSwan.jpeg
/user/mr/Images/Sunflower.jpeg
/user/mr/Images/Sunset.jpeg
/user/mr/Images/Swan.jpeg
/user/mr/Images/Tortoise.jpeg
/user/mr/Images/Wildtiger.jpeg
/user/mr/Images/bridge.jpeg
/user/mr/Images/tiger.jpeg
____________________________________________________________________________
This is where I stored the image paths on HDFS, in a file named imagepath.txt.
Contents of imagepath.txt:
/user/mr/Images/Ballons.jpeg
/user/mr/Images/Bird.jpeg
/user/mr/Images/BirdWings.jpeg
/user/mr/Images/BlueBird.jpeg
/user/mr/Images/Flower.jpeg
/user/mr/Images/Giraffe.jpeg
/user/mr/Images/HangingBridge.jpeg
/user/mr/Images/RiverSwan.jpeg
/user/mr/Images/Sunflower.jpeg
/user/mr/Images/Sunset.jpeg
/user/mr/Images/Swan.jpeg
/user/mr/Images/Tortoise.jpeg
/user/mr/Images/Wildtiger.jpeg
/user/mr/Images/bridge.jpeg
/user/mr/Images/tiger.jpeg
______________
Hadoop command for converting the images to SequenceFile format:
hadoop jar ConvImageToSequenceFile.jar ConvImageToSequenceFile /user/mr/imagepath.txt /user/mr/sequencefileout
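To sanity-check the result (an optional step), hadoop fs -text can decode a SequenceFile and print its key/value pairs; the path below assumes the output directory used above:
hadoop fs -text /user/mr/sequencefileout/part-r-00000 | head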
________________________________________
Hadoop command for finding duplicates:
hadoop jar ImageDuplicateFinder.jar ImageDuplicateFinder /user/mr/sequencefileout/part-r-00000 /user/mr/nonimageduplicates
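The output directory then holds one line per unique image, the kept path first and its MD5 digest second; again an optional check:
hadoop fs -cat /user/mr/nonimageduplicates/part-r-00000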
This example worked fine; just add System.exit(job.waitForCompletion(true) ? 0 : 1); after the FileInputFormat.addInputPath(job, new Path(args[0])); and FileOutputFormat.setOutputPath(job, new Path(args[1])); lines, so the job actually gets submitted.
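In other words, each driver should end like this (the corrected drivers above already do):

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Submit the job; exit 0 on success, 1 on failure.
System.exit(job.waitForCompletion(true) ? 0 : 1);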
______________________________
for (Text tempPath : values) {
    /* Interested only in the first value for the given key (i.e. the path of
       the image); all the remaining paths in values are duplicates. */
    imagePath = tempPath;
    return;
}
In the above logic, replace the return with a break statement; otherwise reduce() exits before context.write() runs and no output is produced for the key.
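With the fix applied (as in the corrected reducer above):

for (Text tempPath : values) {
    imagePath = tempPath;
    break; // keep the first path; the rest are duplicates
}
context.write(new Text(imagePath), key);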
__________________
The input to the second program, ImageDuplicateFinder, should be the output of the first program, ConvImageToSequenceFile, i.e. the part-r-00000 file it writes to HDFS.