I am not sure that you can push a limit through a join. This becomes problematic when not every key on one side has a match on the other: a limit pushed below the join can then produce fewer rows than the requested limit.
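A toy sketch of that failure mode (hypothetical key values, not from the query below):

```scala
// Hypothetical probe-side keys and broadcast (build) side keys.
val left  = Seq(1, 2, 3, 4, 5, 6)   // l_partkey values
val right = Set(2, 4, 6)            // ps_partkey values

// A Set[Int] is also an Int => Boolean, so it acts as the join predicate.
def join(rows: Seq[Int], keys: Set[Int]): Seq[Int] = rows.filter(keys)

// Correct: join first, then apply LIMIT 2.
println(join(left, right).take(2))   // List(2, 4) -> 2 rows, as requested

// Unsafe: LIMIT 2 pushed below the join onto the probe side.
println(join(left.take(2), right))   // List(2) -> only 1 row survives the join
```

Stopping the join early once it has *produced* 10 rows (as suggested below) is a different, safe optimization; the unsafe case is limiting a join *input* before matching.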
This might be a rare case in which whole-stage codegen is slower, because the result of such a stage has to be buffered. You could try disabling it by setting "spark.sql.codegen.wholeStage" to false.

2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:

> Hi,
>
> I ran the following query in Spark (latest master codebase) and it took a
> long time to complete even though it was a broadcast hash join.
>
> It appears that the limit computation is done only after computing the
> complete join. Shouldn't the limit condition be pushed down to the
> BroadcastHashJoin (wherein it would have to stop processing after
> generating 10 rows)? Please let me know if my understanding of this is
> wrong.
>
> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;
>
> >>>>
> == Physical Plan ==
> CollectLimit 10
> +- WholeStageCodegen
>    :  +- Project [l_partkey#893]
>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner,
>    :        BuildRight, None
>    :        :- Project [l_partkey#893]
>    :        :  +- Filter isnotnull(l_partkey#893)
>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC,
>    :        :        PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>    :        +- INPUT
>    +- BroadcastExchange
>       HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>       +- WholeStageCodegen
>          :  +- Project [ps_partkey#908]
>          :     +- Filter isnotnull(ps_partkey#908)
>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC,
>          :           PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
> >>>>
>
> --
> ~Rajesh.B
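The workaround suggested above can be applied like this, a minimal sketch assuming a `SQLContext` named `sqlContext` is already available in the shell (this only helps the comparison; it is not a guaranteed speedup):

```scala
// Turn off whole-stage code generation for this session,
// then re-run the query and compare timings.
sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
sqlContext.sql(
  "select l_partkey from lineitem, partsupp where ps_partkey = l_partkey limit 10")
```

Re-enable it afterwards by setting the same key back to "true", since whole-stage codegen is usually a win for other queries.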