Instead of:

var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")

you should be able to do something like:

val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
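For instance, the whole job could then be expressed along these lines. This is only a rough sketch, assuming the same HiveContext and oraclehadoop tables as in the quoted code below, with org.apache.spark.sql.functions._ imported for sum:

import org.apache.spark.sql.functions._

// Column-pruned reads straight from the Hive tables, no sql() strings needed.
val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")

// Same joins and aggregation as the SQL version.
val rs = s.join(t, "time_id")
  .join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))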
It's not obvious to me why the DataFrame (aka FP) version would be significantly slower; they should translate into similar/same execution plans. The explain method on DataFrame should show you the plans.
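For example, something like this (a sketch, assuming rs is the aggregated DataFrame from the quoted FP code below):

// The SQL version is also just a DataFrame, so both sides can be explained.
val sqlVersion = HiveContext.sql("""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t ON s.time_id = t.time_id
INNER JOIN channels c ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")

sqlVersion.explain(true)  // extended plans: parsed, analyzed, optimized, physical
rs.explain(true)          // equivalent queries should optimize to similar plans

Note also that the quoted SQL version reads from smallsales while the FP version reads from sales, which may explain both the slightly different first-query results and some of the timing gap.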
On Tue, Feb 23, 2016 at 7:09 PM, Mich Talebzadeh
<mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> Hi,
>
> First, thanks everyone for their suggestions. Much appreciated.
>
> These were the original queries, written in SQL and run against spark-shell:
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
>
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
>
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
>
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second version was written in FP style, as much as I could manage, as below:
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>
> println ("\nsecond query")
> val rs2 =
> rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> However, the first query's results differ slightly between SQL and FP
> (maybe the first query's code in FP is not exactly correct?) and, more
> importantly, the FP version takes an order of magnitude longer than the
> SQL one (8 minutes compared to less than a minute). I am not surprised,
> as I assume the functional programming version has to flatten all those
> method calls and convert them to SQL?
>
> The standard SQL results:
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> The FP results:
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>
>> Hi,
>>
>> I have data stored in Hive tables on which I want to do some simple
>> manipulation.
>>
>> Currently in Spark I do the following: I get the result set from the
>> Hive tables using SQL, then register it as a temporary table in Spark.
>>
>> Ideally I would get the result set into a DataFrame and work on the
>> DataFrame to slice and dice the data using functional programming with
>> filter, map, split etc.
>>
>> I wanted to get some ideas on how to go about it.
>>
>> Thanks,
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
>> SUM(s.amount_sold) AS TotalSales
>> FROM smallsales s, times t, channels c
>> WHERE s.time_id = t.time_id
>> AND s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>>
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL
>> """).collect.foreach(println)
>>
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC
>> """).collect.foreach(println)
>>
>> --
>> Dr Mich Talebzadeh
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
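As for the "slice and dice" question in the quoted message above: once the aggregate is held in a DataFrame there is no need for a temporary table; filter, column expressions and sorting apply to it directly. A minimal sketch, again assuming the rs built in the quoted FP code (the "Direct Sales" value is taken from the sample output):

import org.apache.spark.sql.functions._

// Slice: one channel only, ordered by month.
rs.filter(col("channel_desc") === "Direct Sales")
  .orderBy("calendar_month_desc")
  .show(5)

// The second report, expressed directly; a single sort(desc(...)) is
// enough here, the extra orderBy("SALES") in the quoted code is redundant.
rs.groupBy("channel_desc")
  .agg(max("TotalSales").as("SALES"))
  .sort(desc("SALES"))
  .show(5)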