Analyzing Twitter Data from FLUME data file.

 Analyzing Twitter Data from FLUME data file.

------------------------
Please check the original documentation , this code is just to understand the flow to have an idea. The implementation varies based on several factors
Go to :/usr/lib/flume-ng/apache-flume-1.4.0-bin

From flume installation directory  please execute the flume.conf file to get the result data set from twitter according to your problem statement.

./bin/flume-ng agent --name agent1 --conf conf  -f  /home/biadmin/flume.conf  -Dflume.root.logger=INFO,console. 

agent1 is the name of agent in the flume.conf  properties file.
/home/biadmin/flume.conf path where  the properties file is.
>vi flume.conf  ( write the agent1 properties which is basically source,sink,channel)

-D flume.root.logger =INFO,console is routing the logs info to the console .
_____
Once the files are written in HDFS with the sink configuration path given in flume.conf file you can cat them with  Hdfs dfs -cat /user/flume.34350990909 
You can see the file flume.34350990909 is in json format that is streamed from twitter.
______
To read json format record format , just copy one single record from flume.34350990909 file 
Go to jaql installation directory  

Change to the Jaql bin directory.
cd $HADOOP_HOME/jaql/bin


Start the Jaql shell.
>./jaqlshell

then you will see 
jaql>arecord = [..paste the record copied from flume.34350990909 file];
jaql>arecord ; will print the record clearly with indentation 

To see members of the record execute below command
Jaql>names(arecord);  will print members names of the Json format in file flume.34350990909 
Compare above two outputs you will get a idea of the records
______
Next depending go to hive shell
change to $HIVE_HOME/bin 
and execute the commnad 
$HIVE_HOME/bin >./hive
hive>   
In hive shell read the file directly to hive table.

hive>

DROP TABLE raw_tweets_data;
hive>CREATE EXTERNAL TABLE raw_tweets_data (
    json_record STRING) ;
hive> load data local inpath '.../path where the data is present in linux directory'
into raw_tweets_data;
hive> select count(*) from raw_tweets_data ; will give you the no of records in the 
raw_tweets_data;
 
so depending on the output of jaql> names(arecord) create a table in hive
hive>
DROP TABLE tweetsproblemstat1;
CREATE TABLE tweetsproblemstat1
(
    id BIGINT,
    created_at STRING,
    created_at_date STRING,
    created_at_year STRING,
    created_at_month STRING,
    created_at_day STRING,
    created_at_time STRING,
    in_reply_to_user_id_str STRING,
    text STRING,
    contributors STRING,
    retweeted STRING,
    truncated STRING,
    coordinates STRING,
    source STRING,
    retweet_count INT,
    url STRING,
    hashtags array<STRING>,
    user_mentions array<STRING>,
    first_hashtag STRING,
    first_user_mention STRING,
    screen_name STRING,
    name STRING,
    followers_count INT,
    listed_count INT,
    friends_count INT,
    lang STRING,
    user_location STRING,
    time_zone STRING,
    profile_image_url STRING,
    json_response STRING
);
___________________________ 
 Now loading the data into hive table:
 FROM raw_tweets_data
INSERT OVERWRITE TABLE tweetsproblemstat1
SELECT
    cast(get_json_object(json_response, '$.id_str') as BIGINT),
    get_json_object(json_response, '$.created_at'),
    concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
    substr (get_json_object(json_response, '$.created_at'),27,4)),
    substr (get_json_object(json_response, '$.created_at'),27,4),
    case substr (get_json_object(json_response, '$.created_at'),5,3)
        when "Jan" then "01"
        when "Feb" then "02"
        when "Mar" then "03"
        when "Apr" then "04"
        when "May" then "05"
        when "Jun" then "06"
        when "Jul" then "07"
        when "Aug" then "08"
        when "Sep" then "09"
        when "Oct" then "10"
        when "Nov" then "11"
        when "Dec" then "12" end,
    substr (get_json_object(json_response, '$.created_at'),9,2),
    substr (get_json_object(json_response, '$.created_at'),12,8),
    get_json_object(json_response, '$.in_reply_to_user_id_str'),
    get_json_object(json_response, '$.text'),
    get_json_object(json_response, '$.contributors'),
    get_json_object(json_response, '$.retweeted'),
    get_json_object(json_response, '$.truncated'),
    get_json_object(json_response, '$.coordinates'),
    get_json_object(json_response, '$.source'),
    cast (get_json_object(json_response, '$.retweet_count') as INT),
    get_json_object(json_response, '$.entities.display_url'),
    array(
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
    array(
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
    trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
    trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
    get_json_object(json_response, '$.user.screen_name'),
    get_json_object(json_response, '$.user.name'),
    cast (get_json_object(json_response, '$.user.followers_count') as INT),
    cast (get_json_object(json_response, '$.user.listed_count') as INT),
    cast (get_json_object(json_response, '$.user.friends_count') as INT),
    get_json_object(json_response, '$.user.lang'),
    get_json_object(json_response, '$.user.location'),
    get_json_object(json_response, '$.user.time_zone'),
    get_json_object(json_response, '$.user.profile_image_url'),
    json_response
WHERE (length(json_response) > 0);
 
____
Now count the number of records in the table  
tweetsproblemstat1
 
hive>select count(*) from  
tweetsproblemstat1;
 
To know the fields in hive table 
hive> describe table  
tweetsproblemstat1;
 
 
 
 


No comments:

Post a Comment