Here it is: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2991198123660769/823198936734135/866038034322120/latest.html
On Wed, Apr 11, 2018 at 10:55 AM, Alessandro Solimando <alessandro.solima...@gmail.com> wrote:

Hi Shiyuan,
can you show us the output of "explain" over df (as a last step)?

On 11 April 2018 at 19:47, Shiyuan <gshy2...@gmail.com> wrote:

Variable name binding is a Python thing, and Spark should not care how the variable is named. What matters is the dependency graph. Spark fails to handle this dependency graph correctly, which is quite surprising: this is just a simple combination of three very common SQL operations.

On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi Shiyuan,

I do not know whether I am right, but I would prefer to avoid expressions in Spark such as:

df = <<some transformation on df>>

Regards,
Gourav Sengupta

On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:

Here is a pretty print of the plan, which reveals some details about what causes the bug (see the lines highlighted in bold): withColumnRenamed() fails to update the dependency graph correctly:

Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in the operation: kk. Please check if the right attribute(s) are used.

Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
+- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
   :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :  +- Join Inner, (ID#64 = ID#99)
   :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
   :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
   :     +- Project [ID#99]
   :        +- Filter (nL#90L > cast(1 as bigint))
   :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
   :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
   :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]
   :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
   +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
      +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS count#118L]
         +- Project [ID#135, score#138, LABEL#136, kk#128L]
            +- Join Inner, (ID#135 = ID#99)
               :- Project [ID#135, score#138, LABEL#136, kk#128L]
               :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L, score#138]*
               :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
               +- Project [ID#99]
                  +- Filter (nL#90L > cast(1 as bigint))
                     +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
                        +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
                           +- *Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]*
                              +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]

Here is the code which generates the error:

import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),
                            Row(score=1.0,ID='abc',LABEL=False,k=3)])\
       .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])
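One workaround that is sometimes suggested for this kind of resolved-attribute error is to cut the shared lineage between the two sides of the self-join before joining, for example by rebuilding the aggregated frame from its RDD and schema. A minimal sketch (not verified against this exact case), assuming the same spark session and the df/df_sw defined above:

# Possible workaround (sketch): rebuild df_sw from its RDD and schema so it
# gets fresh attribute ids and no longer shares a logical plan with df.
df_sw_fresh = spark.createDataFrame(df_sw.rdd, df_sw.schema)
df = df.join(df_sw_fresh, ["ID", "kk"])

Checkpointing df_sw (after setting a checkpoint directory with sc.setCheckpointDir) is another way that is sometimes used to cut lineage, at the cost of materializing the data.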
On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:

The Spark warning about Row instead of Dict is not the culprit. The problem still persists after I use Row instead of Dict to generate the dataframe.

Here is the explain() output for the reassignment of df that Gourav suggested running. The two plans look the same except that the serial numbers following the columns differ (e.g. ID#7273 vs. ID#7344).

This is the output of df.explain() after df = df.join(df_t.select("ID"),["ID"]):

== Physical Plan ==
*(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
+- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
   :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#7273, 200)
   :     +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
   :        +- *(1) Filter isnotnull(ID#7273)
   :           +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
   +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
      +- *(5) Project [ID#7303]
         +- *(5) Filter (nL#7295L > 1)
            +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
               +- Exchange hashpartitioning(ID#7303, 200)
                  +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
                     +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
                        +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
                           +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
                              +- *(3) Project [ID#7303, LABEL#7304]
                                 +- *(3) Filter isnotnull(ID#7303)
                                    +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]

In comparison, this is the output of df1.explain() after df1 = df.join(df_t.select("ID"),["ID"]):

== Physical Plan ==
*(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
+- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
   :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#7344, 200)
   :     +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
   :        +- *(1) Filter isnotnull(ID#7344)
   :           +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
   +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
      +- *(5) Project [ID#7374]
         +- *(5) Filter (nL#7366L > 1)
            +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
               +- Exchange hashpartitioning(ID#7374, 200)
                  +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
                     +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
                        +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
                           +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
                              +- *(3) Project [ID#7374, LABEL#7375]
                                 +- *(3) Filter isnotnull(ID#7374)
                                    +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]
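Since the failure happens during analysis, the physical plans above only cover what the analyzer could resolve; the attribute serial numbers are easier to compare in the logical plans, which explain in extended mode also prints. A small sketch, assuming the same intermediate dataframes as in the code below:

# Extended explain prints the parsed, analyzed and optimized logical plans in
# addition to the physical plan, so the attribute ids (e.g. kk#96L vs kk#144L)
# can be compared before the failing join is attempted.
df.explain(True)
df_sw.explain(True)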
Here is the code I run and the error I get in Spark 2.3.0. Judging from the error, the cause seems to be that Spark does not look up a column by its name but by a serial number, and that serial number somehow gets messed up.

import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),
                            Row(score=1.0,ID='abc',LABEL=False,k=3)])

df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")   #line B
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])

This is the error:

Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in the operation: kk. Please check if the right attribute(s) are used.;;
Project [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]
+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))
   :- Project [ID#88, score#91, LABEL#89, kk#96L]
   :  +- Join Inner, (ID#88 = ID#118)
   :     :- Project [ID#88, score#91, LABEL#89, kk#96L]
   :     :  +- Project [ID#88, LABEL#89, k#90L AS kk#96L, score#91]
   :     :     +- LogicalRDD [ID#88, LABEL#89, k#90L, score#91], false
   :     +- Project [ID#118]
   :        +- Filter (nL#110L > cast(1 as bigint))
   :           +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
   :              +- Project [ID#118, score#121, LABEL#119, kk#96L]
   :                 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
   :                    +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
   +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]
      +- Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]
         +- Project [ID#150, score#153, LABEL#151, kk#144L]
            +- Join Inner, (ID#150 = ID#118)
               :- Project [ID#150, score#153, LABEL#151, kk#144L]
               :  +- Project [ID#150, LABEL#151, k#152L AS kk#144L, score#153]
               :     +- LogicalRDD [ID#150, LABEL#151, k#152L, score#153], false
               +- Project [ID#118]
                  +- Filter (nL#110L > cast(1 as bigint))
                     +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
                        +- !Project [ID#118, score#121, LABEL#119, kk#144L]
                           +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
                              +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
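If the goal of df_sw is only to attach a per-(ID, kk) row count to df, a window function is one way to sidestep the second self-join altogether; a minimal sketch, assuming the same df as above and that the count column should be named cnt1:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Sketch: compute the per-(ID, kk) count with a window instead of
# groupby().count() followed by a join back onto df.
w = Window.partitionBy("ID", "kk")
df = df.withColumn("cnt1", F.count(F.lit(1)).over(w))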
On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

what I am curious about is the reassignment of df.

Can you please look into the explain plan of df after the statement df = df.join(df_t.select("ID"),["ID"]), and then compare it with the explain plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?

It's late here, and I am yet to go through this completely, but I think that Spark does throw a warning telling us to use Row instead of Dictionary.

It would help if you could kindly try using the statement below and go through your use case once again (I am yet to go through all the lines):

from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0,ID="abc",LABEL=True,k=2),
                            Row(score=1.0,ID="abc",LABEL=True,k=3)])

Regards,
Gourav Sengupta

On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:

Hi Spark Users,
The following code snippet has an "attribute missing" error while the attribute exists. This bug is triggered by a particular sequence of "select", "groupby" and "join". Note that if I take away the "select" in #line B, the code runs without error. However, the "select" in #line B includes all columns in the dataframe and hence should not affect the final result.

import pyspark.sql.functions as F
df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},
                            {'score':1.0,'ID':'abc','LABEL':False,'k':3}])

df = df.withColumnRenamed("k","kk")\
       .select("ID","score","LABEL","kk")   #line B

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])
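Since removing the select in #line B makes the error go away, one variant that may be worth trying is to express the rename as an alias inside a single select instead of going through withColumnRenamed (whether this avoids the analyzer error here is untested); a sketch, assuming the same df and imports as above:

# Hypothetical replacement for #line B: rename k to kk and fix the column
# order in one select, without calling withColumnRenamed.
df = df.select("ID", "score", "LABEL", F.col("k").alias("kk"))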