Hm. You are getting the AnalysisException below:

AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
The problem seems to be that you are using the append output mode when writing the streaming query results to Kafka. This mode is designed for scenarios where you only want to append new, finalised rows to the sink (in this case, the "sink" topic in Kafka). However, your query involves a streaming aggregation: GROUP BY provinceId, window('createTime', '1 hour', '30 minutes'). Spark Structured Streaming requires a watermark on the event-time column so that it knows when a windowed aggregate is final and can be emitted in append mode.

Your code already defines a watermark on the "createTime" column with a delay of 10 seconds (withWatermark("createTime", "10 seconds")). However, the error message (EventTimeWatermark start#37: timestamp, 10 seconds) suggests the watermark is not being applied to the "start" column of the aggregated result. Try adding a watermark to the "start" column. Modify your code as below to expose the "start" column generated by the window function and define a watermark on it:

from pyspark.sql.functions import col, from_json, window, sum

streaming_df = session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "payment_msg") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .withWatermark("createTime", "10 seconds")  # Existing watermark on createTime

# Modified section: expose the window-generated 'start' column and put a watermark on it
streaming_df = streaming_df.groupBy(
    col("provinceId"),
    window(col("createTime"), "1 hour", "30 minutes")
).agg(
    sum(col("payAmount")).alias("totalPayAmount")
).withColumn(
    "start", col("window.start")
).withWatermark("start", "10 seconds")  # Watermark on window-generated 'start'

# Rest of the code remains the same
streaming_df.createOrReplaceTempView("streaming_df")

spark.sql("""
    SELECT window.start, window.end, provinceId, totalPayAmount
    FROM streaming_df
    ORDER BY window.start
""") \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()

Try it and see how it goes.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
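P.S. If you want to sanity-check that the watermark is actually being picked up before wiring the result back into Kafka, one option is to print the analyzed plan (look for an EventTimeWatermark node) and dry-run the same query against the console sink in append mode. This is only a minimal sketch under the same assumptions as the code above (the spark session and the streaming_df temp view already exist; the checkpoint_console directory name is just illustrative):

```
# Minimal sanity check: confirm the watermark appears in the plan, then
# dry-run the aggregation to the console sink in append mode.
agg_df = spark.sql("""
    SELECT window.start, window.end, provinceId, totalPayAmount
    FROM streaming_df
    ORDER BY window.start
""")
agg_df.explain(extended=True)  # expect an EventTimeWatermark node in the analyzed plan

query = agg_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .option("checkpointLocation", "checkpoint_console") \
    .start()

query.awaitTermination(60)  # let it run briefly
query.stop()
```

If this starts without throwing the AnalysisException, the Kafka sink in append mode should start as well.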
On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid> wrote:

> Hi Mich,
>
> Thank you so much for your response. I really appreciate your help!
>
> You mentioned "defining the watermark using the withWatermark function on
> the streaming_df before creating the temporary view" - I believe this is
> what I'm doing and it's not working for me. Here is the exact code snippet
> that I'm running:
>
> ```
> >>> streaming_df = session.readStream\
>         .format("kafka")\
>         .option("kafka.bootstrap.servers", "localhost:9092")\
>         .option("subscribe", "payment_msg")\
>         .option("startingOffsets","earliest")\
>         .load()\
>         .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
>         .select("parsed_value.*")\
>         .withWatermark("createTime", "10 seconds")
>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
>
> >>> spark.sql("""
>     SELECT
>         window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>     FROM streaming_df
>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>     ORDER BY window.start
>     """)\
>     .withWatermark("start", "10 seconds")\
>     .writeStream\
>     .format("kafka") \
>     .option("checkpointLocation", "checkpoint") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("topic", "sink") \
>     .start()
>
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
> EventTimeWatermark start#37: timestamp, 10 seconds
> ```
>
> I'm using pyspark 3.5.1. Please let me know if I missed something. Thanks
> again!
>
> Best,
> Chloe
>
>
> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
> > ok let us take it for a test.
> >
> > The original code of mine:
> >
> > def fetch_data(self):
> >     self.sc.setLogLevel("ERROR")
> >     schema = StructType() \
> >         .add("rowkey", StringType()) \
> >         .add("timestamp", TimestampType()) \
> >         .add("temperature", IntegerType())
> >     checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
> >     try:
> >
> >         # construct a streaming dataframe 'streamingDataFrame' that
> >         # subscribes to topic temperature
> >         streamingDataFrame = self.spark \
> >             .readStream \
> >             .format("kafka") \
> >             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
> >             .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
> >             .option("group.id", config['common']['appName']) \
> >             .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> >             .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
> >             .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
> >             .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
> >             .option("subscribe", "temperature") \
> >             .option("failOnDataLoss", "false") \
> >             .option("includeHeaders", "true") \
> >             .option("startingOffsets", "earliest") \
> >             .load() \
> >             .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
> >
> >         resultC = streamingDataFrame.select(
> >               col("parsed_value.rowkey").alias("rowkey")
> >             , col("parsed_value.timestamp").alias("timestamp")
> >             , col("parsed_value.temperature").alias("temperature"))
> >
> >         """
> >         We work out the window and the AVG(temperature) in the window's
> >         timeframe below. This should return back the following Dataframe as struct
> >
> >         root
> >          |-- window: struct (nullable = false)
> >          |    |-- start: timestamp (nullable = true)
> >          |    |-- end: timestamp (nullable = true)
> >          |-- avg(temperature): double (nullable = true)
> >
> >         """
> >         resultM = resultC. \
> >             withWatermark("timestamp", "5 minutes"). \
> >             groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
> >             avg('temperature')
> >
> >         # We take the above DataFrame and flatten it to get the columns
> >         # aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
> >         resultMF = resultM. \
> >             select(
> >                   F.col("window.start").alias("startOfWindow")
> >                 , F.col("window.end").alias("endOfWindow")
> >                 , F.col("avg(temperature)").alias("AVGTemperature"))
> >
> >         # Kafka producer requires a key, value pair. We generate a UUID
> >         # key as the unique identifier of the Kafka record
> >         uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
> >
> >         """
> >         We take DataFrame resultMF containing temperature info and
> >         write it to Kafka. The uuid is serialized as a string and used as the key.
> >         We take all the columns of the DataFrame and serialize them as
> >         a JSON string, putting the results in the "value" of the record.
> >         """
> >         result = resultMF.withColumn("uuid", uuidUdf()) \
> >             .selectExpr("CAST(uuid AS STRING) AS key",
> >                         "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
> >             .writeStream \
> >             .outputMode('complete') \
> >             .format("kafka") \
> >             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
> >             .option("topic", "avgtemperature") \
> >             .option('checkpointLocation', checkpoint_path) \
> >             .queryName("avgtemperature") \
> >             .start()
> >
> >     except Exception as e:
> >         print(f"""{e}, quitting""")
> >         sys.exit(1)
> >
> >     #print(result.status)
> >     #print(result.recentProgress)
> >     #print(result.lastProgress)
> >
> >     result.awaitTermination()
> >
> > Now try to use SQL for the entire transformation and aggregation:
> >
> > # import this and anything else needed
> > from pyspark.sql.functions import from_json, col, window
> > from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
> >
> >
> > # Define the schema for the JSON data
> > schema = ...
> > # Replace with your schema definition
> >
> > # construct a streaming dataframe 'streaming_df' that
> > # subscribes to topic temperature
> > streaming_df = self.spark \
> >     .readStream \
> >     .format("kafka") \
> >     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
> >     .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
> >     .option("group.id", config['common']['appName']) \
> >     .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> >     .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
> >     .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
> >     .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
> >     .option("subscribe", "temperature") \
> >     .option("failOnDataLoss", "false") \
> >     .option("includeHeaders", "true") \
> >     .option("startingOffsets", "earliest") \
> >     .load() \
> >     .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
> >     .select("parsed_value.*") \
> >     .withWatermark("createTime", "10 seconds")  # Define the watermark here
> >
> > # Create a temporary view from the streaming DataFrame with watermark
> > streaming_df.createOrReplaceTempView("michboy")
> >
> > # Execute SQL queries on the temporary view
> > result_df = (spark.sql("""
> >     SELECT
> >         window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> >     FROM michboy
> >     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> >     ORDER BY window.start
> >     """)
> >     .writeStream
> >     .format("kafka")
> >     .option("checkpointLocation", "checkpoint")
> >     .option("kafka.bootstrap.servers", "localhost:9092")
> >     .option("topic", "sink")
> >     .start())
> >
> > Note that the watermark is defined using the withWatermark function on the
> > streaming_df before creating the temporary view (michboy 😀). This way, the
> > watermark information is correctly propagated to the temporary view,
> > allowing you to execute SQL queries with window functions and aggregations
> > on the streaming data.
> >
> > Note that by defining the watermark on the streaming DataFrame before
> > creating the temporary view, Spark will recognize the watermark and allow
> > streaming aggregations and window operations in your SQL queries.
> >
> > HTH
> >
> > Mich Talebzadeh,
> > Technologist | Solutions Architect | Data Engineer | Generative AI
> > London
> > United Kingdom
> >
> > view my Linkedin profile
> > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
> >
> > https://en.everybodywiki.com/Mich_Talebzadeh
> >
> > Disclaimer: The information provided is correct to the best of my
> > knowledge but of course cannot be guaranteed. It is essential to note
> > that, as with any advice, quote "one test result is worth one-thousand
> > expert opinions (Werner Von Braun
> > <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
> >
> >
> > On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
> >
> > > Hello!
> > >
> > > I am attempting to write a streaming pipeline that would consume data from
> > > a Kafka source, manipulate the data, and then write results to a downstream
> > > sink (Kafka, Redis, etc). I want to write fully formed SQL instead of using
> > > the function API that Spark offers.
> > > I read a few guides on how to do this
> > > and my understanding is that I need to create a temp view in order to
> > > execute my raw SQL queries via spark.sql().
> > >
> > > However, I'm having trouble defining watermarks on my source. It doesn't
> > > seem like there is a way to introduce watermark in the raw SQL that Spark
> > > supports, so I'm using the .withWatermark() function. However, this
> > > watermark does not work on the temp view.
> > >
> > > Example code:
> > > ```
> > > streaming_df.select(from_json(col("value").cast("string"),
> > > schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime",
> > > "10 seconds")
> > >
> > > json_df.createOrReplaceTempView("json_df")
> > >
> > > session.sql("""
> > > SELECT
> > > window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> > > FROM json_df
> > > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> > > ORDER BY window.start
> > > """)\
> > > .writeStream\
> > > .format("kafka") \
> > > .option("checkpointLocation", "checkpoint") \
> > > .option("kafka.bootstrap.servers", "localhost:9092") \
> > > .option("topic", "sink") \
> > > .start()
> > > ```
> > > This throws
> > > ```
> > > AnalysisException: Append output mode not supported when there are
> > > streaming aggregations on streaming DataFrames/DataSets without watermark;
> > > ```
> > >
> > > If I switch out the SQL query and write it in the function API instead,
> > > everything seems to work fine.
> > >
> > > How can I use .sql() in conjunction with watermarks?
> > >
> > > Best,
> > > Chloe