Hi guys, I have another example to illustrate the issue. I think the problem is pretty nasty.
val base = sc.parallelize((0 to 49).zip(0 to 49) ++ (30 to 79).zip(50 to 99)).toDF("id", "label")
val d1 = base.where($"label" < 60)
val d2 = base.where($"label" === 60)

d1.join(d2, "id").show

+---+-----+-----+
| id|label|label|
+---+-----+-----+
| 40|   40|   60|
+---+-----+-----+

d1.join(d2, "id").select(d1("label")).show

+-----+
|label|
+-----+
|   40|
+-----+

(expected answer: 40, right!)

d1.join(d2, "id").select(d2("label")).show

+-----+
|label|
+-----+
|   40|
+-----+

(expected answer: 60, wrong!)

d1.join(d2, "id").select(d2("label")).explain

== Physical Plan ==
TungstenProject [label#15]
 SortMergeJoin [id#14], [id#30]
  TungstenSort [id#14 ASC], false, 0
   TungstenExchange hashpartitioning(id#14)
    TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
     Filter (_2#13 < 60)
      Scan PhysicalRDD[_1#12,_2#13]
  TungstenSort [id#30 ASC], false, 0
   TungstenExchange hashpartitioning(id#30)
    TungstenProject [_1#12 AS id#30]
     Filter (_2#13 = 60)
      Scan PhysicalRDD[_1#12,_2#13]

Note how the second branch of the plan projects only [_1#12 AS id#30]: the reference d2("label") has silently resolved to d1's label#15. Again, this is just the tip of the iceberg. I have spent hours tracking down this weird behaviour.

Best Regards,

Jerry

On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Sunitha,
>
> Thank you for the reference Jira. It looks like this is the bug I'm
> hitting. Most of the bugs related to this seem to be associated with
> dataframes derived from a single dataframe (base in this case). In SQL,
> this is a self-join, and dropping d2.label should not affect d1.label.
> There are other bugs I have found over the past three days that are
> associated with this type of join. In one case, if I don't drop the
> duplicate column BEFORE the join, Spark prefers the columns from the d2
> dataframe. I will see if I can replicate it in a small program like the
> one above.
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <skambha...@gmail.com>
> wrote:
>
>> Hi Jerry,
>>
>> I think you are running into an issue similar to SPARK-14040
>> https://issues.apache.org/jira/browse/SPARK-14040
>>
>> One way to resolve it is to use an alias.
>>
>> Here is an example that I tried on trunk, and I do not see any
>> exceptions.
>>
>> val d1 = base.where($"label" === 0).as("d1")
>> val d2 = base.where($"label" === 1).as("d2")
>>
>> d1.join(d2, $"d1.id" === $"d2.id",
>> "left_outer").drop($"d2.label").select($"d1.label")
>>
>> Hope this helps some.
>>
>> Best regards,
>> Sunitha.
>>
>> On Mar 28, 2016, at 2:34 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>> Hi spark users and developers,
>>
>> I'm using Spark 1.5.1 (I have no choice because this is what we use). I
>> ran into some very unexpected behaviour when I did some join operations
>> lately. I cannot post my actual code here, and the following code is
>> contrived, but it should demonstrate the issue.
>>
>> val base = sc.parallelize((0 to 49).map(i => (i, 0)) ++
>> (50 to 99).map((_, 1))).toDF("id", "label")
>> val d1 = base.where($"label" === 0)
>> val d2 = base.where($"label" === 1)
>> d1.join(d2, d1("id") === d2("id"),
>> "left_outer").drop(d2("label")).select(d1("label"))
>>
>> The above code throws an exception saying the column label is not found.
>> Is there a reason for throwing an exception when d1("label") has not
>> been dropped?
>>
>> Best Regards,
>>
>> Jerry
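
[Editor's note] For readers hitting the same problem: below is a minimal, self-contained sketch of the alias workaround Sunitha suggests, applied to the self-join example at the top of this thread. It is written against the stock Spark 1.5/1.6 DataFrame API but has not been verified on 1.5.1 specifically, and it assumes an existing SparkContext named sc as in the examples above.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val base = sc.parallelize((0 to 49).zip(0 to 49) ++ (30 to 79).zip(50 to 99))
  .toDF("id", "label")

// Give each side of the self-join its own alias so that column
// references remain unambiguous after the join.
val d1 = base.where($"label" < 60).as("d1")
val d2 = base.where($"label" === 60).as("d2")

val joined = d1.join(d2, $"d1.id" === $"d2.id")

// Qualified references now pick the intended side explicitly.
joined.select($"d1.label").show() // expected: 40
joined.select($"d2.label").show() // expected: 60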
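
Another way to sidestep the ambiguity, not discussed in the thread but a common pattern, is to rename the duplicate column on one side before joining, so the joined schema never contains two columns named label. A sketch under the same assumptions as above (the name label2 is illustrative):

// Rename d2's column before the join so the result has no duplicate names.
val d2r = base.where($"label" === 60).withColumnRenamed("label", "label2")

base.where($"label" < 60)
  .join(d2r, "id")
  .select($"label", $"label2") // 40 and 60, unambiguously
  .show()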