Anyone have ideas about the question below? It seems to me that, given that "diamond" DAG, Spark could see that the rows haven't been shuffled or filtered and do some type of "zip join" to push them together, but I've not been able to get a plan that doesn't do a hash or sort-merge join.
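The closest workaround I can see is dropping to the RDD layer and zipping the two branches row-for-row instead of joining. A rough, untested sketch (the parquet path and expensive_udf1/expensive_udf2 are just placeholders): it relies on RDD.zip requiring both sides to have the same number of partitions and the same number of rows per partition, which should hold here because both branches are narrow transformations of the same parent.

from pyspark.sql import Row

df = spark.read.parquet('/some/path')   # placeholder for the common parent
df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1')
df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2')

# Zip the two branches positionally; no shuffle, but both branches
# will re-scan the parent unless df is cached/persisted.
zipped = df1.rdd.zip(df2.rdd) \
             .map(lambda pair: Row(**pair[0].asDict(), **pair[1].asDict()))
df_joined = spark.createDataFrame(zipped)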
Cheers
Andrew

On Wed, May 12, 2021 at 11:32 AM Andrew Melo <andrew.m...@gmail.com> wrote:
>
> Hi,
>
> In the case where the left and right hand sides share a common parent like:
>
> df = spark.read.someDataframe().withColumn('rownum', row_number())
> df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
> df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
> df_joined = df1.join(df2, 'rownum', 'inner')
>
> (or maybe replacing row_number() with monotonically_increasing_id()...)
>
> Is there some hint/optimization that can be done to let Spark know
> that the left and right hand sides of the join share the same
> ordering, so that a sort/hash merge doesn't need to be done?
>
> Thanks
> Andrew
>
> On Wed, May 12, 2021 at 11:07 AM Sean Owen <sro...@gmail.com> wrote:
> >
> > Yeah, I don't think that's going to work - you aren't guaranteed to get 1,
> > 2, 3, etc. I think row_number() might be what you need to generate a join ID.
> >
> > RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You
> > could .zip two RDDs you get from DataFrames and manually convert the Rows
> > back to a single Row and back to a DataFrame.
> >
> > On Wed, May 12, 2021 at 10:47 AM kushagra deep <kushagra94d...@gmail.com> wrote:
> >>
> >> Thanks Raghavendra
> >>
> >> Will the ids for corresponding rows always be the same? Since
> >> monotonically_increasing_id() returns a number based on the partition id and
> >> the row number within the partition, will it be the same for corresponding rows?
> >> Also, is it guaranteed that the two dataframes will be divided into logical
> >> Spark partitions with the same cardinality for each partition?
> >>
> >> Reg,
> >> Kushagra Deep
> >>
> >> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh <raghavendr...@gmail.com> wrote:
> >>>
> >>> You can add an extra id column and perform an inner join.
> >>>
> >>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
> >>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
> >>>
> >>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
> >>>
> >>> +---------+---------+
> >>> |amount_6m|amount_9m|
> >>> +---------+---------+
> >>> |      100|      500|
> >>> |      200|      600|
> >>> |      300|      700|
> >>> |      400|      800|
> >>> |      500|      900|
> >>> +---------+---------+
> >>>
> >>> --
> >>> Raghavendra
> >>>
> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep <kushagra94d...@gmail.com> wrote:
> >>>>
> >>>> Hi All,
> >>>>
> >>>> I have two dataframes
> >>>>
> >>>> df1
> >>>>
> >>>> amount_6m
> >>>>       100
> >>>>       200
> >>>>       300
> >>>>       400
> >>>>       500
> >>>>
> >>>> And a second dataframe df2 below
> >>>>
> >>>> amount_9m
> >>>>       500
> >>>>       600
> >>>>       700
> >>>>       800
> >>>>       900
> >>>>
> >>>> The number of rows is the same in both dataframes.
> >>>>
> >>>> Can I merge the two dataframes to achieve the df3 below?
> >>>>
> >>>> amount_6m | amount_9m
> >>>>       100         500
> >>>>       200         600
> >>>>       300         700
> >>>>       400         800
> >>>>       500         900
> >>>>
> >>>> Thanks in advance
> >>>>
> >>>> Reg,
> >>>> Kushagra Deep
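P.S. For the original column-wise merge at the bottom of this thread: since monotonically_increasing_id() only guarantees increasing ids, not consecutive ones, a safer variant of the id-and-join pattern is to index both sides with RDD.zipWithIndex(), which produces consecutive 0..n-1 ids. A rough, untested PySpark sketch, assuming df1/df2 are the two single-column frames from the question and 'spark' is the active session:

from pyspark.sql import Row

def with_row_index(df, spark, col='idx'):
    # zipWithIndex yields (row, 0), (row, 1), ... in partition order
    indexed = df.rdd.zipWithIndex() \
                .map(lambda pair: Row(**pair[0].asDict(), **{col: pair[1]}))
    return spark.createDataFrame(indexed)

df3 = with_row_index(df1, spark) \
        .join(with_row_index(df2, spark), 'idx', 'inner') \
        .drop('idx')
df3.show()

Note this still shuffles for the join; it just makes the join keys on the two sides line up reliably.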