Here it is :
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2991198123660769/823198936734135/866038034322120/latest.html


On Wed, Apr 11, 2018 at 10:55 AM, Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hi Shiyuan,
> can you show us the output of ¨explain¨ over df (as a last step)?
>
> On 11 April 2018 at 19:47, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Variable name binding is a python thing, and Spark should not care how
>> the variable is named. What matters is the dependency graph. Spark fails to
>> handle this dependency graph correctly for which I am quite surprised: this
>> is just a simple combination of three very common sql operations.
>>
>>
>> On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Shiyuan,
>>>
>>> I do not know whether I am right, but I would prefer to avoid
>>> expressions in Spark as:
>>>
>>> df = <<some transformation on df>>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>
>>>> Here is the pretty print of the physical plan which reveals some
>>>> details about what causes the bug (see the lines highlighted in bold):
>>>> WithColumnRenamed() fails to update the dependency graph correctly:
>>>>
>>>>
>>>> 'Resolved attribute(s) kk#144L missing from
>>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>>> the operation: kk. Please check if the right attribute(s) are used
>>>>
>>>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>>>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>>>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>>    :  +- Join Inner, (ID#64 = ID#99)
>>>>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>>>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>>>    :     +- Project [ID#99]
>>>>    :        +- Filter (nL#90L > cast(1 as bigint))
>>>>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100)
>>>> AS nL#90L]
>>>>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>>>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>>>> score#102]
>>>>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>>> score#102]
>>>>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>>>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>>>> count#118L]
>>>>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>>             +- Join Inner, (ID#135 = ID#99)
>>>>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>>>> score#138]*
>>>>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L,
>>>> score#138]
>>>>                +- Project [ID#99]
>>>>                   +- Filter (nL#90L > cast(1 as bigint))
>>>>                      +- Aggregate [ID#99], [ID#99, count(distinct
>>>> LABEL#100) AS nL#90L]
>>>>                         +- *!Project [ID#99, score#102, LABEL#100,
>>>> kk#128L]*
>>>>                            +-* Project [ID#99, LABEL#100, k#101L AS
>>>> kk#73L, score#102]*
>>>>                               +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>>> score#102]
>>>>
>>>> Here is the code which generates the error:
>>>>
>>>> import pyspark.sql.functions as F
>>>> from pyspark.sql import Row
>>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2
>>>> ),Row(score=1.0,ID='abc',LABEL=False,k=3)]).withColumnRename
>>>> d("k","kk").select("ID","score","LABEL","kk")
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>>> ilter(F.col("nL")>1)
>>>> df = df.join(df_t.select("ID"),["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>>> "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])
>>>>
>>>>
>>>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>>
>>>>> The spark warning about Row instead of Dict is not the culprit. The
>>>>> problem still persists after I use Row instead of Dict to generate the
>>>>> dataframe.
>>>>>
>>>>> Here is the expain() output regarding the reassignment of df as Gourav
>>>>> suggests to run, They look the same except that  the serial numbers
>>>>> following the columns are different(eg. ID#7273 vs. ID#7344).
>>>>>
>>>>> this is the output of df.explain() after df =
>>>>> df.join(df_t.select("ID"),["ID"])
>>>>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>>>>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>>>>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange 
>>>>> hashpartitioning(ID#7273,
>>>>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>>>>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>>>>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort
>>>>> [ID#7303 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5)
>>>>> Filter (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>>>>> functions=[finalmerge_count(distinct merge count#7314L) AS
>>>>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>>>>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>>>>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>>>>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>>>>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>>>>> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
>>>>> isnotnull(ID#7303) +- *(3) Scan 
>>>>> ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>>>>
>>>>>
>>>>> In comparison, this is the output of df1.explain() after  df1 =
>>>>> df.join(df_t.select("ID"),["ID"])?
>>>>> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
>>>>> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
>>>>> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange 
>>>>> hashpartitioning(ID#7344,
>>>>> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
>>>>> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
>>>>> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort
>>>>> [ID#7374 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5)
>>>>> Filter (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
>>>>> functions=[finalmerge_count(distinct merge count#7385L) AS
>>>>> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
>>>>> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
>>>>> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
>>>>> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
>>>>> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
>>>>> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
>>>>> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375
>>>>> ,k#7376L,score#7377]
>>>>>
>>>>>
>>>>> Here is the code I run and the error I get in Spark 2.3.0. By looking
>>>>> at the error,  the cause seems to be that  spark doesn't look up the 
>>>>> column
>>>>> by its name but by a serial number and  the serial number somehow is 
>>>>> messed
>>>>> up.
>>>>>
>>>>> import pyspark.sql.functions as F
>>>>> from pyspark.sql import Row
>>>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2
>>>>> ),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>>>>
>>>>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>>>>>   #line B
>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>>>> ilter(F.col("nL")>1)
>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>>>> "cnt1")
>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>
>>>>> This is the error:
>>>>> 'Resolved attribute(s) kk#144L missing from
>>>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>>>> the operation: kk. Please check if the right attribute(s) are
>>>>> used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]\n+- Join
>>>>> Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :- Project [ID#88,
>>>>> score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 = ID#118)\n : :-
>>>>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project [ID#88,
>>>>> LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88, LABEL#89,
>>>>> k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter (nL#110L >
>>>>> cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118, count(distinct
>>>>> LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121, LABEL#119,
>>>>> kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]\n 
>>>>> :
>>>>> +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n +- Project
>>>>> [ID#150, kk#144L, count#134L AS cnt1#140L]\n +- Aggregate [ID#150,
>>>>> kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +- Project [ID#150,
>>>>> score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 = ID#118)\n :-
>>>>> Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project [ID#150,
>>>>> LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD [ID#150,
>>>>> LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +- Filter
>>>>> (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>>>>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>>>>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>>>>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], 
>>>>> false\n'
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> what I am curious about is the reassignment of df.
>>>>>>
>>>>>> Can you please look into the explain plan of df after the statement
>>>>>> df = df.join(df_t.select("ID"),["ID"])? And then compare with the
>>>>>> explain plan of df1 after the statement df1 = 
>>>>>> df.join(df_t.select("ID"),["ID
>>>>>> "])?
>>>>>>
>>>>>> Its late here, but I am yet to go through this completely.  But I
>>>>>> think that SPARK does throw a warning mentioning us to use Row instead of
>>>>>> Dictionary.
>>>>>>
>>>>>> It will be of help if you could kindly try using the below statement
>>>>>> and go through your used case once again (I am yet to go through all the
>>>>>> lines):
>>>>>>
>>>>>>
>>>>>>
>>>>>> from pyspark.sql import Row
>>>>>>
>>>>>> df = spark.createDataFrame([Row(score =
>>>>>> 1.0,ID="abc",LABEL=True,k=2), Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Spark Users,
>>>>>>>     The following code snippet has an "attribute missing" error
>>>>>>> while the attribute exists.  This bug is  triggered by a particular
>>>>>>> sequence of of "select", "groupby" and "join".  Note that if I take away
>>>>>>> the "select"  in #line B,  the code runs without error.   However, the
>>>>>>> "select" in #line B  includes all columns in the dataframe and hence
>>>>>>> should  not affect the final result.
>>>>>>>
>>>>>>>
>>>>>>> import pyspark.sql.functions as F
>>>>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>>>>
>>>>>>> df = df.withColumnRenamed("k","kk")\
>>>>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>>>>
>>>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).f
>>>>>>> ilter(F.col("nL")>1)
>>>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count",
>>>>>>> "cnt1")
>>>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to