I'm working around it like this:

val testMapped2 = test1.rdd.map(t => t.copy(id = t.id + 1)).toDF.as[Test]
testMapped2.as("t1").joinWith(testMapped2.as("t2"), $"t1.id" === $"t2.id").show

Switching to an RDD, mapping there, then going back to a Dataset seemed to avoid the issue.
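For reference, a self-contained version of that workaround might look roughly like this in a 1.6.x spark-shell session (untested sketch; the Test case class is the one from the examples quoted below, and sqlContext with its implicits is assumed to be in scope as it is in the shell):

import sqlContext.implicits._

case class Test(id: Int)

val test1 = Seq(Test(1), Test(2), Test(3)).toDS

// Do the map on the RDD and rebuild the Dataset from it,
// instead of calling .map on the Dataset itself.
val testMapped2 = test1.rdd.map(t => t.copy(id = t.id + 1)).toDF.as[Test]

// Self-join on the rebuilt Dataset.
testMapped2.as("t1").joinWith(testMapped2.as("t2"), $"t1.id" === $"t2.id").show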
On Fri, May 27, 2016 at 3:10 PM Koert Kuipers <ko...@tresata.com> wrote:

i am glad to see this, i think we ran into this as well (in 2.0.0-SNAPSHOT) but i couldn't reproduce it nicely.

my observation was that joins of 2 datasets that were derived from the same datasource gave this kind of trouble. i changed my datasource from val to def (so it got created twice) as a workaround. the error did not occur with datasets created in a unit test with sc.parallelize.

On Fri, May 27, 2016 at 1:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:

I tried the master branch:

scala> val testMapped = test.map(t => t.copy(id = t.id + 1))
testMapped: org.apache.spark.sql.Dataset[Test] = [id: int]

scala> testMapped.as("t1").joinWith(testMapped.as("t2"), $"t1.id" === $"t2.id").show
org.apache.spark.sql.AnalysisException: cannot resolve '`t1.id`' given input columns: [id];
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:62)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:59)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:68)

Suggest logging a JIRA if there is none logged.

On Fri, May 27, 2016 at 10:19 AM, Tim Gautier <tim.gaut...@gmail.com> wrote:

Oops, I screwed up my example. This is what it should be:

case class Test(id: Int)
val test = Seq(
  Test(1),
  Test(2),
  Test(3)
).toDS
test.as("t1").joinWith(test.as("t2"), $"t1.id" === $"t2.id").show
val testMapped = test.map(t => t.copy(id = t.id + 1))
testMapped.as("t1").joinWith(testMapped.as("t2"), $"t1.id" === $"t2.id").show

On Fri, May 27, 2016 at 11:16 AM Tim Gautier <tim.gaut...@gmail.com> wrote:

I figured out the trigger. It turns out it wasn't because I loaded it from the database, it was because the first thing I do after loading is to lower-case all the strings. After a Dataset has been mapped, the resulting Dataset can't be self-joined. Here's a test case that illustrates the issue:

case class Test(id: Int)
val test = Seq(
  Test(1),
  Test(2),
  Test(3)
).toDS
test.as("t1").joinWith(test.as("t2"), $"t1.id" === $"t2.id").show // <-- works fine
val testMapped = test.map(_.id + 1) // add 1 to each
testMapped.as("t1").joinWith(testMapped.as("t2"), $"t1.id" === $"t2.id").show // <-- error

On Fri, May 27, 2016 at 10:44 AM Tim Gautier <tim.gaut...@gmail.com> wrote:

I stand corrected. I just created a test table with a single int field to test with, and the Dataset loaded from that works with no issues. I'll see if I can track down exactly what the difference might be.

On Fri, May 27, 2016 at 10:29 AM Tim Gautier <tim.gaut...@gmail.com> wrote:

I'm using 1.6.1.

I'm not sure what good fake data would do, since it doesn't seem to have anything to do with the data itself.
It has to do with how the Dataset was created. Both datasets have exactly the same data in them, but the one created from a sql query fails where the one created from a Seq works. The case class is just a few Option[Int] and Option[String] fields, nothing special.

Obviously there's some sort of difference between the two datasets, but Spark tells me they're exactly the same type with exactly the same data, so I couldn't create a test case without actually accessing a sql database.

On Fri, May 27, 2016 at 10:15 AM Ted Yu <yuzhih...@gmail.com> wrote:

Which release of Spark are you using?

Is it possible to come up with fake data that shows what you described?

Thanks

On Fri, May 27, 2016 at 8:24 AM, Tim Gautier <tim.gaut...@gmail.com> wrote:

Unfortunately I can't show exactly the data I'm using, but this is what I'm seeing:

I have a case class 'Product' that represents a table in our database. I load that data via sqlContext.read.format("jdbc").options(...).load.as[Product] and register it in a temp table 'product'.

For testing, I created a new Dataset that has only 3 records in it:

val ts = sqlContext.sql("select * from product where product_catalog_id in (1, 2, 3)").as[Product]

I also created another one using the same case class and data, but from a sequence instead:

val ds: Dataset[Product] = Seq(
  Product(Some(1), ...),
  Product(Some(2), ...),
  Product(Some(3), ...)
).toDS

The spark shell tells me these are exactly the same type at this point, but they don't behave the same:

ts.as("ts1").joinWith(ts.as("ts2"), $"ts1.product_catalog_id" === $"ts2.product_catalog_id")
ds.as("ds1").joinWith(ds.as("ds2"), $"ds1.product_catalog_id" === $"ds2.product_catalog_id")

Again, Spark tells me these self joins return exactly the same type, but when I do a .show on them, only the one created from a Seq works. The one created by reading from the database throws this error:

org.apache.spark.sql.AnalysisException: cannot resolve 'ts1.product_catalog_id' given input columns: [..., product_catalog_id, ...];

Is this a bug? Is there any way to make the Dataset loaded from a table behave like the one created from a sequence?

Thanks,
Tim
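For completeness, Koert's val-to-def workaround mentioned earlier in the thread might look roughly like this against a jdbc source (untested sketch: the connection url is a placeholder, the Product case class is reduced to two of the Option fields Tim describes, and it is not verified here that the same trick helps after a .map):

import org.apache.spark.sql.Dataset
import sqlContext.implicits._

case class Product(product_catalog_id: Option[Int], name: Option[String])

// def instead of val: every reference builds a fresh Dataset, so the two
// sides of the self-join are not literally the same instance.
def products: Dataset[Product] =
  sqlContext.read.format("jdbc")
    .options(Map(
      "url" -> "jdbc:postgresql://localhost/mydb", // placeholder connection url
      "dbtable" -> "product"))
    .load()
    .as[Product]

products.as("p1")
  .joinWith(products.as("p2"), $"p1.product_catalog_id" === $"p2.product_catalog_id")
  .show()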