Is the following what you're trying to do?
// Disable broadcast joins so the example produces a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("x", "y")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("x", "y")
// Bucket both tables by the full join key (x, y), with the same bucket count.
df1.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, Seq("x", "y"))
joined.explain
I see no exchange:
== Physical Plan ==
*(3) Project [x#342, y#343]
+- *(3) SortMergeJoin [x#342, y#343], [x#346, y#347], Inner
:- *(1) Sort [x#342 ASC NULLS FIRST, y#343 ASC NULLS FIRST], false, 0
: +- *(1) Project [x#342, y#343]
: +- *(1) Filter (isnotnull(x#342) && isnotnull(y#343))
: +- *(1) FileScan parquet default.t1[x#342,y#343] Batched: true,
Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [],
PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema:
struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [x#346 ASC NULLS FIRST, y#347 ASC NULLS FIRST], false, 0
+- *(2) Project [x#346, y#347]
+- *(2) Filter (isnotnull(x#346) && isnotnull(y#347))
+- *(2) FileScan parquet default.t2[x#346,y#347] Batched: true,
Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [],
PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema:
struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
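For completeness, adding sortBy to the bucket spec can remove the per-partition Sort nodes as well. This is only a sketch (table names t1s/t2s are placeholders): as far as I know, Spark only exploits the sorted buckets when each bucket consists of a single file.

```scala
// Same tables as above, but also sorted within each bucket.
// With one file per bucket, Spark may be able to drop the Sort
// nodes in addition to the Exchange.
df1.write.format("parquet").bucketBy(8, "x", "y").sortBy("x", "y").saveAsTable("t1s")
df2.write.format("parquet").bucketBy(8, "x", "y").sortBy("x", "y").saveAsTable("t2s")
spark.table("t1s").join(spark.table("t2s"), Seq("x", "y")).explain()
```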
On Sun, May 31, 2020 at 2:38 PM Patrick Woody <[email protected]>
wrote:
> Hey Terry,
>
> Thanks for the response! I'm not sure that it ends up working though - the
> bucketing still seems to require the exchange before the join. Both tables
> below are saved bucketed by "x":
> *(5) Project [x#29, y#30, z#31, z#37]
> +- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
> :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
> * : +- Exchange hashpartitioning(x#29, y#30, 200)*
> : +- *(1) Project [x#29, y#30, z#31]
> : +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
> : +- *(1) FileScan parquet default.ax[x#29,y#30,z#31]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
> +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
> * +- Exchange hashpartitioning(x#35, y#36, 200)*
> +- *(3) Project [x#35, y#36, z#37]
> +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
> +- *(3) FileScan parquet default.bx[x#35,y#36,z#37]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
>
> Best,
> Pat
>
>
>
> On Sun, May 31, 2020 at 3:15 PM Terry Kim <[email protected]> wrote:
>
>> You can use bucketBy to avoid shuffling in your scenario. This test suite
>> has some examples:
>> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>>
>> Thanks,
>> Terry
>>
>> On Sun, May 31, 2020 at 7:43 AM Patrick Woody <[email protected]>
>> wrote:
>>
>>> Hey all,
>>>
>>> I have one large table, A, and two medium sized tables, B & C, that I'm
>>> trying to complete a join on efficiently. The result is multiplicative on A
>>> join B, so I'd like to avoid shuffling that result. For this example, let's
>>> just assume each table has three columns, x, y, z. The below is all being
>>> tested on Spark 2.4.5 locally.
>>>
>>> I'd like to perform the following join:
>>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>>> This outputs the following physical plan:
>>> == Physical Plan ==
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>> :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>> : +- Exchange hashpartitioning(x#32, z#34, 200)
>>> : +- *(3) Project [x#32, y#33, z#34, z#74]
>>> : +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>> : :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS
>>> FIRST], false, 0
>>> : : +- Exchange hashpartitioning(x#32, y#33, 200)
>>> : : +- LocalTableScan [x#32, y#33, z#34]
>>> : +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS
>>> FIRST], false, 0
>>> : +- Exchange hashpartitioning(x#72, y#73, 200)
>>> : +- LocalTableScan [x#72, y#73, z#74]
>>> +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>> +- Exchange hashpartitioning(x#52, z#54, 200)
>>> +- LocalTableScan [x#52, y#53, z#54]
>>>
>>>
>>> I may be misremembering, but in the past I thought you had the ability
>>> to pre-partition each table by "x" and it would satisfy the requirements of
>>> the join since it is already clustered by the key on both sides using the
>>> same hash function (this assumes numPartitions lines up obviously). However
>>> it seems like it will insert another exchange:
>>>
>>> A.repartition($"x").join(B.repartition($"x"), Seq("x",
>>> "y")).join(C.repartition($"x"), Seq("x", "z"))
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>> :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>> : +- Exchange hashpartitioning(x#32, z#34, 200)
>>> : +- *(3) Project [x#32, y#33, z#34, z#74]
>>> : +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>> : :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS
>>> FIRST], false, 0
>>> : : +- Exchange hashpartitioning(x#32, y#33, 200)
>>> : : +- Exchange hashpartitioning(x#32, 200)
>>> : : +- LocalTableScan [x#32, y#33, z#34]
>>> : +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS
>>> FIRST], false, 0
>>> : +- Exchange hashpartitioning(x#72, y#73, 200)
>>> : +- Exchange hashpartitioning(x#72, 200)
>>> : +- LocalTableScan [x#72, y#73, z#74]
>>> +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>> +- Exchange hashpartitioning(x#52, z#54, 200)
>>> +- ReusedExchange [x#52, y#53, z#54], Exchange
>>> hashpartitioning(x#32, 200).
>>>
>>> Note that using this "strategy" with groupBy("x", "y") works fine,
>>> though I assume that is because it doesn't have to consider the other side
>>> of the join.
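For contrast, a sketch of why the groupBy case can get away with it (assuming A has columns x, y, z as above): any two rows that agree on (x, y) also agree on x, so hashpartitioning(x) already co-locates every (x, y) group, whereas the sort-merge join appears to require both sides to be hash-partitioned on the exact join keys.

```scala
// Sketch: the aggregation's required clustering on (x, y) is
// already satisfied by hashpartitioning(x), so the plan should
// show only the single Exchange introduced by repartition.
val grouped = A.repartition($"x").groupBy("x", "y").count()
grouped.explain()
```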
>>>
>>> Did this used to work or am I simply confusing it with groupBy? Either
>>> way - any thoughts on how I can avoid shuffling the bulk of the join result?
>>>
>>> Thanks,
>>> Pat
>>>