Spark 1.3.1 using Scala and accessing Hive (from HDFS to Spark; from Hive to Spark and back to Hive)
From Hive:
A Hive table stored in ORC format has the eight fields shown below:
$HIVE_HOME/bin> hive
hive> describe dimcurentchargeprice_orc;
Ch_mas_dmkey INT (unique key),
Ch_code STRING (indicates the purpose of the charge),
Ch_code_desc STRING (description of the charge),
Price_amt FLOAT (price to be charged),
Price_desc STRING (assigned price type),
Start_date STRING (date the price takes effect),
End_date STRING (ending date),
current_status STRING (flag marking the current record, "y").
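For readers recreating the example, a minimal sketch of DDL that could have produced such a table (the column set matches the description above; everything else is an assumption):
hive> create table dimcurentchargeprice_orc (
        ch_mas_dmkey int,
        ch_code string,
        ch_code_desc string,
        price_amt float,
        price_desc string,
        start_date string,
        end_date string,
        current_status string)
      stored as orc;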
Go to the spark-shell:
$SPARK_HOME> ./bin/spark-shell
To read ORC files in Scala we need a HiveContext:
scala>import org.apache.spark.sql.hive.orc._
scala>import org.apache.spark.sql._
sc is the existing SparkContext created by the shell (val sc: SparkContext).
scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
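As a quick sanity check (not part of the original walkthrough), the HiveContext can also query the metastore directly to confirm the table is visible:
scala>hiveContext.sql("show tables").collect().foreach(println)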
Read the ORC file:
scala>val sca_chargepr_orc = hiveContext.orcFile("dimcurentchargeprice_orc")
Register it as a temp table
scala>sca_chargepr_orc.registerTempTable("chargesOrcTable")
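Optionally, inspect the schema that was read before querying it:
scala>sca_chargepr_orc.printSchema()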
scala>val findPrices = hiveContext.sql("select * from chargesOrcTable where current_status = 'y' order by ch_code desc")
scala>findPrices.count()
scala>findPrices.cache()
The result of the query is a SchemaRDD (in Spark 1.3 this is the new DataFrame type). To print the top 10 results:
scala>findPrices.map(t => "charge_code: " + t(1) + ", price_amount: " + t(3)).top(10).foreach(println)
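Note that top(10) re-sorts the strings lexicographically, ignoring the order by in the query; to keep the SQL ordering, take(10) can be used instead:
scala>findPrices.map(t => "charge_code: " + t(1) + ", price_amount: " + t(3)).take(10).foreach(println)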
To store the above result in a separate table in Hive:
scala>hiveContext.sql("""create table findprices_orc_temp (
Ch_mas_dmkey INT COMMENT 'unique key',
Ch_code STRING COMMENT 'indicates the purpose of the charge',
Ch_code_desc STRING COMMENT 'description of the charge',
Price_amt FLOAT COMMENT 'price to be charged',
Price_desc STRING COMMENT 'assigned price type',
Start_date STRING COMMENT 'date the price takes effect',
End_date STRING COMMENT 'ending date',
current_status STRING COMMENT 'flag marking the current record, y'
) stored as orc""")
scala>findPrices.saveAsOrcFile("findprices_orc_temp")
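Note that saveAsOrcFile writes ORC data to a path rather than into the table created above. A minimal sketch of loading the query result into that Hive table instead (assuming the column types line up):
scala>findPrices.insertInto("findprices_orc_temp")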
______
FROM HDFS
First, read a comma-delimited file from HDFS at the path /home/src/main/resources/currentchargeprices.csv. The currentchargeprices.csv file contains eight fields:
Ch_mas_dmkey : unique key
Ch_code : indicates the purpose of the charge
Ch_code_desc : description of the charge
Price_amt : price to be charged
Price_desc : assigned price type
Start_date : starting date
End_date : ending date
Current_status : flag marking the current record, "y"
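A hypothetical sample record, shown only to illustrate the format the parsing code below expects (not real data):
1001,LATEFEE,Late payment fee,25.50,FLAT,2015-01-01,9999-12-31,y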
Using Reflection in Scala:
Go to the spark-shell (a SparkContext sc is needed, so the Spark shell, not the plain Scala REPL):
$SPARK_HOME> ./bin/spark-shell
scala>import org.apache.log4j.Logger
scala>import org.apache.log4j.Level
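The log4j imports are presumably there to quiet the console; a typical usage (an assumption, not shown in the original) is:
scala>Logger.getLogger("org").setLevel(Level.ERROR)
scala>Logger.getLogger("akka").setLevel(Level.ERROR)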
scala>val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala>import sqlContext.createSchemaRDD
Create a case class to define the schema of the table (the case class argument names become the column names):
scala>case class CurrentCcPrices (chkey: Int, chcode: String, chdesc: String, pamt: Double, pdesc: String, stdate: String, endate: String, currstatus: String)
Based on the case class, create an RDD of CurrentCcPrices objects:
scala>val cprices = sc.textFile("/home/src/main/resources/currentchargeprices.csv")
.map(_.split(",")).map(p => CurrentCcPrices(p(0).trim.toInt, p(1), p(2), p(3).trim.toDouble, p(4), p(5), p(6), p(7)))
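Real files often contain blank or malformed lines; a minimal defensive variant (an assumption, not in the original) drops any row that does not have all eight fields before parsing:
scala>val cpricesSafe = sc.textFile("/home/src/main/resources/currentchargeprices.csv")
.map(_.split(",")).filter(_.length == 8)
.map(p => CurrentCcPrices(p(0).trim.toInt, p(1), p(2), p(3).trim.toDouble, p(4), p(5), p(6), p(7)))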
If a CSV/text file has more columns (for example 30 columns) separated by "," and only a few of them are needed, pick just those columns from the entire file:
{
scala>import org.apache.spark.sql.Row
scala>val cprices30 = sc.textFile("/home/src/main/resources/currentchargeprices.txt")
scala>val ncols = cprices30.map(_.split(",")).map(p => Row(p(0).trim.toInt, p(1).trim, p(14).trim.toInt, p(29).trim))
}
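An RDD of Row carries no schema, so it cannot be registered as a table through reflection alone; a minimal sketch of attaching a schema explicitly (the column names here are assumptions for illustration):
scala>import org.apache.spark.sql.types._
scala>val ncolsSchema = StructType(Seq(
StructField("chkey", IntegerType, false),
StructField("chcode", StringType, true),
StructField("col15", IntegerType, true),
StructField("col30", StringType, true)))
scala>val ncolsTable = sqlContext.createDataFrame(ncols, ncolsSchema)
scala>ncolsTable.registerTempTable("ncols")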
Register the RDD as a table:
scala>cprices.registerTempTable("cprices")
scala>cprices.cache()
Run SQL statements using the sql method provided by SQLContext
scala>val findPrices = sqlContext.sql("select chkey, chcode, chdesc, pamt, pdesc, stdate, endate from cprices where currstatus = 'y' order by chcode desc")
scala>findPrices.count()
scala>findPrices.cache()
The result of the query is a SchemaRDD. To print all results:
scala>findPrices.map(t => "charge_code: " + t(1) + ", price_amount: " + t(3)).collect().foreach(println)
To print the top 10 results:
scala>findPrices.map(t => "charge_code: " + t(1) + ", price_amount: " + t(3)).top(10).foreach(println)
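To complete the round trip, the result can also be written back to HDFS as plain text (the output path is an assumption for illustration):
scala>findPrices.map(t => t(0) + "," + t(1) + "," + t(3)).saveAsTextFile("/home/src/main/resources/findprices_out")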
_________________________________________________