Spark 1.3.1 using scala and acessing Hive (From hdfs to spark) (From hive to spark : Spark back to Hive)

Spark 1.3.1 using scala and acessing Hive (From hdfs  to spark) (From hive  to spark :  Spark back to Hive)

From hive: 

 A hive table  stored as orc format having nine fields shown below :

GO TO $HIVE_HOME/bin>hive

hive>describe dimcurentchargeprice_orc

Ch_mas_dmkey        INT (unique key),
Ch_code                  STRING (indicates the purpose of charge),
Ch_code_desc         STRING(description of charge),
Price_amt                 FLOAT(pice to be charged),
Price_desc               STRING(assaigned price type),
Start_date                STRING(Starting date it effected),
End_date                 STRING(ending date ),
current_status           STRINGH(to indicate current record Boolean variable “y”).
GO to spark-shell

$SPARK_HOME/bin>./bin/spark-shell

Reading orc in scala we need HiveContext

Scala>import org.apache.spark.sql.hive.orc._
Scala>import org.apache.spark.sql._

sc is existing SparkContext() {val sc : SparkContext}


scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Read the orc

scala>val sca_chargepr_orc = hiveContext.orcFile("dimcurentchargeprice_orc")

Register it as a temp table 

scala>sca_chargepr_orc.registerTempTable("chargesOrcTable")

scala>val  findPrices = hiveContext.sql(“select * from chargesOrcTable where current_status = ‘y’ order by  ch_code desc “)

scala>findPrices.count()

scala>findPrices.cache()

The results of the queries are SchemaRDD.To print top 10.

Just to print the results
 scala>findPrices .map(t  => “charge_code: “ + t(1) + “,”+ “price_amount”: t(3)).top(10).foreach(println)



To store the above result in a seperate table in hive 


scala>hiveContext.sql("create table findprices_orc_temp (

Ch_mas_dmkey    INT COMMENT 'unique key',
Ch_code    STRING COMMENT 'indicates the purpose of charge',
Ch_code_desc STRING COMMENT 'description of charge',
Price_amt FLOAT COMMENT 'price to be charged',
Price_desc STRING COMMENT 'assaigned price type',
Start_date STRING COMMENT'Starting date it effected',
End_date STRING COMMENT 'ending date',
current_status STRING COMMENT 'to indicate current record Boolean variable y'

) stored as orc")


scala>findPrices.saveAsOrcFile("findprices_orc_temp);


______

FROM HDFS

First reading a comma delimited file from hdfs: in path :/home/src/main/resources/currentchargeprices.csv

The curentchargeprice.csv contains nine fields :

Ch_mas_dmkey       : unique key
Ch_code        :indicates the purpose of charge
Ch_code_desc        :description of charge
Price_amt        :Price to be charged
Price_desc        :assaigned price type
Start_date         :Starting date
End_date        :ending date
Current_status        :to indicate current record Boolean variable “y”

Using Reflection in Scala:

Go to $SCALA_HOME/bin>./bin/scala-shell

scala>import org.apache.log4j.Logger
scala>import org.apache.Log4j.Level
scala>val sqlContext  =  new org.apache.spark.sql.SqlContext(sc)
scala>import sqlContext.CreateSchemaRDD

Create a case class to define the dimension of the table (case class arguments become names of the table)

scala>case class CurrentCcPrices (chkey:  Int,  chcode:  String,  chdesc:  String,  pamt:  Double,  pdesc: String, stdate: String, endate: String, currstatus: String)

Based on created case class create a RDD of  CurrentCcPrices object
scala>val  cprices =  sc.textFile(“/home/src/main/resources/currentchargeprices.csv”)
.map(_.split(“,”)).map(p => CurrentCcPrices(p(0).trim.toInt , p(1),  p(2),  p(3).trim.toInt,  p(4).trim.toDouble,  p(5), p(6), p(7))

If a csv file/textfile has more colums (example 30 columns) separated by “,” to pick the only the needed columns (a few)from the entire file :
{

scala>val  cprices =  sc.textFile(“/home/src/main/resources/currentchargeprices.txt”)

scala>val ncols =  cprices.map(_.split(“,”)).map(p => Row(p(0).trim.toInt, p(1).trim,p(14).toInt,p(30).trim))
}

Register RDD AS TABLE:
scala>cprices.registerTempTable(“cprices”)

scala>cprices.cache()

Run SQL statements using the sql method provided by SQLContext
scala>val  findPrices = sqlContext.sql(“select chkey,  chcode,  chdesc,  pamt,  pdesc,
stdate,   endate from cprices where currstatus = ‘y’ order by  chcode desc “)

scala>findPrices.count()

scala>findPrices.cache()

The results of the queries are SchemaRDD.
scala>findPrices .map(t  => “charge_code: “ + t(1) + “,”+ “price_amount”: t(3)).collect().foreach(println)




To print top 10 results
scala>findPrices .map(t  => “charge_code: “ + t(1) + “,”+ “price_amount”: t(3)).top(10).foreach(println)

_________________________________________________