Hi, 

First, thanks to everyone for the suggestions. Much appreciated.

These were the original queries, written in SQL and run in the Spark shell:


val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")

val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println("\nfirst query")
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
FROM tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println("\nsecond query")
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
ORDER BY SALES DESC LIMIT 5
""").collect.foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit 

The second version was written in FP style (the DataFrame API), as much as I could manage, as below:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import org.apache.spark.sql.functions.{sum, max, desc}
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
val rs = s.join(t, "time_id")
          .join(c, "channel_id")
          .groupBy("calendar_month_desc", "channel_desc")
          .agg(sum("amount_sold").as("TotalSales"))
println("\nfirst query")
val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)
println("\nsecond query")
val rs2 = rs.groupBy("channel_desc")
            .agg(max("TotalSales").as("SALES"))
            .sort(desc("SALES"))
            .take(5).foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit 

However, the first query results are slightly different between SQL and FP
(maybe the first query code in FP is not exactly correct? I also notice the
SQL version reads from smallsales while the FP version reads from sales,
which could account for the difference), and more importantly the FP version
takes an order of magnitude longer than SQL (8 minutes compared to less than
a minute). I am not entirely surprised, as I assume all those functional
method calls have to be flattened and converted into a query plan before
execution. Is that what is happening?
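
One way to check where the time goes (a sketch only, using the t and c
DataFrames from the FP script above; I have not run this): both SQL and the
DataFrame calls compile to Catalyst query plans, so pointing the FP pipeline
at smallsales and printing its plan with explain(true) makes it directly
comparable with the SQL version's plan:

// Sketch: rebuild the FP pipeline against smallsales so both versions
// read the same table, then print the logical and physical plans.
val s2 = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM smallsales")
s2.join(t, "time_id")
  .join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))
  .explain(true)   // prints logical and physical plans
// and for the SQL version: rs.explain(true)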

THE STANDARD SQL RESULTS 

Started at
[23/02/2016 23:55:30.30]
res1: org.apache.spark.sql.DataFrame = [result: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at
[23/02/2016 23:56:11.11] 

THE FP RESULTS 

Started at
[23/02/2016 23:45:58.58]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at
[23/02/2016 23:53:42.42] 
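
To confirm whether the two runs actually scanned the same data (again only a
sketch, not from the runs above), the sizes of the two fact tables can be
compared:

// Sketch: a large size difference between sales and smallsales would
// explain both the differing first-query numbers and the timing gap.
HiveContext.sql("SELECT COUNT(1) FROM oraclehadoop.smallsales").collect.foreach(println)
HiveContext.sql("SELECT COUNT(1) FROM oraclehadoop.sales").collect.foreach(println)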

On 22/02/2016 23:16, Mich Talebzadeh wrote: 

> Hi, 
> 
> I have data stored in Hive tables on which I want to do some simple manipulation.
> 
> Currently in Spark I do the following: I get the result set from the Hive tables
> using SQL and register it as a temporary table in Spark.
> 
> Now ideally I would get the result set into a DataFrame and slice and dice
> the data using functional programming with filter, map, split, etc.
> 
> I wanted to get some ideas on how to go about it. 
> 
> thanks 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
> 
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, 
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> 
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println) 
> 

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, you should destroy it immediately. Any information
in this message shall not be understood as given or endorsed by Cloud
Technology Partners Ltd, its subsidiaries or their employees, unless
expressly so stated. It is the responsibility of the recipient to ensure
that this email is virus free, therefore neither Cloud Technology
partners Ltd, its subsidiaries nor their employees accept any
responsibility.

 
