The default value of spark.sql.autoBroadcastJoinThreshold in 1.5.2 is 10MB. According to the console output Spark is doing the broadcast, but a query that looks like the following still does not perform well:
select big_t.*, small_t.name range_name
from big_t join small_t on (1=1)
where small_t.min <= big_t.v and big_t.v < small_t.max

Instead of it I registered a UDF which returns range_name:

val ranges = sqlContext.sql("select min_v, max_v, name from small_t")
  .collect()
  .map(r => (r.getLong(0), r.getLong(1), r.getString(2)))
  .sortBy(_._1)

sqlContext.udf.register("findRangeName",
  (v: java.lang.Long) => RangeUDF.findName(v, ranges))

// RangeUDF.findName
def findName(vObj: java.lang.Long, ranges: Array[(Long, Long, String)]): String = {
  val v = if (vObj == null) -1L else vObj.longValue()
  ranges.find(x => x._1 <= v && v < x._2).map(_._3).getOrElse("")
}

// Now I can use the UDF to get range_name
select big_t.*, findRangeName(v) range_name from big_t

On Sun, Dec 20, 2015 at 9:16 AM, Chris Fregly <ch...@fregly.com> wrote:

> this type of broadcast should be handled by Spark SQL/DataFrames
> automatically.
>
> this is the primary cost-based, physical-plan query optimization that the
> Spark SQL Catalyst optimizer supports.
>
> in Spark 1.5 and before, you can trigger this optimization by properly
> setting spark.sql.autoBroadcastJoinThreshold to a value that is *above*
> the size of your smaller table when fully bloated in JVM memory (not the
> serialized size of the data on disk - a very common mistake).
>
> in Spark 1.6+, there are heuristics to make this decision dynamically -
> and even allow hybrid execution where certain keys - within the same
> Spark job - will be broadcast and others won't, depending on their
> relative "hotness" for that particular job.
>
> a common theme of Spark 1.6 and beyond will be adaptive physical-plan
> execution, adaptive memory allocation to the RDD cache vs the Spark
> execution engine, adaptive cluster resource allocation, etc.
>
> the goal being to minimize manual configuration and enable many different
> types of workloads to run efficiently on the same Spark cluster.
>
> On Dec 19, 2015, at 12:10 PM, Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
> I collected the small DF to an array of Tuple3.
> Then I registered a UDF with a function which does a lookup in the array.
> Then I just run a select which uses the UDF.
>
> On Dec 18, 2015 1:06 AM, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>
>> You can broadcast your json data and then do a map-side join. This
>> article is a good start:
>> http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov <
>> apivova...@gmail.com> wrote:
>>
>>> I have a big folder of ORC files. The files have a duration field
>>> (e.g. 3, 12, 26, etc.)
>>> I also have a small json file (just 8 rows) with range definitions
>>> (min, max, name):
>>> 0, 10, A
>>> 10, 20, B
>>> 20, 30, C
>>> etc.
>>>
>>> Because I cannot do an equi-join between duration and the range
>>> min/max, I need to do a cross join and apply a WHERE condition to take
>>> the records which belong to the range.
>>> A cross join is an expensive operation; I think this particular join
>>> is better done as a map join.
>>>
>>> How to do a map join in Spark SQL?
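A minimal sketch of the threshold tuning Chris describes, for cases where the smaller table is not picked up for broadcast automatically. The 100 MB figure is only an assumed upper bound for the in-memory size of small_t; the property takes a byte count and can also be passed to spark-submit via --conf.

// Assumed example value: raise the automatic broadcast-join threshold to ~100 MB.
// Spark compares this (in bytes) against the estimated in-memory size of each join side.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)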
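A minimal sketch of a broadcast (map-side) join expressed directly on DataFrames, along the lines Akhil suggests. It assumes the tables are registered as big_t and small_t and that the range columns are named min_v and max_v as in the collect() query above (adjust to min/max if that is the actual schema):

import org.apache.spark.sql.functions.broadcast

// Mark the 8-row ranges table for broadcasting so every executor gets a full copy
// and the non-equi join is evaluated map-side rather than by shuffling big_t.
val bigDf   = sqlContext.table("big_t")
val smallDf = sqlContext.table("small_t")

val withRange = bigDf
  .join(broadcast(smallDf),
        bigDf("v") >= smallDf("min_v") && bigDf("v") < smallDf("max_v"))
  .select(bigDf("*"), smallDf("name").as("range_name"))

withRange.show()

Note this is an inner join, so rows whose v falls outside every range are dropped, whereas the UDF above maps them to an empty string.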