Hi All,

I have the following code:

import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()

jdf.createOrReplaceTempView("table")

val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")

resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()

and I get the following exception:

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
   +- SubqueryAlias y
      +- SubqueryAlias table
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]

Any idea what's wrong here?
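A variant I could compare against, sketched under an assumption I have not confirmed: that the problem is aliasing one streaming relation twice. Both sides of the plan above carry the same attribute IDs (key#28 through timestampType#34), which might be why x.offset is listed among the input columns yet still cannot be resolved. The sketch builds two independent readStream DataFrames and joins them with the DataFrame API instead of SQL over one temp view. The names KafkaSelfJoinSketch, kafkaStream and joinOnOffset are made up for illustration. It also uses append mode, because the Structured Streaming guide says that as of Spark 2.3 stream-stream joins are only supported in Append output mode.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

object KafkaSelfJoinSketch {
  // Assumption: the AnalysisException comes from joining one streaming
  // relation with itself. Each call to load() creates a new streaming
  // relation, so calling this twice gives two independent inputs.
  def kafkaStream(spark: SparkSession): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "join_test")
      .option("startingOffsets", "earliest")
      .load()

  // Hypothetical helper: inner join on the offset column, aliasing each side
  // as x and y so qualified names like x.offset stay unambiguous.
  def joinOnOffset(left: DataFrame, right: DataFrame): DataFrame =
    left.as("x").join(right.as("y"), col("x.offset") === col("y.offset"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("join_test").getOrCreate()
    val joined = joinOnOffset(kafkaStream(spark), kafkaStream(spark))
    joined.writeStream
      .outputMode("append") // as of Spark 2.3, stream-stream joins support Append mode only
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime(1000))
      .start()
      .awaitTermination()
  }
}
```

Kafka offsets are only unique within a partition, so on a multi-partition topic a join on offset alone will also pair records from different partitions.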

Thanks!
