Hello! I am attempting to write a streaming pipeline that consumes data from a Kafka source, transforms the data, and writes the results to a downstream sink (Kafka, Redis, etc.). I want to write fully formed SQL instead of using the DataFrame function API that Spark offers. From the guides I've read, my understanding is that I need to register a temp view in order to execute raw SQL queries via spark.sql().
However, I'm having trouble defining a watermark on my source. There doesn't seem to be a way to declare a watermark in the raw SQL that Spark supports, so I'm using the `.withWatermark()` function. However, that watermark does not carry over to the temp view. Example code:

```
json_df = (
    streaming_df
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select("parsed_value.*")
    .withWatermark("createTime", "10 seconds")
)

json_df.createOrReplaceTempView("json_df")

session.sql("""
    SELECT window.start, window.end, provinceId, sum(payAmount) AS totalPayAmount
    FROM json_df
    GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
    ORDER BY window.start
""") \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()
```

This throws:

```
AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
```

If I rewrite the same query with the DataFrame function API instead, everything works fine. How can I use `spark.sql()` in conjunction with watermarks?

Best,
Chloe