Hi,
First, thanks everyone for the suggestions. Much appreciated.

These were the original queries, written in SQL and run in spark-shell:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

println("\nStarted at")
HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

HiveContext.sql("use oraclehadoop")

val rs = HiveContext.sql(
  """
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM smallsales s
  INNER JOIN times t ON s.time_id = t.time_id
  INNER JOIN channels c ON s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
  """)
rs.registerTempTable("tmp")

println("\nfirst query")
HiveContext.sql("""
  SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
  FROM tmp
  ORDER BY MONTH, CHANNEL
  LIMIT 5
  """).collect.foreach(println)

println("\nsecond query")
HiveContext.sql("""
  SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
  FROM tmp
  GROUP BY channel_desc
  ORDER BY SALES DESC
  LIMIT 5
  """).collect.foreach(println)

println("\nFinished at")
HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

sys.exit

The second version was written with the DataFrame API ("FP") as far as I could take it:

import org.apache.spark.sql.functions.{sum, max, desc}

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

println("\nStarted at")
HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

HiveContext.sql("use oraclehadoop")

val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")

val rs = s.join(t, "time_id")
          .join(c, "channel_id")
          .groupBy("calendar_month_desc", "channel_desc")
          .agg(sum("amount_sold").as("TotalSales"))

println("\nfirst query")
val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)

println("\nsecond query")
val rs2 = rs.groupBy("channel_desc")
            .agg(max("TotalSales").as("SALES"))
            .orderBy(desc("SALES"))
            .take(5).foreach(println)

println("\nFinished at")
HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

sys.exit

However, the first query's results differ slightly between SQL and FP (maybe the FP code for the first query is not quite right? Rereading the code, I notice the SQL version reads from smallsales while the FP version reads from sales, which could account for the differing rows and for some of the timing gap), and, more importantly, the FP version takes an order of magnitude longer than the SQL one (8 minutes versus under a minute). I was not too surprised by that, as I assumed all those method calls have to be flattened out and converted to SQL. Is that assumption right? The results of both runs are below.
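One way I can test that assumption, I suppose, is to compare the plans Spark generates for the two versions. A sketch (rsSql is just a name I am using here for the SQL-built aggregate; rs is the DataFrame from the FP script above, and I am assuming both are defined in the same shell session):

val rsSql = HiveContext.sql("""
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM smallsales s
  INNER JOIN times t ON s.time_id = t.time_id
  INNER JOIN channels c ON s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
  """)

// print the parsed, analyzed, optimized and physical plans for each version
rsSql.explain(true)
rs.explain(true)

As I understand it, a SQL string and the equivalent DataFrame calls are both compiled by the same Catalyst optimizer, so if the two versions read the same table, their optimized plans, and hence their run times, should come out much the same.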
THE STANDARD SQL RESULTS

Started at
[23/02/2016 23:55:30.30]
res1: org.apache.spark.sql.DataFrame = [result: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]

Finished at
[23/02/2016 23:56:11.11]

THE FP RESULTS

Started at
[23/02/2016 23:45:58.58]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = ()

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = ()

Finished at
[23/02/2016 23:53:42.42]
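Similarly, to pin down where the first-query rows diverge, I could diff the two result sets directly, rebuilding the FP aggregate against smallsales so that both versions read the same table. Another sketch (rsSql is as defined in the sketch above; small and fpRs are again just names I am using here; c and t are the channels/times DataFrames from the FP script):

import org.apache.spark.sql.functions.sum

val small = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM smallsales")

val fpRs = small.join(t, "time_id")
                .join(c, "channel_id")
                .groupBy("calendar_month_desc", "channel_desc")
                .agg(sum("amount_sold").as("TotalSales"))

// rows produced by one pipeline but not the other; if the two versions
// really are equivalent, both of these should come back empty
rsSql.except(fpRs).show()
fpRs.except(rsSql).show()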
On 22/02/2016 23:16, Mich Talebzadeh wrote:
> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation on.
>
> Currently in Spark I do the following, getting the result set using SQL
> from Hive tables and registering it as a temporary table in Spark.
>
> Now ideally I would like to get the result set into a DF and work on the DF
> to slice and dice the data using functional programming with filter, map,
> split etc.
>
> I wanted to get some ideas on how to go about it.
>
> thanks
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
>
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> FROM tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> ORDER BY SALES DESC
> """).collect.foreach(println)

--

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential. This message is for the designated recipient only; if you are not the intended recipient, you should destroy it immediately. Any information in this message shall not be understood as given or endorsed by Cloud Technology Partners Ltd, its subsidiaries or their employees, unless expressly so stated. It is the responsibility of the recipient to ensure that this email is virus free; therefore neither Cloud Technology Partners Ltd, its subsidiaries nor their employees accept any responsibility.