Hello!

I am attempting to write a streaming pipeline that would consume data from a 
Kafka source, manipulate the data, and then write results to a downstream sink 
(Kafka, Redis, etc). I want to write fully formed SQL instead of using the 
function API that Spark offers. I read a few guides on how to do this and my 
understanding is that I need to create a temp view in order to execute my raw 
SQL queries via spark.sql(). 

However, I’m having trouble defining watermarks on my source. It doesn’t seem 
like there is a way to introduce watermark in the raw SQL that Spark supports, 
so I’m using the .withWatermark() function. However, this watermark does not 
work on the temp view.

Example code:
```
streaming_df.select(from_json(col("value").cast("string"), 
schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime",
 "10 seconds”)

json_df.createOrReplaceTempView("json_df”)

session.sql("""
SELECT
window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
FROM json_df
GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
ORDER BY window.start
""")\
.writeStream\
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \
.start()
```
This throws
```
AnalysisException: Append output mode not supported when there are streaming 
aggregations on streaming DataFrames/DataSets without watermark;
```

If I switch out the SQL query and write it in the function API instead, 
everything seems to work fine.

How can I use .sql() in conjunction with watermarks?

Best,
Chloe

Reply via email to