Hi Sid, This example code with output will add some more clarity spark-shell --conf spark.sql.shuffle.partitions=3 --conf > spark.sql.autoBroadcastJoinThreshold=-1 > > > scala> import org.apache.spark.sql.DataFrame > import org.apache.spark.sql.DataFrame > > scala> import org.apache.spark.sql.functions.{array, concat, explode, > floor, lit, rand} > import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, > rand} > > > > scala> import spark.implicits._ > import spark.implicits._ > > scala> > > scala> val df1 = Seq( > | ("x", "bc"), > | ("x", "ce"), > | ("x", "ab"), > | ("x", "ef"), > | ("x", "gh"), > | ("y", "hk"), > | ("z", "jk") > | ).toDF("t1c1","t1c2") > df1: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string] > > scala> df1.show(10,false) > +----+----+ > |t1c1|t1c2| > +----+----+ > |x |bc | > |x |ce | > |x |ab | > |x |ef | > |x |gh | > |y |hk | > |z |jk | > +----+----+ > > > scala> val df2 = Seq( > | ("x", "gkl"), > | ("y", "nmb"), > | ("z", "qwe") > | ).toDF("t2c1","t2c2") > df2: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string] > > scala> df2.show(10,false) > +----+----+ > |t2c1|t2c2| > +----+----+ > |x |gkl | > |y |nmb | > |z |qwe | > +----+----+ > > > scala> > | def applySalt(leftTable: DataFrame, leftCol: String, rightTable: > DataFrame) = { > | > | var df1 = leftTable > | .withColumn(leftCol, concat( > | leftTable.col(leftCol), lit("_"), lit(floor(rand(123456) * > 10)))) > | var df2 = rightTable > | .withColumn("explodedCol", > | explode( > | array((0 to 10).map(lit(_)): _ *) > | )) > | > | (df1, df2) > | } > applySalt: (leftTable: org.apache.spark.sql.DataFrame, leftCol: String, > rightTable: org.apache.spark.sql.DataFrame)(org.apache.spark.sql.DataFrame, > org.apache.spark.sql.DataFrame) > > scala> val (df3, df4) = applySalt(df1, "t1c1", df2) > df3: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string] > df4: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string ... 
1 > more field] > > scala> > > scala> df3.show(100, false) > +----+----+ > |t1c1|t1c2| > +----+----+ > |x_4 |bc | > |x_8 |ce | > |x_3 |ab | > |x_0 |ef | > |x_6 |gh | > |y_9 |hk | > |z_7 |jk | > +----+----+ > > > scala> df4.show(100, false) > +----+----+-----------+ > |t2c1|t2c2|explodedCol| > +----+----+-----------+ > |x |gkl |0 | > |x |gkl |1 | > |x |gkl |2 | > |x |gkl |3 | > |x |gkl |4 | > |x |gkl |5 | > |x |gkl |6 | > |x |gkl |7 | > |x |gkl |8 | > |x |gkl |9 | > |x |gkl |10 | > |y |nmb |0 | > |y |nmb |1 | > |y |nmb |2 | > |y |nmb |3 | > |y |nmb |4 | > |y |nmb |5 | > |y |nmb |6 | > |y |nmb |7 | > |y |nmb |8 | > |y |nmb |9 | > |y |nmb |10 | > |z |qwe |0 | > |z |qwe |1 | > |z |qwe |2 | > |z |qwe |3 | > |z |qwe |4 | > |z |qwe |5 | > |z |qwe |6 | > |z |qwe |7 | > |z |qwe |8 | > |z |qwe |9 | > |z |qwe |10 | > +----+----+-----------+ > > > scala> //join after eliminating data skewness > > scala> val df5 = df3.join(df4, df3.col("t1c1")<=> > concat(df4.col("t2c1"),lit("_"),df4.col("explodedCol"))) > df5: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string ... 
3 > more fields] > > scala> df5.show(100,false) > +----+----+----+----+-----------+ > |t1c1|t1c2|t2c1|t2c2|explodedCol| > +----+----+----+----+-----------+ > |x_0 |ef |x |gkl |0 | > |y_9 |hk |y |nmb |9 | > |x_3 |ab |x |gkl |3 | > |x_4 |bc |x |gkl |4 | > |x_6 |gh |x |gkl |6 | > |z_7 |jk |z |qwe |7 | > |x_8 |ce |x |gkl |8 | > +----+----+----+----+-----------+ > > scala> df5.queryExecution.sparkPlan > res14: org.apache.spark.sql.execution.SparkPlan = > SortMergeJoin [coalesce(t1c1#32, ), isnull(t1c1#32)], > [coalesce(concat(t2c1#21, _, cast(explodedCol#36 as string)), ), > isnull(concat(t2c1#21, _, cast(explodedCol#36 as string)))], Inner > :- LocalTableScan [t1c1#32, t1c2#6] > +- Generate explode([0,1,2,3,4,5,6,7,8,9,10]), [t2c1#21, t2c2#22], false, > [explodedCol#36] > +- LocalTableScan [t2c1#21, t2c2#22] > > > scala> df5.drop("t1c1", "explodedCol").show > +----+----+----+ > |t1c2|t2c1|t2c2| > +----+----+----+ > | ef| x| gkl| > | hk| y| nmb| > | ab| x| gkl| > | bc| x| gkl| > | gh| x| gkl| > | jk| z| qwe| > | ce| x| gkl| > +----+----+----+ >
Regards Vinod On Sun, Jul 31, 2022 at 1:59 AM ayan guha <guha.a...@gmail.com> wrote: > One option is to create a separate column in table A with salting. Use it as > the partition key. Use the original column for joining. > > Ayan > > On Sun, 31 Jul 2022 at 6:45 pm, Jacob Lynn <abebopare...@gmail.com> wrote: > >> The key is this line from Amit's email (emphasis added): >> >> > Change the join_col to *all possible values* of the salt. >> >> The two tables are treated asymmetrically: >> >> 1. The skewed table gets random salts appended to the join key. >> 2. The other table gets all possible salts appended to the join key (e.g. >> using a range array literal + explode). >> >> This guarantees that every row in the skewed table will match a row in >> the other table. This StackOverflow answer >> <https://stackoverflow.com/a/57951114/1892435> gives an example. >> >> Op zo 31 jul. 2022 om 10:41 schreef Amit Joshi <mailtojoshia...@gmail.com >> >: >> >>> Hi Sid, >>> >>> I am not sure I understood your question. >>> But the keys cannot be different post salting in both the tables, this >>> is what I have shown in the explanation. >>> You salt Table A and then explode Table B to create all possible values. >>> >>> In your case, I do not understand why Table B has x_8/x_9. It should be >>> all possible values which you used to create salt. >>> >>> I hope you understand. >>> >>> Thanks >>> >>> >>> >>> On Sun, Jul 31, 2022 at 10:02 AM Sid <flinkbyhe...@gmail.com> wrote: >>> >>>> Hi Amit, >>>> >>>> Thanks for your reply. However, your answer doesn't seem different from >>>> what I have explained. >>>> >>>> My question is after salting if the keys are different like in my >>>> example then post join there would be no results assuming the join type as >>>> inner join because even though the keys are segregated in different >>>> partitions based on unique keys they are not matching because x_1/x_2 >>>> != x_8/x_9 >>>> >>>> How do you ensure that the results are matched? 
>>>> >>>> Best, >>>> Sid >>>> >>>> On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi <mailtojoshia...@gmail.com> >>>> wrote: >>>> >>>>> Hi Sid, >>>>> >>>>> Salting is normally a technique to add random characters to existing >>>>> values. >>>>> In big data we can use salting to deal with the skewness. >>>>> Salting in a join can be used as follows: >>>>> * Table A-* >>>>> Col1, join_col , where join_col values are {x1, x2, x3} >>>>> x1 >>>>> x1 >>>>> x1 >>>>> x2 >>>>> x2 >>>>> x3 >>>>> >>>>> *Table B-* >>>>> join_col, Col3 , where join_col values are {x1, x2} >>>>> x1 >>>>> x2 >>>>> >>>>> *Problem: *Let's say for table A, data is skewed on x1 >>>>> Now salting goes like this. *Salt value =2* >>>>> For >>>>> *table A, *create a new col with values by salting join col >>>>> *New_Join_Col* >>>>> x1_1 >>>>> x1_2 >>>>> x1_1 >>>>> x2_1 >>>>> x2_2 >>>>> x3_1 >>>>> >>>>> For *Table B,* >>>>> Change the join_col to all possible values of the salt. >>>>> join_col >>>>> x1_1 >>>>> x1_2 >>>>> x2_1 >>>>> x2_2 >>>>> >>>>> And then join it like >>>>> tableA.join(tableB, where tableA.new_join_col == tableB.join_col) >>>>> >>>>> Let me know if you have any questions. >>>>> >>>>> Regards >>>>> Amit Joshi >>>>> >>>>> >>>>> On Sat, Jul 30, 2022 at 7:16 PM Sid <flinkbyhe...@gmail.com> wrote: >>>>> >>>>>> Hi Team, >>>>>> >>>>>> I was trying to understand the Salting technique for the column where >>>>>> there would be a huge load on a single partition because of the same >>>>>> keys. >>>>>> >>>>>> I referred to a YouTube video with the below understanding: >>>>>> >>>>>> So, using the salting technique we can actually change the joining >>>>>> column values by appending some random number in a specified range. >>>>>> >>>>>> So, suppose I have these two values in a partition of two different >>>>>> tables: >>>>>> >>>>>> Table A: >>>>>> Partition1: >>>>>> x >>>>>> . >>>>>> . >>>>>> . >>>>>> x >>>>>> >>>>>> Table B: >>>>>> Partition1: >>>>>> x >>>>>> . >>>>>> . >>>>>> . 
>>>>>> x >>>>>> >>>>>> After Salting it would be something like the below: >>>>>> >>>>>> Table A: >>>>>> Partition1: >>>>>> x_1 >>>>>> >>>>>> Partition 2: >>>>>> x_2 >>>>>> >>>>>> Table B: >>>>>> Partition1: >>>>>> x_3 >>>>>> >>>>>> Partition 2: >>>>>> x_8 >>>>>> >>>>>> Now, when I inner join these two tables after salting in order to >>>>>> avoid data skewness problems, I won't get a match since the keys are >>>>>> different after applying salting techniques. >>>>>> >>>>>> So how does this resolve the data skewness issue, or is there some >>>>>> gap in my understanding? >>>>>> >>>>>> Could anyone help me in layman's terms? >>>>>> >>>>>> TIA, >>>>>> Sid >>>>>> >>>>> -- > Best Regards, > Ayan Guha > 