Lateral View, Explode(), Partition By, Rank(), Dense_Rank(), Max(), Min(), Count(), Sum(), Avg(), Cume_Dist(), Row_Number() In Hive

Problem explanation: Lateral View, Explode(), Partition By, Rank(), Dense_Rank(), Max(), Min(), Count(), Sum(), Avg(), Cume_Dist(), Row_Number() In Hive

-----------------------
hive> describe sale1;
OK
cust_id                 int                                        
prod_no                 int                                        
qty                     int                                        
datestring              string                                     
sales_id                int                                        
Time taken: 0.106 seconds, Fetched: 5 row(s)
hive> describe cust_temp;
OK
first_name              string                                     
last_name               string                                     
category                string                                     
phone_number            string                                     
customer_id             int                                        
address                 array<string>                              
Time taken: 0.103 seconds, Fetched: 6 row(s)

Explode in Hive (two equivalent executions: one using lateral view, the other with a sub-query):

hive>select customer_id as cid, address_det from cust_temp
                                                  lateral view explode(address) mytable1 as address_det ;

hive>select t.cid from
(select concat(customer_id," ",address[0]) as cid from cust_temp
union all
select concat(customer_id," ",address[1]) as cid from cust_temp) t;

Sample result for the two queries above (both produce the same output):
3783    Mediumcity
3783    67890
3784    Bigcity
3784    12345
3785    township
3785    67890
3786    Beck Drive
3786    12345
Time taken: 14.012 seconds, Fetched: 130 row(s)

 hive>select customer_id,address[0],address[1] from cust_temp;
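
If the position of each array element is also needed (for example, to tell a street entry from a zip entry), Hive 0.13 and later provide posexplode(), which emits the element index alongside the value. A minimal sketch of the same lateral view with posexplode():

hive>select customer_id, pos, address_det from cust_temp
lateral view posexplode(address) mytable1 as pos, address_det;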
 -------------------
Max,Min,Count,Sum,Avg:
hive> select avg(qty) from sale1;
hive>select count(qty) from sale1;
hive>select max(qty) from sale1;
hive>select min(qty) from sale1;
hive>select max(qty) ,min(qty) , avg(qty),count(qty) from sale1;

hive>select cust_id, max(qty) ,min(qty) , avg(qty),count(qty) from sale1 
group by cust_id;

hive>select cust_id, min(qty),max(qty) , avg(qty),count(qty),cast(datestring as date) from sale1
group  by   cust_id,cast(datestring as date);

Hint: this produces individual aggregates for each (cust_id, date) combination.
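
If only some groups are wanted, the group-by aggregates can be filtered with a having clause. A minimal sketch (the threshold 10 is just an illustration):

hive>select cust_id, sum(qty), count(qty) from sale1
group by cust_id
having sum(qty) > 10;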
---------------
Using partition by to calculate max, min, count, sum, avg:
Note: The table sale1 is not physically partitioned in any way (you can check the metadata with describe sale1). Aggregates over subsets of the data can still be calculated by using the partition by clause of a window function, as shown below.

hive>select cust_id, datestring,
sum(qty) over (partition by cast(datestring as date) order by cust_id),
count(sales_id) over (partition by cast(datestring as date) order by cust_id),
avg(qty) over (partition by cast(datestring as date) order by cust_id) from sale1;

Hint: The above statement produces a result that is
a) partitioned by date and ordered by cust_id within each partition (on a particular date any number of customers can buy any number of products);
b) a running aggregate within each date partition: for every successive customer the sum, count, and avg are computed over all rows from the start of the partition up to and including the current one.

Sample result:
cust_id, date, running sum of qty, running count of transactions, running avg of qty
830    2012-01-04    1    1    1.0
64    2012-01-07    1    1    1.0
331    2012-01-08    2    1    2.0
322    2012-01-09    6    1    6.0
587    2012-01-09    7    2    3.5
922    2012-01-09    8    3    2.6666666666666665
7    2012-01-24    1    1    1.0
482    2012-01-24    2    2    1.0
773    2012-01-24    6    3    2.0
64    2012-02-12    4    1    4.0
452    2012-02-12    27    2    13.5
97    2012-02-15    1    1    1.0
128    2012-02-15    5    2    2.5
19    2012-02-17    7    1    7.0
647    2012-02-17    9    2    4.5
482    2012-03-08    1    1    1.0
922    2012-03-09    24    1    24.0
47    2012-03-10    1    1    1.0
431    2012-03-10    2    2    1.0
433    2012-04-07    2    1    2.0
922    2012-04-09    3    1    3.0
895    2012-04-15    7    1    7.0
1993    2012-04-15    10    2    5.0
720    2012-05-09    2    1    2.0
3221    2012-05-09    17    2    8.5
24    2012-05-17    5    1    5.0
773    2012-05-17    15    2    7.5
102    2012-06-20    1    1    1.0
720    2012-06-20    2    2    1.0
11    2012-07-24    2    1    2.0
3221    2012-07-24    3    2    1.5
176    2012-08-09    1    1    1.0
227    2012-09-24    1    1    1.0
323    2012-09-24    3    2    1.5
64    2012-10-09    17    1    17.0
452    2012-10-09    19    2    9.5
895    2012-12-09    31    1    31.0
1993    2012-12-09    33    2    16.5
Time taken: 21.323 seconds, Fetched: 38 row(s)
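
The running behaviour comes from the order by inside over: with an order by and no explicit frame, Hive defaults to range between unbounded preceding and current row. The frame can also be spelled out; a sketch using a rows frame, which advances the total on every input row even when cust_id values tie:

hive>select cust_id, datestring,
sum(qty) over (partition by cast(datestring as date) order by cust_id
rows between unbounded preceding and current row) from sale1;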


hive> select datestring, 
sum(qty) over (partition by cast(datestring as date)) , 
count(sales_id) over (partition by cast(datestring as date) ) , 
avg(qty) over  (partition by cast(datestring as date)) from sale1;

Hint: If only partition by cast(datestring as date) is specified (with no order by), the aggregations are computed over the whole partition, so every row with the same date prints one line carrying the same aggregate values.
Sample result
OK
2012-01-04    1    1    1.0
2012-01-07    1    1    1.0
2012-01-08    2    1    2.0
2012-01-09    8    3    2.6666666666666665
2012-01-09    8    3    2.6666666666666665
2012-01-09    8    3    2.6666666666666665
2012-01-24    6    3    2.0
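
If only one line per date is wanted instead of one per sale, note that every row in a partition carries the same aggregate values here, so a plain group by produces the same numbers without window functions; a sketch:

hive>select cast(datestring as date), sum(qty), count(sales_id), avg(qty) from sale1
group by cast(datestring as date);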


---------------------------------------

Rank & Dense Rank 

hive>select a.*,
 rank() over (ORDER BY a.cust_id ASC) as rankcust from sale1 a
ORDER BY a.cust_id;

(or, equivalently, the query below gives the same result)



hive>select a.*,
 rank() over (ORDER BY a.cust_id ASC) as rankcust from sale1 a;

hive>select a.*,
dense_rank() over (ORDER BY a.cust_id ASC) as rankcust from sale1 a
ORDER BY a.cust_id; 
         
(or, equivalently, the query below gives the same result)

hive>select a.*,
dense_rank() over (ORDER BY a.cust_id ASC) as rankcust from sale1 a;



Hint: dense_rank() ranks the cust_id values in ascending order with consecutive rank numbers, leaving no gaps after ties (unlike rank(), which skips).

hive>select a.cust_id, count(*),
dense_rank() over (ORDER BY a.cust_id ASC) as rankcust from sale1 a
group by a.cust_id;

hive>select a.cust_id,  a.datestring,
percent_rank() over (partition by cast(a.datestring as date) order by a.cust_id) as rankcust1,
dense_rank() over (partition by cast(a.datestring as date) order by a.cust_id ) as rankcust2,
rank() over (partition by cast(a.datestring as date) order by a.cust_id ) as rankcust0
  from sale1 a ;

The result below is ranked by rank(), dense_rank(), and percent_rank() as follows:
step 1: partition the rows by date;
step 2: within each date partition, order the rows by cust_id;
step 3: rank the ordered rows within each partition.

Sample output:
cust_id, datestring, percent_rank, dense_rank, rank
331    2012-01-08    0.0    1    1
322    2012-01-09    0.0    1    1
322    2012-01-09    0.0    1    1
587    2012-01-09    0.5    2    3
587    2012-01-09    0.5    2    3
922    2012-01-09    1.0    3    5
7    2012-01-24    0.0    1    1
7    2012-01-24    0.0    1    1
11    2012-01-24    0.3333333333333333    2    3
482    2012-01-24    0.5    3    4
482    2012-01-24    0.5    3    4
773    2012-01-24    0.8333333333333334    4    6
3221    2012-01-24    1.0    5    7
64    2012-02-12    0.0    1    1
452    2012-02-12    1.0    2    2
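
percent_rank() is computed as (rank - 1) / (rows in partition - 1). A quick check against the sample above: the 2012-01-09 partition has 5 rows, so the rows with rank 3 get (3 - 1) / (5 - 1) = 0.5 and the last row gets (5 - 1) / (5 - 1) = 1.0.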


hive>select a.cust_id, a.datestring,
dense_rank() over (order by a.cust_id) as rankcust2,
rank() over (order by a.cust_id) as rankcust0
from sale1 a;

The result below is produced by:
step 1: order all rows by cust_id;
step 2: rank them accordingly. Note the tie handling: rank() leaves gaps after duplicates (1, 1, 3, 3, 5, ...) while dense_rank() stays consecutive (1, 1, 2, 2, 3, ...).
Sample output:
7    2012-01-24    1    1
7    2012-01-24    1    1
11    2012-01-24    2    3
11    2012-07-24    2    3
19    2012-06-20    3    5
19    2012-02-17    3    5
24    2012-05-17    4    7
24    2012-12-09    4    7
47    2012-03-10    5    9


__________________

Cume_Dist()
hive>select cust_id,
cume_dist() over (order by qty ) from sale1;

hive>select cust_id,
cume_dist() over (partition by cast(datestring as date) order by cust_id) from sale1 ;
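
Hint: cume_dist() returns, for each row, the fraction of rows in its partition whose order-by value is less than or equal to the current row's value, i.e. (rows with value <= current) / (rows in partition). In a 4-row partition, for example, a row holding the second-lowest qty gets 2/4 = 0.5.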
____________________
Row_Number() 

hive>select cust_id,  datestring,
row_number() over (partition by cast(datestring as date) ) from sale1;
Result explanation:
Step 1: partition by date.
Step 2: within each date partition, row numbers are assigned.
Step 3: for every partition the row number starts again from one.
OK
830    2012-01-04    1
64    2012-01-07    1
331    2012-01-08    1
587    2012-01-09    1
922    2012-01-09    2
322    2012-01-09    3
322    2012-01-09    4
587    2012-01-09    5
773    2012-01-24    1
482    2012-01-24    2
7    2012-01-24    3
7    2012-01-24    4
3221    2012-01-24    5
11    2012-01-24    6
482    2012-01-24    7
produces 51 rows.
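
Note: without an order by inside the over clause, the order in which rows are numbered within a partition is not guaranteed and can differ between runs. Add an order by to the window (as in the next example) whenever a stable numbering is needed.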


hive>select cust_id,  datestring,
row_number() over (order by cust_id ) from sale1;

Explanation:
Step 1: row_number() is assigned in order of cust_id.
Step 2: cust_id and datestring are displayed alongside it.
Sample output:
7    2012-01-24    1
7    2012-01-24    2
11    2012-01-24    3
11    2012-07-24    4
19    2012-06-20    5
19    2012-02-17    6
24    2012-05-17    7
24    2012-12-09    8
47    2012-03-10    9
64    2012-10-09    10
64    2012-02-12    11
64    2012-01-07    12
97    2012-02-15    13


hive>select cust_id,  datestring,
row_number() over (partition by cast(datestring as date) ) from sale1 
group by cust_id,  datestring;

Hint: produces 44 rows.
Sample result explanation:
Step 1: partition by date.
Step 2: because the query groups by cust_id and datestring, duplicate rows (identical in both fields) are removed.
Step 3: a row number is assigned to each row in each partition (which is why the numbering restarts from one).
OK
830    2012-01-04    1
64    2012-01-07    1
331    2012-01-08    1
322    2012-01-09    1
922    2012-01-09    2
587    2012-01-09    3
773    2012-01-24    1
11    2012-01-24    2
3221    2012-01-24    3
7    2012-01-24    4
482    2012-01-24    5
452    2012-02-12    1
64    2012-02-12    2
128    2012-02-15    1
97    2012-02-15    2
647    2012-02-17    1
19    2012-02-17    2
482    2012-03-08    1
922    2012-03-09    1
47    2012-03-10    1
431    2012-03-10    2
433    2012-04-07    1
922    2012-04-09    1
895    2012-04-15    1
1993    2012-04-15    2
720    2012-05-09    1
3221    2012-05-09    2
24    2012-05-17    1
773    2012-05-17    2
720    2012-06-20    1
452    2012-06-20    2
102    2012-06-20    3
19    2012-06-20    4
3221    2012-07-24    1
11    2012-07-24    2
176    2012-08-09    1
323    2012-09-24    1
227    2012-09-24    2
64    2012-10-09    1
452    2012-10-09    2
1993    2012-12-09    1
24    2012-12-09    2
895    2012-12-09    3
482    2012-12-09    4
Time taken: 47.869 seconds, Fetched: 44 row(s)


hive>select cust_id,  datestring,
row_number() over (order by cast(datestring as date)) from sale1
 group by cust_id,  datestring;

Result explanation:
Step 1: order by date.
Step 2: because the query groups by cust_id and datestring, duplicate rows (identical in both fields) are removed.
Step 3: a row number is assigned to each row across the whole result (no partitioning, so it never restarts).
OK
830    2012-01-04    1
64    2012-01-07    2
331    2012-01-08    3
322    2012-01-09    4
922    2012-01-09    5
587    2012-01-09    6
773    2012-01-24    7
11    2012-01-24    8
3221    2012-01-24    9
7    2012-01-24    10
482    2012-01-24    11
452    2012-02-12    12
64    2012-02-12    13
128    2012-02-15    14
97    2012-02-15    15
647    2012-02-17    16
19    2012-02-17    17
482    2012-03-08    18
922    2012-03-09    19
47    2012-03-10    20
431    2012-03-10    21
433    2012-04-07    22
922    2012-04-09    23
895    2012-04-15    24
1993    2012-04-15    25
720    2012-05-09    26
3221    2012-05-09    27
24    2012-05-17    28
773    2012-05-17    29
720    2012-06-20    30
452    2012-06-20    31
102    2012-06-20    32
19    2012-06-20    33
3221    2012-07-24    34
11    2012-07-24    35
176    2012-08-09    36
323    2012-09-24    37
227    2012-09-24    38
64    2012-10-09    39
452    2012-10-09    40
1993    2012-12-09    41
24    2012-12-09    42
895    2012-12-09    43
482    2012-12-09    44

Time taken: 47.808 seconds, Fetched: 44 row(s)

Hive Script Execution/hive -e Option/Passing Variables To Hive Script

Problem Explanation: Hive Script Execution/hive -e Option/Passing Variables To Hive Script

 -----------------------
Any number of Hive commands can be stored in batch in a Hive script file; the file name may carry a .hive, .sql, or .q extension.

Create a file named  myscript.sql
----start of myscript.sql
select cast(t3.stdt as date) from
(select concat(st_dt[2],'-',st_dt[0],'-',st_dt[1]) stdt from customersales.emp_det)t3;
----end of myscript.sql

Execute myscript.sql

$HIVE_HOME/bin>hive -S -f myscript.sql    (-S runs it with the silent option)
-------
Contents of the file myscript1.q:

select cast(t3.stdt as date) from (select concat(st_dt[2],'-',st_dt[0],'-',st_dt[1]) stdt
from
${hiveconf:my_var1})t3;

select concat(st_dt[2],'-',st_dt[0],'-',st_dt[1]) from ${hiveconf:my_var1} limit 1;

----end of file myscript1.q

Execute myscript1.q

$HIVE_HOME/bin> hive -S --hiveconf my_var1=customersales.emp_det -f myscript1.q
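
Hive also supports script variables via --hivevar (Hive 0.8 and later), which are referenced with the ${hivevar:...} syntax instead of ${hiveconf:...}. A sketch of the equivalent invocation, assuming the script is changed to reference ${hivevar:my_var1}:

$HIVE_HOME/bin> hive -S --hivevar my_var1=customersales.emp_det -f myscript1.q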

---------
Execute with -e option

$HIVE_HOME/bin>hive  -S -e  "select cast(t3.stdt as date) from (select concat(st_dt[2],'-',st_dt[0],'-',st_dt[1]) stdt from customersales.emp_det)t3 ;"


Loading a Flat File & Converting a Date Variable In a Flat File Into Hive Date Format Field In a Table.

 Loading a Flat File & Converting  a Date Variable In a Flat File Into Hive Date Format Field In a Table.(Hands on explanation)

--------------------------
Step 1:Create a Stage table

hive>create table emp_stage
(fn string , ln string,cur_wk string, prv_wk string, stdt Array<string>,enddt Array<string>,eid bigint)
 row format delimited
 fields terminated by ','
collection items terminated by '/'
 stored as textfile;

Note: Sample of flat file data
First_Name,Last_Name,Currently_working,Previously_working,Start_date,End_date,Employee_id
John,Wade,Washington, ,1/8/2014,,2938
Franklin,Josephs,Detroit,washington,11/12/2013,2/12/2015,2913
Elizabeth,Reynolds,Cleveland, ,4/30/2011,,2891
Arthur,Gibbons,fortwayne, ,6/28/2015, ,400
Franklin,Josephs,washington, ,2/12/2015, ,2913

But Hive's date format is YYYY-MM-DD, so the M/D/YYYY dates must be converted.

Step 2:
hive>Load data inpath
 '/user/hadoop/Emp_details.csv'
 into table emp_stage;

Step 3:
hive>insert overwrite directory
'/user/hadoop/date_temp'
select fn, ln, cur_wk, prv_wk,
concat(stdt[2],'-',stdt[0],'-',stdt[1]),
concat(enddt[2],'-',enddt[0],'-',enddt[1]),
eid from emp_stage;

Step 4:
hive>create table emp_det
(fn string, ln string, cur_wk string, prv_wk string, stdt date, enddt date, eid bigint)
row format delimited
fields terminated by '\001'
collection items terminated by '-'
stored as textfile;

Step 5:
hive>load data inpath
'/user/hadoop/date_temp'
into table emp_det;

Step 6: To display the dates in Hive date format in the Hive CLI:


hive>select cast(t3.st_dt as date), cast(t3.end_dt as date) from
(select concat(stdt[2],'-',stdt[0],'-',stdt[1]) st_dt,
concat(enddt[2],'-',enddt[0],'-',enddt[1]) end_dt from emp_stage)t3;
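
As an alternative to stitching the array elements together by hand, the M/D/YYYY value can be reformatted in one step with unix_timestamp()/from_unixtime(); a sketch against the same stage table (the concat only rebuilds the original string because the stage table already split it into an array):

hive>select from_unixtime(unix_timestamp(concat(stdt[0],'/',stdt[1],'/',stdt[2]),'M/d/yyyy'),'yyyy-MM-dd') from emp_stage;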

Hive Insert For New Records From Staging Table Into an Existing Hive Table (For SCD Type-2)

Hive Insert For New Records From Staging Table Into an Existing Hive Table (For SCD Type-2) (hands on explanation)

-----------------------------------------------
Step 1:
In an SCD type-2 implementation using a date-range table, new records from an end-of-day flat file must be inserted into an existing Hive table without overwriting it. Load the entire end-of-day flat file into a stage table, then insert into the existing Hive table by comparing the two.

hive>
create table employee_stage
(fn string , ln string, current_workingcity string, previously_workedcity string, start_date Array<string>,end_date  Array<string>, employee_id bigint)
 row format delimited
 fields terminated by ','
collection items terminated by '/'
 stored as textfile;

hive>load data inpath
'/user/hadoop/employee_day12march2014'
 into table employee_stage;

Hive supports only equi-joins. If the stage table employee_stage has 100 new records out of 1000 that need to be inserted into the existing Hive table "employee_history" (an SCD type-2 table with date ranges) without any errors, execute:

hive> insert overwrite  directory
'/user/hadoop/newinsert=day1march2014'
select c.employee_id from employee_stage c where c.employee_id
 NOT IN
(select e.employee_id from employee_history e);
Note: this leaves a flat file holding the new employee_id values that are about to be inserted into employee_history.

hive>
insert into table employee_history
 select c.* from employee_stage c where c.employee_id 
NOT IN
 (select e.employee_id from employee_history e); 
Hint: make sure the employee_id column in the employee_stage table is not null; a NOT IN comparison against NULL values returns no rows.
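
On Hive versions where NOT IN subqueries are unavailable or slow, the same new-record filter can be written as a left outer join anti-join, which is a plain equi-join and therefore always supported; a sketch:

hive>insert into table employee_history
select c.* from employee_stage c
left outer join employee_history e on (c.employee_id = e.employee_id)
where e.employee_id is null;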

CTAS With Three Table Joins In Two Different Databases In a Subquery

CTAS With Three Table Joins In Two Different Databases In a Subquery (hands on explanation)

--------
hive> describe default.pd_det;
OK
dept                    string                                    
cost                    int                                       
id                      int                                       
asmb_city               string                                    
asmb_ct                 string                                    
retail                  int                                       
country                 string                                    
         
# Partition Information         
# col_name                data_type               comment           
         
country                 string 

------
hive> describe default.item_avgprofit_byctry ;
OK
avgprf                  int                                        
makein                  string                                     
itemshipped             int                                        
Time taken: 2.596 seconds, Fetched: 3 row(s)
----
hive> describe customersales.product_part ;
OK
prod_name               string                                     
description             varchar(20)                                
qty_onhand              int                                        
prod_num                bigint                                     
packaged_with           array<string>                              
category                string                                     
        
# Partition Information        
# col_name                data_type               comment            
        
category                string                                     
Time taken: 0.185 seconds, Fetched: 11 row(s)
-----
Step 2: A join between three tables:
table 1: default.pd_det a (join column: a.country)
table 2: default.item_avgprofit_byctry b (join column: b.makein)
table 3: customersales.product_part c (join column: c.prod_num with a.id)
______________________
Step 3: CTAS

hive>create table threejoin_ctastwodifferentdb stored as textfile as
select t3.cost,t3.pdid,t3.madein,t3.pdname from
(
select a.cost as cost, a.id as pdid, b.makein as madein, c.prod_name as pdname from default.pd_det a
join default.item_avgprofit_byctry b on (a.country = b.makein)
join customersales.product_part c on (a.id = c.prod_num)
)t3;

------
hive> describe threejoin_ctastwodifferentdb;
OK
cost                    int                                        
pdid                    int                                        
madein                  string                                     
pdname                  string                                     
Time taken: 1.17 seconds, Fetched: 4 row(s)

CTAS has these restrictions (the target table, threejoin_ctastwodifferentdb in the example above, cannot be any of the following):

    The target table cannot be a partitioned table.
    The target table cannot be an external table.
    The target table cannot be a list bucketing table.
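
If a partitioned target is needed, the usual workaround is to create the table first and load it with insert ... select instead of CTAS. A sketch using dynamic partitioning (threejoin_part is a hypothetical table name for illustration):

hive>create table threejoin_part (cost int, pdid int, pdname string)
partitioned by (madein string)
stored as textfile;

hive>set hive.exec.dynamic.partition=true;
hive>set hive.exec.dynamic.partition.mode=nonstrict;

hive>insert overwrite table threejoin_part partition(madein)
select a.cost, a.id, c.prod_name, b.makein
from default.pd_det a
join default.item_avgprofit_byctry b on (a.country = b.makein)
join customersales.product_part c on (a.id = c.prod_num);

Note that the dynamic partition column (b.makein) must come last in the select list.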

Create an External Hive Partitioned Table.

Create an External Hive Partitioned Table. (hands on explanation)

------------------------
Step 1: Create an external table. The partition files live in subdirectories of the HDFS location /user/hadoop/country/. In the view of the table definition below, each subdirectory under country/ is one partition.

hive>create EXTERNAL table  pd_det
(dept string,  cost int, id int,asmb_city string,asmb_ct string, retail int)
PARTITIONED BY (country string)
 row format delimited
 fields terminated by '\001'
 stored as textfile
location '/user/hadoop/country/';

Step 2: Whenever a new directory is added in HDFS under /user/hadoop/country/, an alter table ... add partition command must be executed.
a) For example, if a new directory with the name format 'country=usa' is added under the hdfs://localhost:8020/user/hadoop/country path, that amounts to adding a new partition to table pd_det with partition value country='usa'. The data file present in hdfs://localhost:8020/user/hadoop/country/country=usa will be read into the table.
b) For the table pd_det to recognize the partition, an alter table command must be executed, as shown below.

hive>Alter table pd_det add partition(country ='usa');

Step 3: To create a data file inside such a directory from a table already present in Hive, execute:

hive>insert overwrite  directory
 '/user/hadoop/country/country=india'
select
 dept,cost, id ,asmb_city,asmb_ct,retail
 from pd_temp
where make = 'india';

Step 4: If the folder name differs from the country=<value> convention above, specify the location explicitly in the alter statement:

 hive>alter table pd_det
add if not exists partition(country='mexico')
location '/user/hadoop/country/mexico' ;
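
When many partition directories already follow the country=<value> naming convention, recent Hive versions can register them all at once instead of issuing one alter table per directory:

hive>msck repair table pd_det;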

Pig Running In Local Mode And Accessing Files From HDFS / Storing In HDFS

Pig Running In Local Mode And Accessing Files From HDFS / Storing In HDFS (problem explanation)

----------------------
Step 1: Even when Pig runs in local mode (pig -x local), loading from an absolute path with a scheme and an authority lets it read distributed file system files, and the results can likewise be stored back into HDFS.

salrec = load 'hdfs://localhost:8020/user/hadoop/Sales.csv' using PigStorage(',')
 AS
(custid:long, prod_id:long, qty_pur:int, pur_date:datetime, sale_id:long);

custrec = load 'hdfs://localhost:8020/user/hadoop/Customer.csv' using PigStorage(',')
AS
(fn:chararray, ln:chararray, status:chararray, ph:chararray, custid1:long, add:chararray);

pd_rec = load  'hdfs://localhost:8020/user/hadoop/Product.csv' using PigStorage(',')
AS (pdname:chararray, pddesc:chararray, pdcat:chararray, pdgty:long, pdid:long, pkwith:chararray);

crs_rec = join salrec by prod_id , pd_rec by pdid;

crs_rec0 = join crs_rec by custid , custrec by custid1; -- join in the customer records so that fn is available below

crs_rec1 = group crs_rec0 by (custid, prod_id);

x = foreach crs_rec1 generate group, flatten(crs_rec0.(custid, sale_id, pdname, qty_pur, fn));

store x into 'hdfs://localhost:8020/user/hadoop/piglocaltohdfs2' USING PigStorage(',');

sample output in hdfs :
(7,98243),7,34842,09K Video,1,Allen
(11,77623),11,34843,J Case 1500,2,John
(19,88734),19,34857,DVD J INT,7,Hubert
(24,45641),24,34856,500 GB HD T,5,Roger
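
To run the statements above as a script in local mode, save them to a file (salesjoin.pig is a hypothetical name) and invoke:

$PIG_HOME/bin> pig -x local salesjoin.pig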