Hi Sid,
This example code, with its output, should add some more clarity:

spark-shell --conf spark.sql.shuffle.partitions=3 --conf spark.sql.autoBroadcastJoinThreshold=-1
>
>
> scala> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.DataFrame
>
> scala> import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, rand}
> import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, rand}
>
>
>
> scala>  import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala>  val df1 = Seq(
>      |     ("x", "bc"),
>      |     ("x", "ce"),
>      |     ("x", "ab"),
>      |     ("x", "ef"),
>      |     ("x", "gh"),
>      |     ("y", "hk"),
>      |     ("z", "jk")
>      |   ).toDF("t1c1","t1c2")
> df1: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
>
> scala>  df1.show(10,false)
> +----+----+
> |t1c1|t1c2|
> +----+----+
> |x   |bc  |
> |x   |ce  |
> |x   |ab  |
> |x   |ef  |
> |x   |gh  |
> |y   |hk  |
> |z   |jk  |
> +----+----+
>
>
> scala> val df2 = Seq(
>      |     ("x", "gkl"),
>      |     ("y", "nmb"),
>      |     ("z", "qwe")
>      |   ).toDF("t2c1","t2c2")
> df2: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string]
>
> scala>    df2.show(10,false)
> +----+----+
> |t2c1|t2c2|
> +----+----+
> |x   |gkl |
> |y   |nmb |
> |z   |qwe |
> +----+----+
>
>
> scala>
>      |   def applySalt(leftTable: DataFrame, leftCol: String, rightTable: DataFrame) = {
>      |
>      |     var df1 = leftTable
>      |       .withColumn(leftCol, concat(
>      |         leftTable.col(leftCol), lit("_"), lit(floor(rand(123456) * 10))))
>      |     var df2 = rightTable
>      |       .withColumn("explodedCol",
>      |         explode(
>      |           array((0 to 10).map(lit(_)): _ *)
>      |         ))
>      |
>      |     (df1, df2)
>      |   }
> applySalt: (leftTable: org.apache.spark.sql.DataFrame, leftCol: String, rightTable: org.apache.spark.sql.DataFrame)(org.apache.spark.sql.DataFrame, org.apache.spark.sql.DataFrame)
>
> scala>   val (df3, df4) = applySalt(df1, "t1c1", df2)
> df3: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
> df4: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string ... 1 more field]
>
> scala>
>
> scala>   df3.show(100, false)
> +----+----+
> |t1c1|t1c2|
> +----+----+
> |x_4 |bc  |
> |x_8 |ce  |
> |x_3 |ab  |
> |x_0 |ef  |
> |x_6 |gh  |
> |y_9 |hk  |
> |z_7 |jk  |
> +----+----+
>
>
> scala>   df4.show(100, false)
> +----+----+-----------+
> |t2c1|t2c2|explodedCol|
> +----+----+-----------+
> |x   |gkl |0          |
> |x   |gkl |1          |
> |x   |gkl |2          |
> |x   |gkl |3          |
> |x   |gkl |4          |
> |x   |gkl |5          |
> |x   |gkl |6          |
> |x   |gkl |7          |
> |x   |gkl |8          |
> |x   |gkl |9          |
> |x   |gkl |10         |
> |y   |nmb |0          |
> |y   |nmb |1          |
> |y   |nmb |2          |
> |y   |nmb |3          |
> |y   |nmb |4          |
> |y   |nmb |5          |
> |y   |nmb |6          |
> |y   |nmb |7          |
> |y   |nmb |8          |
> |y   |nmb |9          |
> |y   |nmb |10         |
> |z   |qwe |0          |
> |z   |qwe |1          |
> |z   |qwe |2          |
> |z   |qwe |3          |
> |z   |qwe |4          |
> |z   |qwe |5          |
> |z   |qwe |6          |
> |z   |qwe |7          |
> |z   |qwe |8          |
> |z   |qwe |9          |
> |z   |qwe |10         |
> +----+----+-----------+
>
>
> scala>   // join after eliminating data skewness
>
> scala> val df5 = df3.join(df4, df3.col("t1c1") <=> concat(df4.col("t2c1"), lit("_"), df4.col("explodedCol")))
> df5: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string ... 3 more fields]
>
> scala> df5.show(100,false)
> +----+----+----+----+-----------+
> |t1c1|t1c2|t2c1|t2c2|explodedCol|
> +----+----+----+----+-----------+
> |x_0 |ef  |x   |gkl |0          |
> |y_9 |hk  |y   |nmb |9          |
> |x_3 |ab  |x   |gkl |3          |
> |x_4 |bc  |x   |gkl |4          |
> |x_6 |gh  |x   |gkl |6          |
> |z_7 |jk  |z   |qwe |7          |
> |x_8 |ce  |x   |gkl |8          |
> +----+----+----+----+-----------+
>
> scala> df5.queryExecution.sparkPlan
> res14: org.apache.spark.sql.execution.SparkPlan =
> SortMergeJoin [coalesce(t1c1#32, ), isnull(t1c1#32)], [coalesce(concat(t2c1#21, _, cast(explodedCol#36 as string)), ), isnull(concat(t2c1#21, _, cast(explodedCol#36 as string)))], Inner
> :- LocalTableScan [t1c1#32, t1c2#6]
> +- Generate explode([0,1,2,3,4,5,6,7,8,9,10]), [t2c1#21, t2c2#22], false, [explodedCol#36]
>    +- LocalTableScan [t2c1#21, t2c2#22]
>
>
> scala> df5.drop("t1c1", "explodedCol").show
> +----+----+----+
> |t1c2|t2c1|t2c2|
> +----+----+----+
> |  ef|   x| gkl|
> |  hk|   y| nmb|
> |  ab|   x| gkl|
> |  bc|   x| gkl|
> |  gh|   x| gkl|
> |  jk|   z| qwe|
> |  ce|   x| gkl|
> +----+----+----+
>
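The same salted-join logic can be sketched in plain Python (not Spark, just an illustration) to show why the inner join loses no rows: the left side gets one random salt per row, while the right side is exploded with every salt in the range, so every salted left key finds a partner. The one rule is that the explode range must cover everything the random salt can produce (above, `rand * 10` yields 0-9, and exploding 0..10 is a superset of that).

```python
import random

# Plain-Python sketch of the salted join above (illustration only, not Spark).
df1 = [("x", "bc"), ("x", "ce"), ("x", "ab"), ("x", "ef"),
       ("x", "gh"), ("y", "hk"), ("z", "jk")]
df2 = [("x", "gkl"), ("y", "nmb"), ("z", "qwe")]

SALTS = 10  # must cover every salt value the left side can draw

# Left side: append one random salt per row (the skewed key "x" now spreads out).
rng = random.Random(123456)
salted_left = [(f"{k}_{rng.randrange(SALTS)}", v) for k, v in df1]

# Right side: "explode" each row into every possible salted key.
exploded_right = [(f"{k}_{s}", v) for k, v in df2 for s in range(SALTS)]

# Inner join on the salted key.
right_index = dict(exploded_right)
joined = [(lk, lv, right_index[lk]) for lk, lv in salted_left if lk in right_index]

# Every left row matches exactly once, so no rows are lost by the salting.
assert len(joined) == len(df1)
```
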

Regards
Vinod

On Sun, Jul 31, 2022 at 1:59 AM ayan guha <guha.a...@gmail.com> wrote:

> One option is to create a separate column in Table A with salting. Use it as
> the partition key, and use the original column for joining.
>
> Ayan
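One way to read this suggestion (an assumption on my part) is sketched below in plain Python, with partitions simulated by hashing: a salted helper column decides which partition a row lands in, so the skewed key spreads out, while the join itself still uses the original, unsalted key, so results are unchanged.

```python
import random

# Hypothetical plain-Python sketch of "partition by a salted column,
# join on the original column". Not Spark; partitioning is simulated by hash().
rows = [("x", i) for i in range(6)] + [("y", 6), ("z", 7)]  # "x" is skewed

NUM_PARTITIONS = 3
SALTS = 4

rng = random.Random(0)
with_salt = [(key, val, f"{key}_{rng.randrange(SALTS)}") for key, val in rows]

# Distribute rows by the salted column: the skewed key "x" can spread out.
partitions = {}
for key, val, salted in with_salt:
    partitions.setdefault(hash(salted) % NUM_PARTITIONS, []).append((key, val))

# The join still uses the original key, so no rows are gained or lost.
lookup = {"x": "gkl", "y": "nmb", "z": "qwe"}
joined = [(key, val, lookup[key])
          for part in partitions.values() for key, val in part]
assert len(joined) == len(rows)
```
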
>
> On Sun, 31 Jul 2022 at 6:45 pm, Jacob Lynn <abebopare...@gmail.com> wrote:
>
>> The key is this line from Amit's email (emphasis added):
>>
>> > Change the join_col to *all possible values* of the salt.
>>
>> The two tables are treated asymmetrically:
>>
>> 1. The skewed table gets random salts appended to the join key.
>> 2. The other table gets all possible salts appended to the join key (e.g.
>> using a range array literal + explode).
>>
>> This guarantees that every row in the skewed table will match a row in
>> the other table. This StackOverflow answer
>> <https://stackoverflow.com/a/57951114/1892435> gives an example.
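The asymmetry in points 1 and 2 can be checked as a small property in plain Python (a sketch, not Spark): because the non-skewed side enumerates every salt, any random salt drawn on the skewed side is guaranteed to be covered.

```python
import random

# Hypothetical sketch of the asymmetry: random salts left, all salts right.
NUM_SALTS = 3
skewed_keys = ["x1", "x1", "x1", "x2"]   # skewed table, heavy on x1
other_keys = ["x1", "x2"]                # other table, one row per key

left = [f"{k}_{random.randrange(NUM_SALTS)}" for k in skewed_keys]   # random salt
right = {f"{k}_{s}" for k in other_keys for s in range(NUM_SALTS)}   # all salts

# Every salted left key is covered, whatever the random draws were.
assert all(k in right for k in left)
```
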
>>
>> On Sun, 31 Jul 2022 at 10:41, Amit Joshi <mailtojoshia...@gmail.com>
>> wrote:
>>
>>> Hi Sid,
>>>
>>> I am not sure I understood your question.
>>> But the keys cannot be different post-salting in the two tables; this is
>>> what I have shown in the explanation.
>>> You salt Table A and then explode Table B to create all possible values.
>>>
>>> In your case, I do not understand why Table B has x_8/x_9. It should
>>> contain all the possible values which you used to create the salt.
>>>
>>> I hope you understand.
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Sun, Jul 31, 2022 at 10:02 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Amit,
>>>>
>>>> Thanks for your reply. However, your answer doesn't seem different from
>>>> what I have explained.
>>>>
>>>> My question is: if the keys are different after salting, as in my
>>>> example, then post-join there would be no results (assuming an inner
>>>> join), because even though the keys are segregated into different
>>>> partitions based on unique keys, they do not match: x_1/x_2 != x_8/x_9.
>>>>
>>>> How do you ensure that the results are matched?
>>>>
>>>> Best,
>>>> Sid
>>>>
>>>> On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi <mailtojoshia...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Sid,
>>>>>
>>>>> Salting is normally a technique to add random characters to existing
>>>>> values.
>>>>> In big data we can use salting to deal with the skewness.
>>>>> Salting in a join can be used as follows:
>>>>> * Table A-*
>>>>> Col1, join_col , where join_col values are {x1, x2, x3}
>>>>> x1
>>>>> x1
>>>>> x1
>>>>> x2
>>>>> x2
>>>>> x3
>>>>>
>>>>> *Table B-*
>>>>> join_col, Col3, where join_col values are {x1, x2}
>>>>> x1
>>>>> x2
>>>>>
>>>>> *Problem:* Let's say that for Table A, the data is skewed on x1.
>>>>> Now salting goes like this: *salt value = 2*.
>>>>> For *Table A*, create a new col with values by salting the join col:
>>>>> *New_Join_Col*
>>>>> x1_1
>>>>> x1_2
>>>>> x1_1
>>>>> x2_1
>>>>> x2_2
>>>>> x3_1
>>>>>
>>>>> For *Table B*,
>>>>> change the join_col to all possible values of the salt:
>>>>> join_col
>>>>> x1_1
>>>>> x1_2
>>>>> x2_1
>>>>> x2_2
>>>>>
>>>>> And then join it like
>>>>> tableA.join(tableB, tableA.new_join_col == tableB.join_col)
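This toy example can be walked through in plain Python (a sketch, not Spark) to confirm that the x1 and x2 rows all survive the inner join on salted keys, while the x3 row drops out only because Table B never contained x3 in the first place.

```python
import random

# Plain-Python walk-through of the toy example above (salt value = 2).
table_a = ["x1", "x1", "x1", "x2", "x2", "x3"]  # skewed on x1
table_b = ["x1", "x2"]
SALT = 2

rng = random.Random(1)
# Table A: one random salt suffix per row (x1_1, x1_2, ...).
a_salted = [f"{k}_{rng.randrange(1, SALT + 1)}" for k in table_a]
# Table B: explode each key into every possible salted value.
b_exploded = [f"{k}_{s}" for k in table_b for s in range(1, SALT + 1)]

# Inner join on the salted keys.
b_set = set(b_exploded)
joined = [k for k in a_salted if k in b_set]

# All x1 and x2 rows match; the single x3 row drops out, exactly as it
# would have in the original, unsalted inner join (B has no x3).
assert len(joined) == 5
```
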
>>>>>
>>>>> Let me know if you have any questions.
>>>>>
>>>>> Regards
>>>>> Amit Joshi
>>>>>
>>>>>
>>>>> On Sat, Jul 30, 2022 at 7:16 PM Sid <flinkbyhe...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I was trying to understand the Salting technique for the column where
>>>>>> there would be a huge load on a single partition because of the same 
>>>>>> keys.
>>>>>>
>>>>>> I referred to a YouTube video and came away with the below understanding:
>>>>>>
>>>>>> So, using the salting technique we can actually change the joining
>>>>>> column values by appending some random number in a specified range.
>>>>>>
>>>>>> So, suppose I have these two values in a partition of two different
>>>>>> tables:
>>>>>>
>>>>>> Table A:
>>>>>> Partition1:
>>>>>> x
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> x
>>>>>>
>>>>>> Table B:
>>>>>> Partition1:
>>>>>> x
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> x
>>>>>>
>>>>>> After Salting it would be something like the below:
>>>>>>
>>>>>> Table A:
>>>>>> Partition1:
>>>>>> x_1
>>>>>>
>>>>>> Partition 2:
>>>>>> x_2
>>>>>>
>>>>>> Table B:
>>>>>> Partition1:
>>>>>> x_3
>>>>>>
>>>>>> Partition 2:
>>>>>> x_8
>>>>>>
>>>>>> Now, when I inner join these two tables after salting in order to
>>>>>> avoid data skewness problems, I won't get a match since the keys are
>>>>>> different after applying salting techniques.
>>>>>>
>>>>>> So how does this resolve the data skewness issue, or is there a gap in
>>>>>> my understanding?
>>>>>>
>>>>>> Could anyone help me in layman's terms?
>>>>>>
>>>>>> TIA,
>>>>>> Sid
>>>>>>
>>>>> --
> Best Regards,
> Ayan Guha
>
