Anyone have ideas about the below Q?

It seems to me that, given that "diamond" DAG, Spark could see that
the rows haven't been shuffled/filtered and do some type of "zip join"
to push them together, but I've not been able to get a plan that
doesn't do a hash/sort-merge join.
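
For reference, this is roughly how I've been checking the plan (a minimal
PySpark sketch, reusing the df1/df2 names from my earlier mail below):

df_joined = df1.join(df2, 'rownum', 'inner')
# Even with the shared parent, the physical plan I get still shows
# Exchange/Sort nodes feeding a sort-merge (or hash) join, rather than
# a partition-local zip of the two branches.
df_joined.explain(True)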

Cheers
Andrew

On Wed, May 12, 2021 at 11:32 AM Andrew Melo <andrew.m...@gmail.com> wrote:
>
> Hi,
>
> In the case where the left and right hand side share a common parent like:
>
> df = spark.read.someDataframe().withColumn('rownum', row_number())
> df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
> df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
> df_joined = df1.join(df2, 'rownum', 'inner')
>
> (or maybe replacing row_number() with monotonically_increasing_id()....)
>
> Is there some hint/optimization that can be done to let Spark know
> that the left and right hand sides of the join share the same
> ordering, so that a sort/hash merge join doesn't need to be done?
>
> Thanks
> Andrew
>
> On Wed, May 12, 2021 at 11:07 AM Sean Owen <sro...@gmail.com> wrote:
> >
> > Yeah I don't think that's going to work - you aren't guaranteed to get 1, 
> > 2, 3, etc. I think row_number() might be what you need to generate a join 
> > ID.
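> >
> > For example (an untested PySpark sketch; this assumes you just want a 1..N
> > row id and can live with the single-partition window it implies):
> >
> > from pyspark.sql import Window
> > from pyspark.sql.functions import row_number, monotonically_increasing_id
> >
> > # row_number() needs a window spec; ordering by monotonically_increasing_id()
> > # keeps the existing row order but pulls all rows into a single partition.
> > w = Window.orderBy(monotonically_increasing_id())
> > df1_with_id = df1.withColumn("id", row_number().over(w))
> > df2_with_id = df2.withColumn("id", row_number().over(w))
> > df1_with_id.join(df2_with_id, "id", "inner").drop("id")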
> >
> > RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You 
> > could .zip the two RDDs you get from the DataFrames, merge each pair of Rows 
> > into a single Row, and convert back to a DataFrame.
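> >
> > A rough sketch of that (untested; .zip requires both RDDs to have the same
> > number of partitions and the same number of rows in each partition):
> >
> > from pyspark.sql import Row
> >
> > # Partition-local pairing of rows from the two RDDs -- no shuffle involved.
> > zipped = df1.rdd.zip(df2.rdd)
> > merged = zipped.map(lambda rows: Row(**rows[0].asDict(), **rows[1].asDict()))
> > df3 = spark.createDataFrame(merged)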
> >
> >
> > On Wed, May 12, 2021 at 10:47 AM kushagra deep <kushagra94d...@gmail.com> 
> > wrote:
> >>
> >> Thanks Raghvendra
> >>
> >> Will the ids for corresponding columns always be the same? Since 
> >> monotonically_increasing_id() returns a number based on the partition ID and 
> >> the row number within the partition, will it be the same for corresponding columns? 
> >> Also, is it guaranteed that the two dataframes will be divided into logical 
> >> Spark partitions with the same cardinality for each partition?
> >>
> >> Reg,
> >> Kushagra Deep
> >>
> >> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh <raghavendr...@gmail.com> 
> >> wrote:
> >>>
> >>> You can add an extra id column and perform an inner join.
> >>>
> >>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
> >>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
> >>>
> >>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
> >>>
> >>> +---------+---------+
> >>> |amount_6m|amount_9m|
> >>> +---------+---------+
> >>> |      100|      500|
> >>> |      200|      600|
> >>> |      300|      700|
> >>> |      400|      800|
> >>> |      500|      900|
> >>> +---------+---------+
> >>>
> >>>
> >>> --
> >>> Raghavendra
> >>>
> >>>
> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep <kushagra94d...@gmail.com> 
> >>> wrote:
> >>>>
> >>>> Hi All,
> >>>>
> >>>> I have two dataframes
> >>>>
> >>>> df1
> >>>>
> >>>> amount_6m
> >>>>  100
> >>>>  200
> >>>>  300
> >>>>  400
> >>>>  500
> >>>>
> >>>> And a second dataframe df2 below
> >>>>
> >>>>  amount_9m
> >>>>   500
> >>>>   600
> >>>>   700
> >>>>   800
> >>>>   900
> >>>>
> >>>> The number of rows is the same in both dataframes.
> >>>>
> >>>> Can I merge the two dataframes to achieve the below df?
> >>>>
> >>>> df3
> >>>>
> >>>> amount_6m | amount_9m
> >>>>       100 |       500
> >>>>       200 |       600
> >>>>       300 |       700
> >>>>       400 |       800
> >>>>       500 |       900
> >>>>
> >>>> Thanks in advance
> >>>>
> >>>> Reg,
> >>>> Kushagra Deep
> >>>>
