Analyzing Twitter Data from FLUME data file.

 Analyzing Twitter Data from FLUME data file.

------------------------
Please check the original documentation , this code is just to understand the flow to have an idea. The implementation varies based on several factors
Go to :/usr/lib/flume-ng/apache-flume-1.4.0-bin

From flume installation directory  please execute the flume.conf file to get the result data set from twitter according to your problem statement.

./bin/flume-ng agent --name agent1 --conf conf  -f  /home/biadmin/flume.conf  -Dflume.root.logger=INFO,console. 

agent1 is the name of agent in the flume.conf  properties file.
/home/biadmin/flume.conf path where  the properties file is.
>vi flume.conf  ( write the agent1 properties which is basically source,sink,channel)

-D flume.root.logger =INFO,console is routing the logs info to the console .
_____
Once the files are written in HDFS with the sink configuration path given in flume.conf file you can cat them with  Hdfs dfs -cat /user/flume.34350990909 
You can see the file flume.34350990909 is in json format that is streamed from twitter.
______
To read json format record format , just copy one single record from flume.34350990909 file 
Go to jaql installation directory  

Change to the Jaql bin directory.
cd $HADOOP_HOME/jaql/bin


Start the Jaql shell.
>./jaqlshell

then you will see 
jaql>arecord = [..paste the record copied from flume.34350990909 file];
jaql>arecord ; will print the record clearly with indentation 

To see members of the record execute below command
Jaql>names(arecord);  will print members names of the Json format in file flume.34350990909 
Compare above two outputs you will get a idea of the records
______
Next depending go to hive shell
change to $HIVE_HOME/bin 
and execute the commnad 
$HIVE_HOME/bin >./hive
hive>   
In hive shell read the file directly to hive table.

hive>

DROP TABLE raw_tweets_data;
hive>CREATE EXTERNAL TABLE raw_tweets_data (
    json_record STRING) ;
hive> load data local inpath '.../path where the data is present in linux directory'
into raw_tweets_data;
hive> select count(*) from raw_tweets_data ; will give you the no of records in the 
raw_tweets_data;
 
so depending on the output of jaql> names(arecord) create a table in hive
hive>
DROP TABLE tweetsproblemstat1;
CREATE TABLE tweetsproblemstat1
(
    id BIGINT,
    created_at STRING,
    created_at_date STRING,
    created_at_year STRING,
    created_at_month STRING,
    created_at_day STRING,
    created_at_time STRING,
    in_reply_to_user_id_str STRING,
    text STRING,
    contributors STRING,
    retweeted STRING,
    truncated STRING,
    coordinates STRING,
    source STRING,
    retweet_count INT,
    url STRING,
    hashtags array<STRING>,
    user_mentions array<STRING>,
    first_hashtag STRING,
    first_user_mention STRING,
    screen_name STRING,
    name STRING,
    followers_count INT,
    listed_count INT,
    friends_count INT,
    lang STRING,
    user_location STRING,
    time_zone STRING,
    profile_image_url STRING,
    json_response STRING
);
___________________________ 
 Now loading the data into hive table:
 FROM raw_tweets_data
INSERT OVERWRITE TABLE tweetsproblemstat1
SELECT
    cast(get_json_object(json_response, '$.id_str') as BIGINT),
    get_json_object(json_response, '$.created_at'),
    concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
    substr (get_json_object(json_response, '$.created_at'),27,4)),
    substr (get_json_object(json_response, '$.created_at'),27,4),
    case substr (get_json_object(json_response, '$.created_at'),5,3)
        when "Jan" then "01"
        when "Feb" then "02"
        when "Mar" then "03"
        when "Apr" then "04"
        when "May" then "05"
        when "Jun" then "06"
        when "Jul" then "07"
        when "Aug" then "08"
        when "Sep" then "09"
        when "Oct" then "10"
        when "Nov" then "11"
        when "Dec" then "12" end,
    substr (get_json_object(json_response, '$.created_at'),9,2),
    substr (get_json_object(json_response, '$.created_at'),12,8),
    get_json_object(json_response, '$.in_reply_to_user_id_str'),
    get_json_object(json_response, '$.text'),
    get_json_object(json_response, '$.contributors'),
    get_json_object(json_response, '$.retweeted'),
    get_json_object(json_response, '$.truncated'),
    get_json_object(json_response, '$.coordinates'),
    get_json_object(json_response, '$.source'),
    cast (get_json_object(json_response, '$.retweet_count') as INT),
    get_json_object(json_response, '$.entities.display_url'),
    array(
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
    array(
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
    trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
    trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
    get_json_object(json_response, '$.user.screen_name'),
    get_json_object(json_response, '$.user.name'),
    cast (get_json_object(json_response, '$.user.followers_count') as INT),
    cast (get_json_object(json_response, '$.user.listed_count') as INT),
    cast (get_json_object(json_response, '$.user.friends_count') as INT),
    get_json_object(json_response, '$.user.lang'),
    get_json_object(json_response, '$.user.location'),
    get_json_object(json_response, '$.user.time_zone'),
    get_json_object(json_response, '$.user.profile_image_url'),
    json_response
WHERE (length(json_response) > 0);
 
____
Now count the number of records in the table  
tweetsproblemstat1
 
hive>select count(*) from  
tweetsproblemstat1;
 
To know the fields in hive table 
hive> describe table  
tweetsproblemstat1;
 
 
 
 


Converting Image File to Sequence File Format and Finding Duplicates

Converting Image File to Sequence File Format and Finding Duplicates

package com.nandgama;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.Text;
//import org.apache.hadoop.mapred.TextInputFormat;
//import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
//import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.zookeeper.common.IOUtils;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;



public class ConvImageToSequenceFile {

    /**
     * @param args
     */

    public static class ConvImageToSequenceFileMapper extends
    Mapper<Object, Text,Text,BytesWritable> {
       
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
                //Reading path from the value passed above in map where the image file is present.       
            String pathToRead = value.toString();
           
            Configuration conf=context.getConfiguration();
            Path path = new Path(pathToRead);   
            //Creating a FileSystem setup from the above path
            FileSystem fileToRead = FileSystem.get(URI.create(pathToRead), conf);
           
            //creating a DataInput stream class where  it reads the file and outputs bytes stream
            DataInputStream dis = null;
            try{   
                //try block open
               
                dis = fileToRead.open(path);
               
                /*Because you don't know the image size that is the reason why we are creating a
                 byte array of max size to read all at once instead of reading single byte
                 to save read time and resources */
                byte tempBuffer[]= new byte[1024 * 1024];
               
                ByteArrayOutputStream bout =new ByteArrayOutputStream();
               
                /*dis variable reads bytes into buffer starting from zero and to max length
                 until end of file which  returns a -1 once it does while breaks */
                while(dis.read(tempBuffer, 0, tempBuffer.length)>= 0)
                {
                    /*if tempBuffer is full and still dis is reading because it didn't receive end
                     of file as -1 is not encountered then the ByteArrayOutputStream bout 
                     need to be added with remaining bytes until end of file  */                   
                    bout.write(tempBuffer);
                                   
                }
                /*why can't we write the below line if dis is done reading before the
                 * buffer size is full */
                //context.write(value,new BytesWritable(tempBuffer.clone()));
               
                //writing the ByteArrayOutputStream bout
                context.write(value,new BytesWritable(bout.toByteArray()));
               
                //try block close
                }finally{
                    //final block open
                   
                    //the difference between dis.close() and IOUtils.closeStream(dis);
                    dis.close();
                    IOUtils.closeStream(dis);                   
                //final block close
                }   
            }//map function close
        }
        //ConvImageToSequenceFileMapper close
   
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "ConvImageToSequenceFile");
            job.setJarByClass(ConvImageToSequenceFile.class);
           
            job.setMapperClass(ConvImageToSequenceFileMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(ByteWritable.class);
           
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
           
            FileInputFormat.addInputPath(job, new Path(args[0]));
            //FileOutputFormat<K, V>.setOutputPath(arg0, arg1)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        //close of driver class       
        }
//close of ConvImageToSequenceFile
}

package com.nandgama;

import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

//import org.apache.hadoop.mapred.FileInputFormat;
//import org.apache.hadoop.mapred.SequenceFileInputFormat;
//import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//check these
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
_____________________________________________________________________
public class ImageDuplicateFinder {

    /**
     * @param args
     */
    public static class ImageDuplicateFinderMapper  extends
    Mapper<Text,BytesWritable,Text,Text> {
       
        @Override
        public void map(Text key,BytesWritable value, Context context)
                throws IOException, InterruptedException {
           
            //the file is in SequentialFileFormat we are reading.
            byte tempImageData[] = value.getBytes();
            String md5Str = null;
            try {
                //better to keep in a try block so that if algorithm doesn't exist then it will throw
                // an exception
                md5Str = convertToStr(tempImageData);
            } catch (NoSuchAlgorithmException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                context.setStatus("Internal Error Can't find the algorithm for the specified " +
                        "digest algorithm MD5 ");
               
                /*if an exception occurs then program ends by the below return statement without
                Executing context.write(new Text(md5Str),key); */
                return ;
            }
            /*here the key is converted hexa decimal which corresponds to each individual image , so that all
             the similar keys are grouped together with their corresponding paths as values.
             */
            context.write(new Text(md5Str),key);
           
            //close of map function
        }
       
       
        public String convertToStr(byte[] passImageData) throws NoSuchAlgorithmException{
           
            /*MessageDigest Class is used to here and we are trying to get "MD5" algorithm .
             Security.Providers() gives a list of Providers and its algorithms */
            MessageDigest md = MessageDigest.getInstance("MD5");
           
            //Updates the digest using the specified array of bytes.           
            md.update(passImageData);
            /*Completes the hash computation by performing final operations such as padding.
            The digest is reset after this call is made.Retruns an array of bytes of resulting
            hash value*/
            byte[] tempHash =md.digest();
           
            String hexString = new String();
            //Below code for Converting Byte array to hex
                for(int i =0; i<tempHash.length; i++){
                    hexString += Integer.toString( (tempHash[i] & 0xff) + 0x100, 16 ).substring(1);
                }
            ////the similar images will have same hexString
            return hexString  ;
        }//close of convertToStr function
       
        //close of ImageDuplicateFinderMapper
    }
   
    public static class ImageDuplicateFinderReducer extends
    Reducer<Text,Text,Text,Text> {
   
        public void reduce(Text key, Iterable<Text> values,Context context)throws IOException,
        InterruptedException{
            Text imagePath = null;
            for(Text tempPath : values)
            {
                /*interested only in the first value for the given key(i.e is the path of image)
                 * where all the remaining path in value  are duplicates*/
                imagePath = tempPath;
                return;
            }
            context.write(new Text(imagePath) , key);
        }//close of reduce function ImageDuplicateFinderReducer
       
   
    }//close of reducer ImageDuplicateFinderReducer class
   
    public static void main(String[] args) throws Exception  {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
       
        String[] programArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (programArgs.length != 2) {
               System.err.println("Usage: ImageDuplicateFinder <in>  <out>");
               System.exit(2);
            }
            //Job job = new Job(conf, "Reduce side Join");
            Job job =Job.getInstance(conf,"ImageDuplicateFinder" );
           
            job.setJarByClass(ImageDuplicateFinder.class);
            job.setMapperClass(ImageDuplicateFinderMapper.class);
            job.setReducerClass(ImageDuplicateFinderReducer.class);
           
            /*Are these two necessary because we have a reducer class already defined which output is
             Text and Text by default */
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
           
           
                //do we have to set these two just beacause we are giving SequenceFileInputFormat file
                job.setInputFormatClass(SequenceFileInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
           
           
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
                   
        //close of driver class
        }
//close of ImageDulplicateFinder
}
___________________________
This is where i stores images in hdfs
/user/mr/Images/Ballons.jpeg
/user/mr/Images/Bird.jpeg
/user/mr/Images/BirdWings.jpeg
/user/mr/Images/BlueBird.jpeg
/user/mr/Images/Flower.jpeg
/user/mr/Images/Giraffe.jpeg
/user/mr/Images/HangingBridge.jpeg
/user/mr/Images/RiverSwan.jpeg
/user/mr/Images/Sunflower.jpeg
/user/mr/Images/Sunset.jpeg
/user/mr/Images/Swan.jpeg
/user/mr/Images/Tortoise.jpeg
/user/mr/Images/Wildtiger.jpeg
/user/mr/Images/bridge.jpeg
/user/mr/Images/tiger.jpeg
____________________________________________________________________________
This is where i stored the image paths in the file named  imagepath.txt on hdfs

imagepath.txt file content

/user/mr/Images/Ballons.jpeg
/user/mr/Images/Bird.jpeg
/user/mr/Images/BirdWings.jpeg
/user/mr/Images/BlueBird.jpeg
/user/mr/Images/Flower.jpeg
/user/mr/Images/Giraffe.jpeg
/user/mr/Images/HangingBridge.jpeg
/user/mr/Images/RiverSwan.jpeg
/user/mr/Images/Sunflower.jpeg
/user/mr/Images/Sunset.jpeg
/user/mr/Images/Swan.jpeg
/user/mr/Images/Tortoise.jpeg
/user/mr/Images/Wildtiger.jpeg
/user/mr/Images/bridge.jpeg
/user/mr/Images/tiger.jpeg
______________
In hadoop command where converting images to sequencefile format

hadoop jar ConvImageToSequenceFile.jar ConvImageToSequenceFile  /user/mr/imagepath.txt  /user/mr/sequencefileout
________________________________________
Hadoop command for finding duplicates

hadoop jar ImageDuplicateFinder.jar ImageDuplicateFinder  /user/mr/sequencefileout/part-r-00000    user/mr/nonimageduplicates

A Reduce side join example.

A Reduce side join example (  A MapReduce example )