Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Zhan Zhang
Thanks Reynold. Not sure why doExecute is not invoked, since CollectLimit does not support whole-stage codegen:

case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {

I will dig further into this.

Zhan Zhang

On Apr 18, 2016, at 10:36 PM, Reynold Xin <r...@databricks.com> wrote:
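
(For context, a toy sketch in plain Scala, not the Spark source, of the point being made here: an operator only takes part in whole-stage codegen if it opts in through a codegen trait; CollectLimit did not, so the only way to run it is through doExecute.)

    // Toy model, not Spark code. A codegen-capable operator would also mix in a
    // trait like the hypothetical SupportsCodegen below; this CollectLimit-like
    // node does not, so the engine can only run it by calling doExecute().
    trait PlanNode { def doExecute(): Iterator[Long] }
    trait SupportsCodegen { def generateCode(): String }

    case class ToyCollectLimit(limit: Int, child: PlanNode) extends PlanNode {
      def doExecute(): Iterator[Long] = child.doExecute().take(limit)
    }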

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Reynold Xin
Anyway we can verify this easily. I just added a println to each row and verified that only limit + 1 rows were printed after the join and before the limit. It'd be great if you do some debugging yourself and see if it is going through some other code path.

On Mon, Apr 18, 2016 at 10:35 PM, Reynold Xin wrote:
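
(A sketch of how the same check can be repeated from the DataFrame API, using only public APIs; the table sizes and the trace UDF are made up for illustration, and accumulator counts can over-count under task retries, but it is enough for a sanity check.)

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{broadcast, udf}

    val spark = SparkSession.builder().appName("limit-pushdown-check").getOrCreate()
    import spark.implicits._

    // Counts the rows that reach the projection between the join and the limit.
    val seen = spark.sparkContext.longAccumulator("rows-after-join")
    val trace = udf { (id: Long) =>
      seen.add(1)
      id
    }

    val big   = spark.range(0, 1000000L).toDF("id")   // stream side
    val small = spark.range(0, 1000L).toDF("id")      // broadcast (build) side

    big.join(broadcast(small), "id")
       .select(trace($"id").as("id"))
       .limit(10)
       .collect()

    // If the limit stops the pipeline early, this prints a number close to the
    // limit rather than the full join output (1000 matching rows here).
    println(s"rows seen after the join: ${seen.value}")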

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Reynold Xin
But doExecute is not called?

On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang wrote:
> Hi Reynold,
>
> I just checked the code for CollectLimit; there is a shuffle happening to
> collect them in one partition.
>
> protected override def doExecute(): RDD[InternalRow] = {
>   val shuffled = new ShuffledRowRDD(…

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Zhan Zhang
Hi Reynold,

I just checked the code for CollectLimit; there is a shuffle happening to collect them in one partition.

protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, …
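
(For context, CollectLimit's doExecute on Spark master around that time looked roughly like the following; the tail after prepareShuffleDependency is reconstructed from memory of the source, so the exact revision may differ.)

    protected override def doExecute(): RDD[InternalRow] = {
      // Shuffle everything into a single partition, then take `limit` rows there.
      val shuffled = new ShuffledRowRDD(
        ShuffleExchange.prepareShuffleDependency(
          child.execute(), child.output, SinglePartition, serializer))
      shuffled.mapPartitionsInternal(_.take(limit))
    }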

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Reynold Xin
Unless I'm really missing something I don't think so. As I said, it goes through an iterator, and after processing each stream-side row we do a shouldStop check. The generated code looks like

/* 094 */   protected void processNext() throws java.io.IOException {
/* 095 */     /*** PRODUCE: Project [id#7…
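
(To inspect the same generated code locally, the debug helpers in org.apache.spark.sql.execution.debug can print it; debugCodegen is, I believe, the method name in the 2.0 timeframe, but treat that as an assumption since it may vary by version. The query shape below is only an example.)

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast
    import org.apache.spark.sql.execution.debug._   // assumed: provides debugCodegen()

    val spark = SparkSession.builder().appName("dump-codegen").getOrCreate()

    spark.range(0, 1000L).toDF("id")
      .join(broadcast(spark.range(0, 100L).toDF("id")), "id")
      .limit(10)
      .debugCodegen()   // prints the whole-stage-generated Java, including processNext()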

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Zhan Zhang
From the physical plan, the limit is one level above the WholeStageCodegen. Thus, I don't think shouldStop would work here. To make it work, the limit has to be part of the WholeStageCodegen. Correct me if I am wrong. Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:
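
(One way to see the operator placement being described is to print the plan for such a query; the query below is illustrative. The claim here is that the limit is planned as a CollectLimit node sitting above, i.e. outside, the WholeStageCodegen subtree that contains the join.)

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    val spark = SparkSession.builder().appName("plan-shape").getOrCreate()

    spark.range(0, 100000L).toDF("id")
      .join(broadcast(spark.range(0, 100L).toDF("id")), "id")
      .limit(10)
      .explain()   // check where CollectLimit sits relative to WholeStageCodegen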

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Reynold Xin
I could be wrong but I think we currently do that through whole stage codegen. After processing every row on the stream side, the generated code for broadcast join checks whether it has hit the limit or not (through this thing called shouldStop). It is not the most optimal solution, because a sing…
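
(A self-contained toy in plain Scala, rather than generated Java, of the control flow described here: the join's produce loop consults a shouldStop-style flag after emitting each row, so a downstream limit can cut the stream-side scan short.)

    object ShouldStopDemo extends App {
      val limit = 10
      val out = scala.collection.mutable.ArrayBuffer.empty[(Int, String)]
      def shouldStop(): Boolean = out.size >= limit        // the "limit" has enough rows

      val streamSide = Iterator.range(0, 1000000)          // stream-side rows
      val buildSide  = Map(0 -> "a", 1 -> "b", 2 -> "c")   // broadcast hash table
      var scanned = 0

      while (streamSide.hasNext && !shouldStop()) {        // produce loop
        val row = streamSide.next(); scanned += 1
        buildSide.get(row % 3).foreach(v => out += ((row, v)))   // probe + "append"
      }
      println(s"emitted ${out.size} rows after scanning only $scanned stream-side rows")
    }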

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Andrew Ray
While you can't automatically push the limit *through* the join, we could push it *into* the join (stop processing after generating 10 records). I believe that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:
> I am not sure if you can push a limit through a join. …
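
(A toy sketch, in plain Scala, of what pushing the limit *into* the join means: the join itself stops producing output once it has generated `limit` rows, without limiting either input up front. Names are made up, and a single match per key is assumed for brevity; a real hash relation can hold several matches per key.)

    // Stop scanning the stream side as soon as `limit` joined rows have been produced.
    def joinWithLimit[K, A, B](stream: Iterator[(K, A)],
                               build: Map[K, B],
                               limit: Int): Vector[(K, A, B)] = {
      val out = Vector.newBuilder[(K, A, B)]
      var produced = 0
      while (stream.hasNext && produced < limit) {
        val (k, a) = stream.next()
        build.get(k) match {
          case Some(b) => out += ((k, a, b)); produced += 1  // emitted a joined row
          case None    =>                                    // no match: keep scanning
        }
      }
      out.result()
    }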

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-12 Thread Herman van Hövell tot Westerflier
I am not sure if you can push a limit through a join. This becomes problematic if not all keys on the limited side are present on the other side; in such a case pushing the limit below the join can produce fewer rows than the set limit. This might be a rare case in which whole stage codegen is slower, due to the fact that we need to buffer the r…
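
(A tiny counterexample in plain Scala for the point above: pushing LIMIT 2 below the join, onto the left input, returns fewer rows than the data allows, because the rows that survive the pushed-down limit happen to have no match.)

    val left  = Seq(1 -> "a", 2 -> "b", 3 -> "c")   // keys 1, 2, 3
    val right = Map(3 -> "z")                        // only key 3 has a match

    // Correct semantics: join first, then limit.
    val joinThenLimit =
      left.flatMap { case (k, v) => right.get(k).map(w => (k, v, w)) }.take(2)
    // -> List((3,c,z)): one row, which is all the data allows

    // Unsafe rewrite: limit the left input first, then join.
    val limitThenJoin =
      left.take(2).flatMap { case (k, v) => right.get(k).map(w => (k, v, w)) }
    // -> List(): zero rows, even though a matching row existed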