Variable name binding is a Python-level thing, and Spark should not care what the variable is named; what matters is the dependency graph. I am quite surprised that Spark fails to handle this dependency graph correctly: it is just a simple combination of three very common SQL operations.
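In case it helps anyone hitting the same AnalysisException: one workaround that is often suggested for this kind of stale-attribute problem (only a sketch, and not verified across Spark versions) is to break the shared lineage of the aggregated side before the final join, for example by rebuilding it from its own RDD and schema:

# df and df_sw are assumed to be built exactly as in the snippet quoted
# below, up to (but not including) the final join that raises the error.
# Recreating df_sw from its RDD and schema gives it fresh attribute IDs,
# so the last join no longer refers to a stale kk attribute.
df_sw = spark.createDataFrame(df_sw.rdd, df_sw.schema)
df = df.join(df_sw, ["ID", "kk"])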
On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi Shiyuan,
>
> I do not know whether I am right, but I would prefer to avoid expressions
> in Spark as:
>
> df = <<some transformation on df>>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Here is the pretty print of the physical plan which reveals some details
>> about what causes the bug (see the lines highlighted in bold):
>> WithColumnRenamed() fails to update the dependency graph correctly:
>>
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used
>>
>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>    :  +- Join Inner, (ID#64 = ID#99)
>>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>    :     +- Project [ID#99]
>>    :        +- Filter (nL#90L > cast(1 as bigint))
>>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
>>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]
>>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS count#118L]
>>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>             +- Join Inner, (ID#135 = ID#99)
>>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L, score#138]*
>>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>                +- Project [ID#99]
>>                   +- Filter (nL#90L > cast(1 as bigint))
>>                      +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
>>                         +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
>>                            +- *Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]*
>>                               +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>>
>> Here is the code which generates the error:
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),
>>                             Row(score=1.0,ID='abc',LABEL=False,k=3)]) \
>>          .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID","kk"])
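A side note on inspecting this (only the public PySpark API is assumed here): the AnalysisException is raised when the last join is defined, since analysis is eager, so the inputs to that join can still be examined. explain(True) prints the parsed, analyzed, optimized and physical plans of a DataFrame:

# df and df_sw as built in the quoted snippet above, up to (but not
# including) the final join that triggers the error.
df.explain(True)     # extended plans of the left side of the failing join
df_sw.explain(True)  # extended plans of the aggregated right side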
>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>
>>> The Spark warning about Row instead of Dict is not the culprit. The
>>> problem still persists after I use Row instead of Dict to generate the
>>> dataframe.
>>>
>>> Here is the explain() output regarding the reassignment of df that Gourav
>>> suggested to run. The two plans look the same except that the serial
>>> numbers following the columns are different (e.g. ID#7273 vs. ID#7344).
>>>
>>> This is the output of df.explain() after df = df.join(df_t.select("ID"),["ID"]):
>>>
>>> == Physical Plan ==
>>> *(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
>>> +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
>>>    :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(ID#7273, 200)
>>>    :     +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
>>>    :        +- *(1) Filter isnotnull(ID#7273)
>>>    :           +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
>>>    +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
>>>       +- *(5) Project [ID#7303]
>>>          +- *(5) Filter (nL#7295L > 1)
>>>             +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
>>>                +- Exchange hashpartitioning(ID#7303, 200)
>>>                   +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
>>>                      +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>>>                         +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
>>>                            +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>>>                               +- *(3) Project [ID#7303, LABEL#7304]
>>>                                  +- *(3) Filter isnotnull(ID#7303)
>>>                                     +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>>
>>> In comparison, this is the output of df1.explain() after df1 = df.join(df_t.select("ID"),["ID"]):
>>>
>>> == Physical Plan ==
>>> *(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
>>> +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
>>>    :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(ID#7344, 200)
>>>    :     +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
>>>    :        +- *(1) Filter isnotnull(ID#7344)
>>>    :           +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
>>>    +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
>>>       +- *(5) Project [ID#7374]
>>>          +- *(5) Filter (nL#7366L > 1)
>>>             +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
>>>                +- Exchange hashpartitioning(ID#7374, 200)
>>>                   +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
>>>                      +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>>>                         +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
>>>                            +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>>>                               +- *(3) Project [ID#7374, LABEL#7375]
>>>                                  +- *(3) Filter isnotnull(ID#7374)
>>>                                     +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]
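(If it helps with the comparison: the two plans really do differ only in the expression IDs, so a quick way to diff them is to strip the #<n> suffixes first. This is plain Python on the copied plan text, nothing Spark-specific; plan_a and plan_b are hypothetical strings holding the two outputs above.)

import re

def strip_expr_ids(plan_text):
    # Remove the #<number> / #<number>L expression-ID suffixes so that two
    # plans which differ only in attribute serial numbers compare equal.
    return re.sub(r"#\d+L?", "", plan_text)

# e.g. strip_expr_ids(plan_a) == strip_expr_ids(plan_b)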
>>>
>>> Here is the code I run and the error I get in Spark 2.3.0. By looking at
>>> the error, the cause seems to be that Spark doesn't look up the column by
>>> its name but by a serial number, and the serial number somehow gets messed up.
>>>
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),
>>>                             Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")  #line B
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
>>>
>>> This is the error:
>>>
>>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>>> with the same name appear in the operation: kk. Please check if the right
>>> attribute(s) are used.;;
>>> Project [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]
>>> +- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))
>>>    :- Project [ID#88, score#91, LABEL#89, kk#96L]
>>>    :  +- Join Inner, (ID#88 = ID#118)
>>>    :     :- Project [ID#88, score#91, LABEL#89, kk#96L]
>>>    :     :  +- Project [ID#88, LABEL#89, k#90L AS kk#96L, score#91]
>>>    :     :     +- LogicalRDD [ID#88, LABEL#89, k#90L, score#91], false
>>>    :     +- Project [ID#118]
>>>    :        +- Filter (nL#110L > cast(1 as bigint))
>>>    :           +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
>>>    :              +- Project [ID#118, score#121, LABEL#119, kk#96L]
>>>    :                 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
>>>    :                    +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
>>>    +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]
>>>       +- Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]
>>>          +- Project [ID#150, score#153, LABEL#151, kk#144L]
>>>             +- Join Inner, (ID#150 = ID#118)
>>>                :- Project [ID#150, score#153, LABEL#151, kk#144L]
>>>                :  +- Project [ID#150, LABEL#151, k#152L AS kk#144L, score#153]
>>>                :     +- LogicalRDD [ID#150, LABEL#151, k#152L, score#153], false
>>>                +- Project [ID#118]
>>>                   +- Filter (nL#110L > cast(1 as bigint))
>>>                      +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
>>>                         +- !Project [ID#118, score#121, LABEL#119, kk#144L]
>>>                            +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
>>>                               +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
>>>
>>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> What I am curious about is the reassignment of df.
>>>>
>>>> Can you please look into the explain plan of df after the statement
>>>> df = df.join(df_t.select("ID"),["ID"])? And then compare it with the
>>>> explain plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>>>>
>>>> It's late here, but I am yet to go through this completely. But I think
>>>> that Spark does throw a warning asking us to use Row instead of a
>>>> dictionary.
>>>>
>>>> It will be of help if you could kindly try using the statement below and
>>>> go through your use case once again (I am yet to go through all the lines):
>>>>
>>>> from pyspark.sql import Row
>>>>
>>>> df = spark.createDataFrame([Row(score=1.0,ID="abc",LABEL=True,k=2),
>>>>                             Row(score=1.0,ID="abc",LABEL=True,k=3)])
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>>
>>>>> Hi Spark Users,
>>>>> The following code snippet has an "attribute missing" error while the
>>>>> attribute exists. This bug is triggered by a particular sequence of
>>>>> "select", "groupby" and "join". Note that if I take away the "select"
>>>>> in #line B, the code runs without error. However, the "select" in
>>>>> #line B includes all columns in the dataframe and hence should not
>>>>> affect the final result.
>>>>>
>>>>> import pyspark.sql.functions as F
>>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},
>>>>>                             {'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>>
>>>>> df = df.withColumnRenamed("k","kk")\
>>>>>        .select("ID","score","LABEL","kk")  #line B
>>>>>
>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>>> df = df.join(df_sw, ["ID","kk"])
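For completeness, here is a sketch of the variant described above as working, i.e. the same pipeline with the redundant select in line B removed (using Row instead of a dict, as suggested later in the thread; everything else is unchanged):

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)])
df = df.withColumnRenamed("k", "kk")   # line B, without the extra select
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])      # reported above to run without the analysis error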