Hi TD, I pulled your commit listed on this ticket: https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did the following steps, and self-joins work after cherry-picking your commit! Good job! I was hoping it would be part of 2.3.0, but it looks like it is targeted for 2.3.1 :(
git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn

On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Hey,
>
> Thanks for testing out stream-stream joins and reporting this issue. I am
> going to take a look at this.
>
> TD
>
> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> If I change it to the code below, it works. However, I don't believe it
>> is the solution I am looking for. I want to be able to do it in raw SQL,
>> and moreover, if a user gives a big chained raw Spark SQL join query, I
>> am not even sure how to make copies of the DataFrame to achieve the
>> self-join. Is there any other way here?
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>> jdf1.createOrReplaceTempView("table1")
>>
>> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>>
>> resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> If I change it to this
>>>
>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have the following code:
>>>>
>>>> import org.apache.spark.sql.streaming.Trigger
>>>>
>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>
>>>> jdf.createOrReplaceTempView("table")
>>>>
>>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
>>>>
>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>
>>>> and I get the following exception:
>>>>
>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>> 'Project [*]
>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>    :- SubqueryAlias x
>>>>    :  +- SubqueryAlias table
>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>    +- SubqueryAlias y
>>>>       +- SubqueryAlias table
>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>
>>>> Any idea what's wrong here?
>>>>
>>>> Thanks!
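
For reference, a minimal DataFrame-API sketch of the same self-join that avoids keeping two readStream copies. This is only a sketch, assuming a build that includes the SPARK-23406 fix (e.g. branch-2.3 with the commit above cherry-picked); the topic and alias names are just the ones used in this thread:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Single streaming source; no second readStream copy needed.
val jdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

// Alias the same DataFrame twice and join it with itself on offset,
// mirroring "select * from table as x inner join table as y on x.offset=y.offset".
val resultdf = jdf.as("x").join(jdf.as("y"), col("x.offset") === col("y.offset"))

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()

For a chained raw SQL query this would still need each alias to resolve, which is exactly what the fix enables, so on a patched build the original spark.sql(...) self-join should work as written.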