Variable name binding is a Python concept, and Spark should not care how the
variable is named. What matters is the dependency graph. I am quite surprised
that Spark fails to handle this dependency graph correctly: it is just a
simple combination of three very common SQL operations.


On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Shiyuan,
>
> I do not know whether I am right, but I would prefer to avoid expressions
> in Spark such as:
>
> df = <<some transformation on df>>
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Here is a pretty print of the logical plan from the error message, which
>> reveals some details about what causes the bug (see the lines highlighted
>> in bold): withColumnRenamed() fails to update the dependency graph correctly:
>>
>>
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used
>>
>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>    :  +- Join Inner, (ID#64 = ID#99)
>>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>    :     +- Project [ID#99]
>>    :        +- Filter (nL#90L > cast(1 as bigint))
>>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
>>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]
>>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS count#118L]
>>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>             +- Join Inner, (ID#135 = ID#99)
>>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L, score#138]*
>>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>                +- Project [ID#99]
>>                   +- Filter (nL#90L > cast(1 as bigint))
>>                      +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
>>                         +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
>>                            +- *Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]*
>>                               +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>>
>> Here is the code which generates the error:
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),
>>                             Row(score=1.0,ID='abc',LABEL=False,k=3)])\
>>     .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL"))\
>>     .filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count","cnt1")
>> df = df.join(df_sw, ["ID","kk"])
>>
>>
>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>
>>> The spark warning about Row instead of Dict is not the culprit. The
>>> problem still persists after I use Row instead of Dict to generate the
>>> dataframe.
>>>
>>> Here is the explain() output for the reassignment of df that Gourav
>>> suggested I run. The two plans look the same except that the serial numbers
>>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>>
>>> This is the output of df.explain() after
>>> df = df.join(df_t.select("ID"),["ID"]):
>>> == Physical Plan ==
>>> *(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
>>> +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
>>>    :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(ID#7273, 200)
>>>    :     +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
>>>    :        +- *(1) Filter isnotnull(ID#7273)
>>>    :           +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
>>>    +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
>>>       +- *(5) Project [ID#7303]
>>>          +- *(5) Filter (nL#7295L > 1)
>>>             +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
>>>                +- Exchange hashpartitioning(ID#7303, 200)
>>>                   +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
>>>                      +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>>>                         +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
>>>                            +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>>>                               +- *(3) Project [ID#7303, LABEL#7304]
>>>                                  +- *(3) Filter isnotnull(ID#7303)
>>>                                     +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>>
>>>
>>> In comparison, this is the output of df1.explain() after
>>> df1 = df.join(df_t.select("ID"),["ID"]):
>>> == Physical Plan ==
>>> *(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
>>> +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
>>>    :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(ID#7344, 200)
>>>    :     +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
>>>    :        +- *(1) Filter isnotnull(ID#7344)
>>>    :           +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
>>>    +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
>>>       +- *(5) Project [ID#7374]
>>>          +- *(5) Filter (nL#7366L > 1)
>>>             +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
>>>                +- Exchange hashpartitioning(ID#7374, 200)
>>>                   +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
>>>                      +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>>>                         +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
>>>                            +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>>>                               +- *(3) Project [ID#7374, LABEL#7375]
>>>                                  +- *(3) Filter isnotnull(ID#7374)
>>>                                     +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]
>>>
>>>
>>> Here is the code I ran and the error I got in Spark 2.3.0. Judging from
>>> the error, the cause seems to be that Spark looks up the column not by
>>> its name but by a serial number, and the serial number somehow gets
>>> mixed up.
>>>
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),
>>>                             Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>>
>>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")  #line B
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL"))\
>>>     .filter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count","cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
>>>
>>> This is the error:
>>> 'Resolved attribute(s) kk#144L missing from
>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>> the operation: kk. Please check if the right attribute(s) are
>>> used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]\n+- Join
>>> Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :- Project [ID#88,
>>> score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 = ID#118)\n : :-
>>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project [ID#88,
>>> LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88, LABEL#89,
>>> k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter (nL#110L >
>>> cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118, count(distinct
>>> LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121, LABEL#119,
>>> kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]\n :
>>> +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n +- Project
>>> [ID#150, kk#144L, count#134L AS cnt1#140L]\n +- Aggregate [ID#150,
>>> kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +- Project [ID#150,
>>> score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 = ID#118)\n :-
>>> Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project [ID#150,
>>> LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD [ID#150,
>>> LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +- Filter
>>> (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>>>
>>>
>>>
>>>
>>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> what I am curious about is the reassignment of df.
>>>>
>>>> Can you please look into the explain plan of df after the statement
>>>> df = df.join(df_t.select("ID"),["ID"])? And then compare it with the
>>>> explain plan of df1 after the statement
>>>> df1 = df.join(df_t.select("ID"),["ID"])?
>>>>
>>>> It's late here, and I have yet to go through this completely. But I think
>>>> that Spark does throw a warning telling us to use Row instead of
>>>> Dictionary.
>>>>
>>>> It would help if you could kindly try the statement below
>>>> and go through your use case once again (I have yet to go through all the
>>>> lines):
>>>>
>>>>
>>>>
>>>> from pyspark.sql import Row
>>>>
>>>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>>>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>>
>>>>> Hi Spark Users,
>>>>>     The following code snippet raises an "attribute missing" error even
>>>>> though the attribute exists. This bug is triggered by a particular sequence
>>>>> of "select", "groupby" and "join". Note that if I take away the "select"
>>>>> in #line B, the code runs without error. However, the "select" in #line
>>>>> B includes all columns in the dataframe and hence should not affect the
>>>>> final result.
>>>>>
>>>>>
>>>>> import pyspark.sql.functions as F
>>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>>
>>>>> df = df.withColumnRenamed("k","kk")\
>>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>>
>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL"))\
>>>>>     .filter(F.col("nL")>1)
>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count","cnt1")
>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>
>>>>
>>>>
>>>
>>
>
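
For anyone reading the error quoted in this thread, the mismatch is easier to
spot if the attribute names and their serial numbers are pulled out of the
message. The snippet below is a minimal sketch in plain Python (no Spark
needed); `conflicting_ids` is a hypothetical helper written for this thread,
and the regular expressions only assume the "Resolved attribute(s) ... missing
from ... in operator" wording seen in the message above.

```python
import re
from collections import defaultdict

# First sentence of the error message quoted in this thread.
ERROR = (
    "Resolved attribute(s) kk#144L missing from "
    "ID#118,LABEL#119,kk#96L,score#121 in operator "
    "!Project [ID#118, score#121, LABEL#119, kk#144L]."
)

# Matches `name#123` or `name#123L` and captures the name and the number.
ATTR = re.compile(r"([A-Za-z_]\w*)#(\d+)L?")


def conflicting_ids(message):
    """Return names that are requested under one ID but available under another."""
    m = re.search(
        r"Resolved attribute\(s\) (.+?) missing from (.+?) in operator", message
    )
    if m is None:
        return {}
    missing = ATTR.findall(m.group(1))
    available = defaultdict(set)
    for name, expr_id in ATTR.findall(m.group(2)):
        available[name].add(int(expr_id))
    return {
        name: {"missing": int(expr_id), "available": sorted(available[name])}
        for name, expr_id in missing
        if name in available
    }


print(conflicting_ids(ERROR))
```

For the message above, this reports that "kk" is requested with serial number
144 while the child operator only provides it with serial number 96.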
