Hi guys,

I have another example to illustrate the issue. I think the problem is
pretty nasty.

val base = sc.parallelize((0 to 49).zip(0 to 49) ++
  (30 to 79).zip(50 to 99)).toDF("id", "label")
val d1 = base.where($"label" < 60)
val d2 = base.where($"label" === 60)
d1.join(d2, "id").show
+---+-----+-----+
| id|label|label|
+---+-----+-----+
| 40|   40|   60|
+---+-----+-----+

d1.join(d2, "id").select(d1("label")).show
+-----+
|label|
+-----+
|   40|
+-----+
(expected answer: 40; we get 40: right!)

d1.join(d2, "id").select(d2("label")).show
+-----+
|label|
+-----+
|   40|
+-----+
(expected answer: 60; we get 40: wrong!)

d1.join(d2, "id").select(d2("label")).explain
== Physical Plan ==
TungstenProject [label#15]
 SortMergeJoin [id#14], [id#30]
  TungstenSort [id#14 ASC], false, 0
   TungstenExchange hashpartitioning(id#14)
    TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
     Filter (_2#13 < 60)
      Scan PhysicalRDD[_1#12,_2#13]
  TungstenSort [id#30 ASC], false, 0
   TungstenExchange hashpartitioning(id#30)
    TungstenProject [_1#12 AS id#30]
     Filter (_2#13 = 60)
      Scan PhysicalRDD[_1#12,_2#13]
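
Note that the plan projects label#15, which comes from the d1 branch (the
one with Filter (_2#13 < 60)); the d2 branch only carries id#30, so d2's
label column never survives the join.

For what it's worth, aliasing both sides before the join seems to sidestep
the ambiguity here (a minimal sketch along the lines of Sunitha's alias
suggestion below; the a1/a2 names are mine):

val a1 = d1.as("d1")
val a2 = d2.as("d2")
// with explicit aliases, $"d2.label" should resolve to d2's side only
a1.join(a2, $"d1.id" === $"d2.id").select($"d2.label").show

I would expect this to print 60 rather than 40.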

Again, this is just the tip of the iceberg. It took me hours to track down
this weird behaviour.

Best Regards,

Jerry



On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Sunitha,
>
> Thank you for the JIRA reference. It looks like this is the bug I'm
> hitting. Most of the bugs related to this seem to involve dataframes
> derived from a single dataframe (base in this case). In SQL this is a
> self-join, and dropping d2.label should not affect d1.label. There are
> other bugs I found over the last three days that are associated with this
> type of join. In one case, if I don't drop the duplicate column BEFORE the
> join, Spark prefers the columns from the d2 dataframe. I will see if I can
> replicate it in a small program like the one above.
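>
> A rough sketch of the shape I have in mind (using the d1/d2 defined in
> the quoted example below; hypothetical until I can reproduce it properly):
>
> val joined = d1.join(d2, d1("id") === d2("id"))
> // both inputs still carry a "label" column after the join, so any later
> // d1("label") / d2("label") reference has to pick between the duplicates
> joined.select(d2("label")).show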
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <skambha...@gmail.com
> > wrote:
>
>> Hi Jerry,
>>
>> I think you are running into an issue similar to SPARK-14040
>> https://issues.apache.org/jira/browse/SPARK-14040
>>
>> One way to resolve it is to use an alias.
>>
>> Here is an example that I tried on trunk; it does not throw any exceptions.
>>
>>
>> val d1 = base.where($"label" === 0).as("d1")
>> val d2 = base.where($"label" === 1).as("d2")
>>
>> d1.join(d2, $"d1.id" === $"d2.id", "left_outer")
>>   .drop($"d2.label").select($"d1.label")
>>
>>
>> Hope this helps some.
>>
>> Best regards,
>> Sunitha.
>>
>> On Mar 28, 2016, at 2:34 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>> Hi spark users and developers,
>>
>> I'm using Spark 1.5.1 (I have no choice; it is what we use). I ran into
>> some very unexpected behaviour when I did some join operations lately. I
>> cannot post my actual code here, and the following code is not practical,
>> but it should demonstrate the issue.
>>
>> val base = sc.parallelize((0 to 49).map(i => (i, 0)) ++
>>   (50 to 99).map((_, 1))).toDF("id", "label")
>> val d1 = base.where($"label" === 0)
>> val d2 = base.where($"label" === 1)
>> d1.join(d2, d1("id") === d2("id"), "left_outer")
>>   .drop(d2("label")).select(d1("label"))
>>
>>
>> The above code throws an exception saying the column label is not found.
>> Is there a reason for throwing an exception when d1("label") has not been
>> dropped?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>
