Thanks Reynold.
Not sure why doExecute is not invoked, since CollectLimit does not support
whole-stage codegen:
case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
I will dig further into this.
Zhan Zhang
On Apr 18, 2016, at 10:36 PM, Reynold Xin <r...@databricks.com> wrote:
Anyway, we can verify this easily. I just added a println to each row and
verified that only limit + 1 rows were printed after the join and before the
limit.
It'd be great if you do some debugging yourself and see if it is going
through some other code path.
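For illustration, a similar check can be done without editing Spark's source,
by counting rows between the join and the limit with an accumulator. This is
a made-up sketch (Spark 2.x API; dataset names and sizes are invented), not
the actual debugging described above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("limit-check").getOrCreate()
import spark.implicits._

val left  = spark.range(0, 1000000).toDF("id")
val right = spark.range(0, 1000000).toDF("id")

val counter = spark.sparkContext.longAccumulator("joined-rows")

// The map fires once for every row that actually flows out of the join,
// so counter.value tells us how many rows the join produced.
val limited = left.join(right, "id")
  .map { row => counter.add(1); row.getLong(0) }
  .limit(10)

limited.collect()
println(s"rows produced by the join: ${counter.value}")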
On Mon, Apr 18, 2016 at 10:35 PM, Reynold Xin wrote:
But doExecute is not called?
On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang wrote:
> Hi Reynold,
>
> I just checked the code for CollectLimit; there is a shuffle happening to
> collect the rows in one partition.
>
> protected override def doExecute(): RDD[InternalRow] = {
>   val shuffled = new Shuffled
Hi Reynold,
I just checked the code for CollectLimit; there is a shuffle happening to
collect the rows in one partition.

protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}
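As a plain-Scala aside, the take(limit) at the end of doExecute relies on
Iterator laziness: elements past the limit are never produced at all. A
minimal, self-contained illustration:

// Iterator.take is lazy: only the first three elements are ever
// computed, so only "producing 0/1/2" is printed.
val source  = Iterator.from(0).map { i => println(s"producing $i"); i }
val limited = source.take(3).toList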
Unless I'm really missing something, I don't think so. As I said, it goes
through an iterator, and after processing each stream-side row we do a
shouldStop check. The generated code looks like:
/* 094 */ protected void processNext() throws java.io.IOException {
/* 095 */ /*** PRODUCE: Project [id#7
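For readers without the full dump, the control flow the generated code
implements is roughly the following hand-written sketch. All names here are
invented; only shouldStop mirrors the method on Spark's BufferedRowIterator,
and the real thing is generated Java, not Scala:

// Sketch only: loop over the stream side, join against the build side,
// and re-check shouldStop once per stream-side row.
def processNext(
    streamRows: Iterator[Int],
    buildTable: Map[Int, Seq[String]],
    emit: ((Int, String)) => Unit,
    shouldStop: () => Boolean): Unit = {
  while (streamRows.hasNext && !shouldStop()) {
    val k = streamRows.next()
    // All matches for this stream row are emitted before the next
    // shouldStop check, so one stream-side row can overshoot the limit.
    for (v <- buildTable.getOrElse(k, Seq.empty)) {
      emit((k, v))
    }
  }
}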
From the physical plan, the limit is one level above the WholeStageCodegen.
Thus, I don't think shouldStop would work here. To make it work, the limit
has to be part of the WholeStageCodegen.
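For illustration, a plan of this shape can be reproduced with explain(). This
is a made-up example (the SparkSession and the exact operator names are
assumptions; output varies by Spark version), but the limit sits above the
WholeStageCodegen subtree:

val a = spark.range(0, 100).toDF("id")
val b = spark.range(0, 100).toDF("id")
a.join(b, "id").limit(10).explain()
// Roughly:
// == Physical Plan ==
// CollectLimit 10
// +- WholeStageCodegen
//    +- Project [id]
//       +- BroadcastHashJoin ...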
Correct me if I am wrong.
Thanks.
Zhan Zhang
On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:
I could be wrong, but I think we currently do that through whole-stage
codegen. After processing every row on the stream side, the generated code
for broadcast join checks whether it has hit the limit or not (through this
thing called shouldStop).
It is not the most optimal solution, because a single
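One way to check this yourself is to dump the generated Java and grep for
shouldStop. Assuming a 2.0-era build (and reusing the made-up a and b from
the earlier sketch), Spark's debug package can print it:

import org.apache.spark.sql.execution.debug._

// Prints the generated Java for each WholeStageCodegen subtree,
// including the shouldStop() checks discussed here.
a.join(b, "id").limit(10).debugCodegen()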
While you can't automatically push the limit *through* the join, we could
push it *into* the join (stop processing after generating 10 records). I
believe that is what Rajesh is suggesting.
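A made-up, plain-Scala sketch of what pushing the limit *into* the join could
mean at the iterator level (all names invented): because Iterator is lazy,
take(limit) stops pulling stream-side rows as soon as limit joined rows
exist.

def limitedHashJoin[K, L, R](
    stream: Iterator[(K, L)],
    build: Map[K, Seq[R]],
    limit: Int): Iterator[(K, L, R)] = {
  stream.flatMap { case (k, l) =>
    // Each stream-side row joins against the broadcast build side.
    build.getOrElse(k, Seq.empty).map(r => (k, l, r))
  }.take(limit) // stops consuming the stream side after `limit` rows
}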
On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier
<hvanhov...@questtec.nl> wrote:
> I am
I am not sure if you can push a limit through a join. This becomes
problematic if not all keys are present on both sides; in such a case a
limit can produce fewer rows than the set limit.
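A concrete, made-up counterexample of the problem described above, in plain
Scala (flatMap over an Option plays the role of the join):

val left  = Seq(1 -> "a", 2 -> "b", 3 -> "c")
val right = Map(3 -> "z") // only key 3 has a match

// Join first, then limit: one row, (3, "c", "z").
val correct = left.flatMap { case (k, v) => right.get(k).map(w => (k, v, w)) }.take(2)

// Limit pushed below the join keeps keys 1 and 2, which never match:
// zero rows, even though the join itself had a row to give.
val pushed = left.take(2).flatMap { case (k, v) => right.get(k).map(w => (k, v, w)) }

assert(correct.nonEmpty && pushed.isEmpty)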
This might be a rare case in which whole-stage codegen is slower, due to
the fact that we need to buffer the r