The default value of spark.sql.autoBroadcastJoinThreshold in Spark 1.5.2 is 10MB.
According to the console output Spark is doing a broadcast, but a query
like the following still does not perform well:

select
 big_t.*,
 small_t.name range_name
from big_t
join small_t on (1=1)
where small_t.min <= big_t.v and big_t.v < small_t.max
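
I believe the reason is that with a non-equi condition Spark cannot use a
broadcast hash join; the best it can pick is a broadcast nested loop join,
so every big_t row is still checked against every small_t row. A quick way
to verify (a sketch, reusing the query above):

sqlContext.sql("""
  select big_t.*, small_t.name range_name
  from big_t
  join small_t on (1=1)
  where small_t.min <= big_t.v and big_t.v < small_t.max
""").explain()  // physical plan should show BroadcastNestedLoopJoin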

Instead, I registered a UDF which returns range_name:

// collect the small table (only 8 rows) to the driver, sorted by range start
val ranges = sqlContext.sql("select min_v, max_v, name from small_t").
  collect().map(r => (r.getLong(0), r.getLong(1), r.getString(2))).sortBy(_._1)

sqlContext.udf.register("findRangeName",
  (v: java.lang.Long) => RangeUDF.findName(v, ranges))

// RangeUDF.findName

def findName(vObj: java.lang.Long, ranges: Array[(Long, Long, String)]): String = {
  val v = if (vObj == null) -1L else vObj.longValue()
  // linear scan over 8 sorted ranges; returns "" when nothing matches
  ranges.find(x => x._1 <= v && v < x._2).map(_._3).getOrElse("")
}
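
One possible refinement, sketched here (assuming sc is the SparkContext):
ranges is captured in the UDF closure and serialized with every task, so for
a bigger lookup table a broadcast variable would ship it to each executor
only once.

val rangesBc = sc.broadcast(ranges)
sqlContext.udf.register("findRangeName",
  (v: java.lang.Long) => RangeUDF.findName(v, rangesBc.value))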

Now I can use the UDF to get range_name:
select
 big_t.*,
 findRangeName(v) range_name
from big_t

On Sun, Dec 20, 2015 at 9:16 AM, Chris Fregly <ch...@fregly.com> wrote:

> this type of broadcast should be handled by Spark SQL/DataFrames
> automatically.
>
> this is the primary cost-based, physical-plan query optimization that the
> Spark SQL Catalyst optimizer supports.
>
> in Spark 1.5 and before, you can trigger this optimization by properly
> setting spark.sql.autoBroadcastJoinThreshold to a value that is *above* the
> size of your smaller table when fully materialized in JVM memory (not the
> serialized size of the data on disk - a very common mistake).
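>
> for example (a sketch - the exact value depends on the in-memory size of
> your smaller table; here 100MB):
>
> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
>   (100 * 1024 * 1024).toString)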
>
> in Spark 1.6+, there are heuristics to make this decision dynamically -
> and even allow hybrid execution where certain keys - within the same Spark
> job - will be broadcast and others won't, depending on their relative
> "hotness" for that particular job.
>
> a common theme of Spark 1.6 and beyond will be adaptive physical plan
> execution, adaptive memory allocation to the RDD cache vs the Spark
> execution engine, adaptive cluster resource allocation, etc.
>
> the goal is to minimize manual configuration and enable many different
> types of workloads to run efficiently on the same Spark cluster.
>
> On Dec 19, 2015, at 12:10 PM, Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
> I collected the small DF to an array of Tuple3,
> then registered a UDF with a function which does a lookup in the array,
> then just ran a select which uses the UDF.
> On Dec 18, 2015 1:06 AM, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>
>> You can broadcast your JSON data and then do a map-side join. This
>> article is a good start:
>> http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
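>>
>> a minimal sketch of the idea (hypothetical names - smallMap is the small
>> dataset collected to a Map on the driver):
>>
>> val lookup = sc.broadcast(smallMap)      // shipped once per executor
>> val joined = bigRdd.map { case (k, v) =>
>>   (k, v, lookup.value.get(k))            // lookup is local: no shuffle
>> }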
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov <
>> apivova...@gmail.com> wrote:
>>
>>> I have a big folder of ORC files. The files have a duration field (e.g.
>>> 3, 12, 26, etc.)
>>> I also have a small JSON file (just 8 rows) with range definitions (min,
>>> max, name):
>>> 0, 10, A
>>> 10, 20, B
>>> 20, 30, C
>>> etc
>>>
>>> Because I cannot do an equi-join between duration and the range min/max,
>>> I need to do a cross join and apply a WHERE condition to take the records
>>> which belong to each range.
>>> A cross join is an expensive operation; I think this particular join is
>>> better done as a map join.
>>>
>>> How to do a map join in Spark SQL?
>>>
>>
>>
