hm. you are getting below

AnalysisException: Append output mode not supported when there are
streaming aggregations on streaming DataFrames/DataSets without watermark;

The problem seems to be that you are using the append output mode when
writing the streaming query results to Kafka. This mode is designed for
scenarios where you want to append new data to an existing dataset at the
sink (in this case, the "sink" topic in Kafka). However, your query
involves a streaming aggregation: group by provinceId, window('createTime',
'1 hour', '30 minutes'). The problem is that Spark Structured Streaming
requires a watermark to ensure exactly-once processing when using
aggregations with append mode. Your code already defines a watermark on the
"createTime" column with a delay of 10 seconds (withWatermark("createTime",
"10 seconds")). However, the error message indicates it is missing on the
start column. Try adding watermark to "start" Column: Modify your code as
below  to include a watermark on the "start" column generated by the window
function:

from pyspark.sql.functions import col, from_json, explode, window, sum,
watermark

streaming_df = session.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "payment_msg") \
  .option("startingOffsets", "earliest") \
  .load() \
  .select(from_json(col("value").cast("string"),
schema).alias("parsed_value")) \
  .select("parsed_value.*") \
  .withWatermark("createTime", "10 seconds")  # Existing watermark on
createTime

*# Modified section with watermark on 'start' column*
streaming_df = streaming_df.groupBy(
  col("provinceId"),
  window(col("createTime"), "1 hour", "30 minutes")
).agg(
  sum(col("payAmount")).alias("totalPayAmount")
).withWatermark(expr("start"), "10 seconds")  # Watermark on
window-generated 'start'

# Rest of the code remains the same
streaming_df.createOrReplaceTempView("streaming_df")

spark.sql("""
SELECT
  window.start, window.end, provinceId, totalPayAmount
FROM streaming_df
ORDER BY window.start
""") \
.writeStream \
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \
.start()

Try and see how it goes

HTH

Mich Talebzadeh,

Technologist | Solutions Architect | Data Engineer  | Generative AI

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid> wrote:

> Hi Mich,
>
> Thank you so much for your response. I really appreciate your help!
>
> You mentioned "defining the watermark using the withWatermark function on
> the streaming_df before creating the temporary view” - I believe this is
> what I’m doing and it’s not working for me. Here is the exact code snippet
> that I’m running:
>
> ```
> >>> streaming_df = session.readStream\
>     .format("kafka")\
>     .option("kafka.bootstrap.servers", "localhost:9092")\
>     .option("subscribe", "payment_msg")\
>     .option("startingOffsets","earliest")\
>     .load()\
>     .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))\
>     .select("parsed_value.*")\
>     .withWatermark("createTime", "10 seconds")
>
> >>> streaming_df.createOrReplaceTempView("streaming_df”)
>
> >>> spark.sql("""
> SELECT
>     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>     FROM streaming_df
>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>     ORDER BY window.start
> """)\
>   .withWatermark("start", "10 seconds")\
>   .writeStream\
>   .format("kafka") \
>   .option("checkpointLocation", "checkpoint") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("topic", "sink") \
>   .start()
>
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
> EventTimeWatermark start#37: timestamp, 10 seconds
> ```
>
> I’m using pyspark 3.5.1. Please let me know if I missed something. Thanks
> again!
>
> Best,
> Chloe
>
>
> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
> > ok let us take it for a test.
> >
> > The original code of mine
> >
> >     def fetch_data(self):
> >         self.sc.setLogLevel("ERROR")
> >         schema = StructType() \
> >          .add("rowkey", StringType()) \
> >          .add("timestamp", TimestampType()) \
> >          .add("temperature", IntegerType())
> >         checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
> >         try:
> >
> >             # construct a streaming dataframe 'streamingDataFrame' that
> > subscribes to topic temperature
> >             streamingDataFrame = self.spark \
> >                 .readStream \
> >                 .format("kafka") \
> >                 .option("kafka.bootstrap.servers",
> > config['MDVariables']['bootstrapServers'],) \
> >                 .option("schema.registry.url",
> > config['MDVariables']['schemaRegistryURL']) \
> >                 .option("group.id", config['common']['appName']) \
> >                 .option("zookeeper.connection.timeout.ms",
> > config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> >                 .option("rebalance.backoff.ms",
> > config['MDVariables']['rebalanceBackoffMS']) \
> >                 .option("zookeeper.session.timeout.ms",
> > config['MDVariables']['zookeeperSessionTimeOutMs']) \
> >                 .option("auto.commit.interval.ms",
> > config['MDVariables']['autoCommitIntervalMS']) \
> >                 .option("subscribe", "temperature") \
> >                 .option("failOnDataLoss", "false") \
> >                 .option("includeHeaders", "true") \
> >                 .option("startingOffsets", "earliest") \
> >                 .load() \
> >                 .select(from_json(col("value").cast("string"),
> > schema).alias("parsed_value"))
> >
> >
> >             resultC = streamingDataFrame.select( \
> >                      col("parsed_value.rowkey").alias("rowkey") \
> >                    , col("parsed_value.timestamp").alias("timestamp") \
> >                    ,
> col("parsed_value.temperature").alias("temperature"))
> >
> >             """
> >             We work out the window and the AVG(temperature) in the
> window's
> > timeframe below
> >             This should return back the following Dataframe as struct
> >
> >              root
> >              |-- window: struct (nullable = false)
> >              |    |-- start: timestamp (nullable = true)
> >              |    |-- end: timestamp (nullable = true)
> >              |-- avg(temperature): double (nullable = true)
> >
> >             """
> >             resultM = resultC. \
> >                      withWatermark("timestamp", "5 minutes"). \
> >                      groupBy(window(resultC.timestamp, "5 minutes", "5
> > minutes")). \
> >                      avg('temperature')
> >
> >             # We take the above DataFrame and flatten it to get the
> columns
> > aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
> >             resultMF = resultM. \
> >                        select( \
> >
> F.col("window.start").alias("startOfWindow") \
> >                           , F.col("window.end").alias("endOfWindow") \
> >                           ,
> > F.col("avg(temperature)").alias("AVGTemperature"))
> >
> >             # Kafka producer requires a key, value pair. We generate
> UUID
> > key as the unique identifier of Kafka record
> >             uuidUdf= F.udf(lambda : str(uuid.uuid4()),StringType())
> >
> >             """
> >             We take DataFrame resultMF containing temperature info and
> > write it to Kafka. The uuid is serialized as a string and used as the
> key.
> >             We take all the columns of the DataFrame and serialize them
> as
> > a JSON string, putting the results in the "value" of the record.
> >             """
> >             result = resultMF.withColumn("uuid",uuidUdf()) \
> >                      .selectExpr("CAST(uuid AS STRING) AS key",
> > "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
> >                      .writeStream \
> >                      .outputMode('complete') \
> >                      .format("kafka") \
> >                      .option("kafka.bootstrap.servers",
> > config['MDVariables']['bootstrapServers'],) \
> >                      .option("topic", "avgtemperature") \
> >                      .option('checkpointLocation', checkpoint_path) \
> >                      .queryName("avgtemperature") \
> >                      .start()
> >
> >         except Exception as e:
> >                 print(f"""{e}, quitting""")
> >                 sys.exit(1)
> >
> >         #print(result.status)
> >         #print(result.recentProgress)
> >         #print(result.lastProgress)
> >
> >         result.awaitTermination()
> >
> > Now try to use sql for the entire transformation and aggression
> >
> > #import this and anything else needed
> > from pyspark.sql.functions import from_json, col, window
> > from pyspark.sql.types import StructType, StringType,IntegerType,
> > FloatType, TimestampType
> >
> >
> > # Define the schema for the JSON data
> > schema = ... # Replace with your schema definition
> >
> >         # construct a streaming dataframe 'streamingDataFrame' that
> > subscribes to topic temperature
> >         streamingDataFrame = self.spark \
> >                 .readStream \
> >                 .format("kafka") \
> >                 .option("kafka.bootstrap.servers",
> > config['MDVariables']['bootstrapServers'],) \
> >                 .option("schema.registry.url",
> > config['MDVariables']['schemaRegistryURL']) \
> >                 .option("group.id", config['common']['appName']) \
> >                 .option("zookeeper.connection.timeout.ms",
> > config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> >                 .option("rebalance.backoff.ms",
> > config['MDVariables']['rebalanceBackoffMS']) \
> >                 .option("zookeeper.session.timeout.ms",
> > config['MDVariables']['zookeeperSessionTimeOutMs']) \
> >                 .option("auto.commit.interval.ms",
> > config['MDVariables']['autoCommitIntervalMS']) \
> >                 .option("subscribe", "temperature") \
> >                 .option("failOnDataLoss", "false") \
> >                 .option("includeHeaders", "true") \
> >                 .option("startingOffsets", "earliest") \
> >                 .load() \
> >                 .select(from_json(col("value").cast("string"),
> > schema).alias("parsed_value"))
> >               .select("parsed_value.*")
> >               .withWatermark("createTime", "10 seconds"))  # Define the
> > watermark here
> >
> > # Create a temporary view from the streaming DataFrame with watermark
> > streaming_df.createOrReplaceTempView("michboy")
> >
> > # Execute SQL queries on the temporary view
> > result_df = (spark.sql("""
> >     SELECT
> >         window.start, window.end, provinceId, sum(payAmount) as
> > totalPayAmount
> >     FROM michboy
> >     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> >     ORDER BY window.start
> > """)
> >               .writeStream
> >               .format("kafka")
> >               .option("checkpointLocation", "checkpoint")
> >               .option("kafka.bootstrap.servers", "localhost:9092")
> >               .option("topic", "sink")
> >               .start())
> >
> > Note that the watermark is defined using the withWatermark function on
> the
> > streaming_df before creating the temporary view (michboy 😀). This way,
> the
> > watermark information is correctly propagated to the temporary view,
> > allowing you to execute SQL queries with window functions and
> aggregations
> > on the streaming data.
> >
> > Note that by defining the watermark on the streaming DataFrame before
> > creating the temporary view, Spark will recognize the watermark and allow
> > streaming aggregations and window operations in your SQL queries.
> >
> > HTH
> >
> > Mich Talebzadeh,
> > Technologist | Solutions Architect | Data Engineer  | Generative AI
> > London
> > United Kingdom
> >
> >
> >    view my Linkedin profile
> > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
> >
> >
> >  https://en.everybodywiki.com/Mich_Talebzadeh
> >
> >
> >
> > *Disclaimer:* The information provided is correct to the best of my
> > knowledge but of course cannot be guaranteed . It is essential to note
> > that, as with any advice, quote "one test result is worth one-thousand
> > expert opinions (Werner  <
> https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> > Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
> >
> >
> > On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid>
> wrote:
> >
> > > Hello!
> > >
> > > I am attempting to write a streaming pipeline that would consume data
> from
> > > a Kafka source, manipulate the data, and then write results to a
> downstream
> > > sink (Kafka, Redis, etc). I want to write fully formed SQL instead of
> using
> > > the function API that Spark offers. I read a few guides on how to do
> this
> > > and my understanding is that I need to create a temp view in order to
> > > execute my raw SQL queries via spark.sql().
> > >
> > > However, I’m having trouble defining watermarks on my source. It
> doesn’t
> > > seem like there is a way to introduce watermark in the raw SQL that
> Spark
> > > supports, so I’m using the .withWatermark() function. However, this
> > > watermark does not work on the temp view.
> > >
> > > Example code:
> > > ```
> > > streaming_df.select(from_json(col("value").cast("string"),
> > >
> schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime",
> > > "10 seconds”)
> > >
> > > json_df.createOrReplaceTempView("json_df”)
> > >
> > > session.sql("""
> > > SELECT
> > > window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> > > FROM json_df
> > > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> > > ORDER BY window.start
> > > """)\
> > > .writeStream\
> > > .format("kafka") \
> > > .option("checkpointLocation", "checkpoint") \
> > > .option("kafka.bootstrap.servers", "localhost:9092") \
> > > .option("topic", "sink") \
> > > .start()
> > > ```
> > > This throws
> > > ```
> > > AnalysisException: Append output mode not supported when there are
> > > streaming aggregations on streaming DataFrames/DataSets without
> watermark;
> > > ```
> > >
> > > If I switch out the SQL query and write it in the function API instead,
> > > everything seems to work fine.
> > >
> > > How can I use .sql() in conjunction with watermarks?
> > >
> > > Best,
> > > Chloe
> > >
> >
>

Reply via email to