RE: Shuffle intermediate results not being cached

2016-12-28 Thread Liang-Chi Hsieh
…union it with a new dataframe to compute the newer aggregation summary in the next iteration. It is more similar to the streaming case; I don't think you can/should recompute all the data since the beginning of a stream. assaf.mendelson wrote: The reason I thought…

RE: Shuffle intermediate results not being cached

2016-12-28 Thread Liang-Chi Hsieh
…performance-wise) was to write a custom UDAF which does the window internally. This was still 8 times lower throughput than batch, required a lot of coding, and is not a general solution. I am looking for an approach to improve the performance even more (preferably to either…
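The appeal of doing the window "internally", as the UDAF approach above does, is that window state can be maintained incrementally, at O(1) work per event instead of re-scanning the window. A minimal sketch in plain Python (not Spark; the class name and window size are illustrative):

```python
from collections import deque

# Incremental sliding-window sum: keep a deque of in-window values and a
# running total, so each new event adjusts the total in O(1) rather than
# re-aggregating the whole window.
class SlidingSum:
    def __init__(self, window):
        self.window = window
        self.values = deque()
        self.total = 0

    def add(self, value):
        self.values.append(value)
        self.total += value
        if len(self.values) > self.window:  # evict the oldest value
            self.total -= self.values.popleft()
        return self.total

s = SlidingSum(window=3)
results = [s.add(v) for v in [1, 2, 3, 4, 5]]
# results == [1, 3, 6, 9, 12]
```

The same add-new/evict-old pattern generalizes to any aggregate with an inverse (sum, count, mean); aggregates without one (e.g. max) need a different structure.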

RE: Shuffle intermediate results not being cached

2016-12-27 Thread assaf.mendelson
…the joining behind the scenes. The problem is that any attempt to do streaming like this results in performance which is hundreds of times slower than batch. Is there a correct way to do such an aggregation on streaming data (using dataframes rather than RDD operations)? Assaf
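The slowdown Assaf describes typically comes from re-aggregating the entire accumulated union on every batch. A minimal plain-Python sketch (not Spark; the function names and batch contents are illustrative) of the incremental alternative: aggregate each batch on its own and merge its partial per-key sums into a running total.

```python
# Each batch is reduced to partial per-key sums, then folded into the
# running totals; per-step cost is O(batch), not O(everything seen so far).

def aggregate_batch(rows):
    """Per-key sum for one batch: rows are (cat1, cat2, val) tuples."""
    partial = {}
    for cat1, cat2, val in rows:
        key = (cat1, cat2)
        partial[key] = partial.get(key, 0) + val
    return partial

def merge_into(running, partial):
    """Fold a batch's partial aggregate into the running totals."""
    for key, val in partial.items():
        running[key] = running.get(key, 0) + val
    return running

running = {}
batches = [
    [("a", "x", 1), ("a", "x", 2), ("b", "y", 3)],
    [("a", "x", 4), ("b", "y", 5)],
]
for batch in batches:
    merge_into(running, aggregate_batch(batch))
# running now holds the same totals a full groupBy over the union would.
```

This works for any aggregate expressible as a commutative, associative merge of partial states, which is exactly the shape a sum/count groupBy has.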

RE: Shuffle intermediate results not being cached

2016-12-27 Thread Liang-Chi Hsieh
…slower than batch. Is there a correct way to do such an aggregation on streaming data (using dataframes rather than RDD operations)? Assaf. From: Liang-Chi Hsieh [via Apache Spark Developers List]…

RE: Shuffle intermediate results not being cached

2016-12-26 Thread assaf.mendelson
…[via Apache Spark Developers List] Sent: Monday, December 26, 2016 5:42 PM To: Mendelson, Assaf Subject: Re: Shuffle intermediate results not being cached. Hi, let me quote your example code: var totalTime: Long = 0; var allDF…

Re: Shuffle intermediate results not being cached

2016-12-26 Thread Liang-Chi Hsieh
Hi, let me quote your example code:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for { x <- dataframes } {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))…
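A rough way to see why the quoted loop degrades: iteration i re-scans all i batches accumulated in allDF, so total work grows quadratically with the number of batches, while an incremental merge scans each batch once and stays linear. A plain-Python back-of-the-envelope (batch counts and sizes are illustrative, not measurements from the thread):

```python
# Total rows touched by the quoted loop (re-aggregating the growing union
# on every iteration) versus an incremental per-batch merge.

def rows_touched_recompute(n_batches, batch_size):
    # iteration i re-scans all i batches seen so far
    return sum(i * batch_size for i in range(1, n_batches + 1))

def rows_touched_incremental(n_batches, batch_size):
    # each batch is scanned exactly once
    return n_batches * batch_size

recompute = rows_touched_recompute(100, 1000)    # 5,050,000 rows
incremental = rows_touched_incremental(100, 1000)  # 100,000 rows
```

A 50x gap at 100 batches, growing without bound, is consistent with the "hundreds of times slower than batch" observation earlier in the thread, even before shuffle overheads are counted.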

Re: Shuffle intermediate results not being cached

2016-12-26 Thread Mark Hamstra
Shuffle results are only reused if you are reusing the exact same RDD. If you are working with DataFrames that you have not explicitly cached, then they are going to produce new RDDs during physical plan creation and evaluation, so you won't get implicit shuffle reuse. This is what htt…
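A plain-Python analogy for Mark's point (not Spark; the names are illustrative): recomputation happens whenever a logically identical pipeline is rebuilt from scratch, and an explicit cache, playing the role of df.cache(), is what lets a later use reuse the materialized result.

```python
# Counter stands in for the cost of a full shuffle.
calls = {"n": 0}

def expensive_shuffle(data):
    calls["n"] += 1              # each call "re-shuffles" from scratch
    return sorted(data)

# Without caching: two logically identical "plans" both recompute,
# just as two uncached DataFrames with the same lineage each shuffle.
a = expensive_shuffle([3, 1, 2])
b = expensive_shuffle([3, 1, 2])

# With an explicit cache keyed on the input, the second use is free,
# the way a cached DataFrame lets later actions reuse its result.
cache = {}
def cached_shuffle(data):
    key = tuple(data)
    if key not in cache:
        cache[key] = expensive_shuffle(data)
    return cache[key]

c = cached_shuffle([3, 1, 2])   # miss: computes once more
d = cached_shuffle([3, 1, 2])   # hit: no recomputation
```

The analogy is loose (Spark's shuffle-file reuse is keyed on RDD identity, not input equality), but it captures why equality of the logical query is not enough without an explicit cache.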