Hi,

Thanks a lot.

The problem is not doing a non-equal join on large tables; in fact, one table is really small and the other one is huge. The problem is that Spark can only get the correct size for a DataFrame created directly from a Hive table. Even if we create a DataFrame from a local collection, it uses defaultSizeInBytes as its size. (Here I am really confused: why don't we use LogicalLocalTable in ExistingRDD.scala to estimate its size? As I understand it, this case class was created for exactly this purpose.)

Then if we do a join or unionAll operation on this DataFrame, the estimated size explodes. For instance, if we do a join:

val df = df1.join(df2, condition)

then df.queryExecution.analyzed.statistics.sizeInBytes is the product of the estimated sizes of df1 and df2.

In my case, I create the df1 instance from an existing RDD. I found a workaround for this problem (a code sketch is at the end of this message, below the quoted thread):

1. save df1 in a Hive table
2. read this Hive table and create a new DataFrame
3. do the outer join with this new DataFrame

Cheers
Gen

On Wed, Aug 12, 2015 at 10:12 AM, Cheng, Hao <hao.ch...@intel.com> wrote:

> Firstly, spark.sql.autoBroadcastJoinThreshold only works for the EQUAL
> JOIN.
>
> Currently, for the non-equal join, if the join type is the INNER join,
> then it will be done by CartesianProduct join, and BroadcastNestedLoopJoin
> works for the outer joins.
>
> In BroadcastNestedLoopJoin, the table with the smaller estimated size will
> be broadcast, but if the smaller table is also a huge table, I don't think
> Spark SQL can handle that right now (OOM).
>
> So, I am not sure how you created the df1 instance, but we'd better
> reflect the real size in its statistics and let the framework decide what
> to do; hopefully Spark SQL can support the non-equal join for large tables
> in the next release.
>
> Hao
>
> *From:* gen tang [mailto:gen.tan...@gmail.com]
> *Sent:* Tuesday, August 11, 2015 10:12 PM
> *To:* dev@spark.apache.org
> *Subject:* Potential bug in BroadcastNestedLoopJoin or default value of
> spark.sql.autoBroadcastJoinThreshold
>
> Hi,
>
> Recently, I used Spark SQL to do a join on a non-equality condition,
> condition1 OR condition2 for example.
>
> Spark will use BroadcastNestedLoopJoin to do this. Assume that one of the
> DataFrames (df1) is created neither from a Hive table nor from a local
> collection, and the other one (df2) is created from a Hive table. For df1,
> Spark will use defaultSizeInBytes * length of df1 to estimate its size,
> while it uses the correct size for df2.
>
> As a result, in most cases Spark will think df1 is bigger than df2 even if
> df2 is really huge, and Spark will do df2.collect(), which will cause an
> error or slow down the program.
>
> Maybe we should just use defaultSizeInBytes for LogicalRDD, not
> defaultSizeInBytes * length?
>
> Hope this could be helpful.
>
> Thanks a lot in advance for your help and input.
>
> Cheers
> Gen
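P.S. For anyone who wants to reproduce this, here is a minimal sketch of the size-estimation behaviour described above, assuming a Spark 1.4/1.5-era HiveContext in the spark-shell (so sc already exists). The Record case class, the table name "some_hive_table", and the column names in the join condition are illustrative placeholders, not anything from my actual job.

    import org.apache.spark.sql.hive.HiveContext

    case class Record(a: Int)

    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    // df1 is built from an existing RDD, so its statistics fall back to
    // spark.sql.defaultSizeInBytes instead of its real (tiny) size.
    val df1 = sc.parallelize(1 to 10).map(i => Record(i)).toDF()

    // df2 comes directly from a Hive table, so its statistics are accurate.
    val df2 = sqlContext.table("some_hive_table")

    println(df1.queryExecution.analyzed.statistics.sizeInBytes)
    println(df2.queryExecution.analyzed.statistics.sizeInBytes)

    // The non-equality join's estimated output size is the product of the
    // inputs' estimates, so it explodes even though df1 is actually tiny.
    val df = df1.join(df2, df1("a") < df2("b"))
    println(df.queryExecution.analyzed.statistics.sizeInBytes)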
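And a sketch of the workaround, under the same assumptions ("df1_tmp" is a placeholder table name):

    // 1. Save df1 as a Hive table.
    df1.write.saveAsTable("df1_tmp")

    // 2. Read it back; the new DataFrame is now backed by a Hive table,
    //    so Spark picks up a realistic size from the metastore.
    val df1FromHive = sqlContext.table("df1_tmp")

    // 3. Do the outer join with this new DataFrame; the genuinely small
    //    side can now be broadcast safely.
    val result = df1FromHive.join(df2, df1FromHive("a") < df2("b"), "left_outer")

If the statistics still look wrong after step 2, running ANALYZE TABLE df1_tmp COMPUTE STATISTICS noscan may help, so that Hive records totalSize for the table.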
The problem is not do non-equal join for large tables, in fact, one table is really small and another one is huge. The problem is that spark can only get the correct size for dataframe created directly from hive table. Even we create a dataframe from local collection, it uses defaultSizeInBytes as its size. (Here, I am really confused: why we don't use LogicalLocalTable in exsitingRDD.scala to estimate its size. As I understand, this case class is created for this purpose) Then if we do some join or unionAll operation on this dataframe, the estimated size will explode. For instance, if we do join, val df = df1.join(df2, condition) then df.queryExecution.analyzed.statistics.sizeInBytes = df1 * df2 In my case, I create df1 instance from an existing rdd. I find a workaround for this problem: 1. save df1 in hive table 2. read this hive table and create a new dataframe 3. do outer join with this new dataframe Cheers Gen On Wed, Aug 12, 2015 at 10:12 AM, Cheng, Hao <hao.ch...@intel.com> wrote: > Firstly, spark.sql.autoBroadcastJoinThreshold only works for the EQUAL > JOIN. > > > > Currently, for the non-equal join, if the join type is the INNER join, > then it will be done by CartesianProduct join and BroadcastNestedLoopJoin > works for the outer joins. > > > > In the BroadcastnestedLoopJoin, the table with smaller estimate size will > be broadcasted, but if the smaller table is also a huge table, I don’t > think Spark SQL can handle that right now (OOM). > > > > So, I am not sure how you created the df1 instance, but we’d better to > reflect the real size for the statistics of it, and let the framework > decide what to do, hopefully Spark Sql can support the non-equal join for > large tables in the next release. > > > > Hao > > > > *From:* gen tang [mailto:gen.tan...@gmail.com] > *Sent:* Tuesday, August 11, 2015 10:12 PM > *To:* dev@spark.apache.org > *Subject:* Potential bug broadcastNestedLoopJoin or default value of > spark.sql.autoBroadcastJoinThreshold > > > > Hi, > > > > Recently, I use spark sql to do join on non-equality condition, condition1 > or condition2 for example. > > > > Spark will use broadcastNestedLoopJoin to do this. Assume that one of > dataframe(df1) is not created from hive table nor local collection and the > other one is created from hivetable(df2). For df1, spark will use > defaultSizeInBytes * length of df1 to estimate the size of df1 and use > correct size for df2. > > > > As the result, in most cases, spark will think df1 is bigger than df2 even > df2 is really huge. And spark will do df2.collect(), which will cause error > or slowness of program. > > > > Maybe we should just use defaultSizeInBytes for logicalRDD, not > defaultSizeInBytes * length? > > > > Hope this could be helpful > > Thanks a lot in advance for your help and input. > > > > Cheers > > Gen > > >