Hi,
Thank you for your advice. This is the amended code:
def onQueryProgress(self, event):
    print("onQueryProgress")
    # Access micro-batch data
    microbatch_data = event.progress
    # print("microbatch_data received")  # Check if data is received
    # print(microbatch_data)
    # processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
    processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
    print("CPC", processed_rows_per_second)
    if processed_rows_per_second is not None:  # Check if value exists
        print("processed_rows_per_second retrieved")
        print(f"Processed rows per second: {processed_rows_per_second}")
    else:
        print("processed_rows_per_second not retrieved!")
This is the output:
onQueryStarted
'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further
details.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------+-------+-------+
|key|doubled_value|op_type|op_time|
+---+-------------+-------+-------+
+---+-------------+-------+-------+
onQueryProgress
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-------------+-------+--------------------+
| key|doubled_value|op_type| op_time|
+--------------------+-------------+-------+--------------------+
|a960f663-d13a-49c...| 0| 1|2024-03-11 12:17:...|
+--------------------+-------------+-------+--------------------+
onQueryProgress
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-------------+-------+--------------------+
| key|doubled_value|op_type| op_time|
+--------------------+-------------+-------+--------------------+
|a960f663-d13a-49c...| 2| 1|2024-03-11 12:17:...|
+--------------------+-------------+-------+--------------------+
I am afraid it is not working. Nothing is printed after the "onQueryProgress" line, not even the "CPC" debug line.
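
One possible next step (just a sketch, not tested here): since "onQueryProgress"
prints but the "CPC" line never does, an exception inside the callback may be
getting swallowed by the listener thread, and wrapping the body in try/except
should surface it. The sketch also parses event.progress.json into a plain
dict, assuming the PySpark StreamingQueryProgress object exposes a json
property like its Scala counterpart:

import json

def onQueryProgress(self, event):
    print("onQueryProgress")
    try:
        # event.progress may be a StreamingQueryProgress object rather than
        # a plain dict; json.loads of its JSON form gives a real dict
        progress_dict = json.loads(event.progress.json)
        processed_rows_per_second = progress_dict.get("processedRowsPerSecond")
        print("CPC", processed_rows_per_second)
    except Exception as e:
        # Surface any error the listener thread would otherwise hide
        print(f"onQueryProgress failed: {e}")

If an AttributeError shows up here, that would explain why nothing after
"onQueryProgress" is printed.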
Cheers
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice: "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Mon, 11 Mar 2024 at 05:07, 刘唯 <z920631...@gmail.com> wrote:
>
> 刘唯 <z920631...@gmail.com> wrote on Sun, 10 Mar 2024 at 22:04:
>
>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>> Camel case, not snake case.
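>>
>> For example, against the JSON you printed earlier (a minimal check; this
>> assumes dict-style access on microbatch_data works as in your code):
>>
>> print(microbatch_data.get("processed_rows_per_second"))  # None: no such key
>> print(microbatch_data.get("processedRowsPerSecond"))     # 5.347593582887701 in your sample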
>>
>> Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Sun, 10 Mar 2024 at 11:46:
>>
>>>
>>> There is a paper from Databricks on this subject
>>>
>>>
>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>
>>> But having tested it, there seems to be a bug there, which I reported to
>>> the Databricks forum as well (in answer to a user question).
>>>
>>> I have come to the conclusion that this is a bug: in general, obtaining
>>> individual values from the dictionary does not work. For example, the
>>> processed_rows_per_second key within the microbatch_data = event.progress
>>> dictionary (or any other key) does not appear to be populated. I have
>>> explored various debugging steps, and even though the key seems to exist,
>>> the value might not be getting set. Note that the dictionary itself prints
>>> all its elements correctly. This is with regard to the method
>>> onQueryProgress(self, event) in class MyListener(StreamingQueryListener).
>>>
>>> For example, with print(microbatch_data), everything is printed correctly, as below:
>>>
>>> onQueryProgress
>>> microbatch_data received
>>> {
>>>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>>   "name" : null,
>>>   "timestamp" : "2024-03-10T09:21:27.233Z",
>>>   "batchId" : 21,
>>>   "numInputRows" : 1,
>>>   "inputRowsPerSecond" : 100.0,
>>>   "processedRowsPerSecond" : 5.347593582887701,
>>>   "durationMs" : {
>>>     "addBatch" : 37,
>>>     "commitOffsets" : 41,
>>>     "getBatch" : 0,
>>>     "latestOffset" : 0,
>>>     "queryPlanning" : 5,
>>>     "triggerExecution" : 187,
>>>     "walCommit" : 104
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
>>>     "startOffset" : 20,
>>>     "endOffset" : 21,
>>>     "latestOffset" : 21,
>>>     "numInputRows" : 1,
>>>     "inputRowsPerSecond" : 100.0,
>>>     "processedRowsPerSecond" : 5.347593582887701
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>>     "numOutputRows" : 1
>>>   }
>>> }
>>> However, the observed behaviour is that processed_rows_per_second is
>>> either None or not being updated correctly.
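>>>
>>> A quick way to check what microbatch_data actually is (plain Python
>>> introspection, nothing Spark-specific; just a debugging sketch):
>>>
>>> print(type(microbatch_data))
>>> print([a for a in dir(microbatch_data) if not a.startswith("_")])
>>>
>>> If it turns out to be a wrapper object rather than a plain dict, a .get()
>>> call could fail with AttributeError, and the listener thread may swallow
>>> that error rather than printing it.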
>>>
>>> The Spark version I used for this test is 3.4.
>>>
>>> The sample code uses format "rate" to simulate a streaming source. You
>>> can test the code yourself; it is all in one piece:
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import col, round, current_timestamp, lit
>>> from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
>>> import uuid
>>>
>>> def process_data(df):
>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))). \
>>>         withColumn("doubled_value", col("value") * 2). \
>>>         withColumn("op_type", lit(1)). \
>>>         withColumn("op_time", current_timestamp())
>>>     return processed_df
>>>
>>> # Create a Spark session
>>> appName = "testListener"
>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>
>>> # Define the schema for the streaming data
>>> schema = "key string, timestamp timestamp, value long"
>>>
>>> # Define my listener.
>>> class MyListener(StreamingQueryListener):
>>>     def onQueryStarted(self, event):
>>>         print("onQueryStarted")
>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>
>>>     def onQueryProgress(self, event):
>>>         print("onQueryProgress")
>>>         # Access micro-batch data
>>>         microbatch_data = event.progress
>>>         print("microbatch_data received")  # Check if data is received
>>>         print(microbatch_data)
>>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>>         if processed_rows_per_second is not None:  # Check if value exists
>>>             print("processed_rows_per_second retrieved")
>>>             print(f"Processed rows per second: {processed_rows_per_second}")
>>>         else:
>>>             print("processed_rows_per_second not retrieved!")
>>>
>>>     def onQueryTerminated(self, event):
>>>         print("onQueryTerminated")
>>>         if event.exception:
>>>             print(f"Query terminated with exception: {event.exception}")
>>>         else:
>>>             print("Query successfully terminated.")
>>>
>>> # Add my listener.
>>> listener_instance = MyListener()
>>> spark.streams.addListener(listener_instance)
>>>
>>> # Create a streaming DataFrame with the rate source
>>> streaming_df = (
>>>     spark.readStream
>>>     .format("rate")
>>>     .option("rowsPerSecond", 1)
>>>     .load()
>>> )
>>>
>>> # Apply processing function to the streaming DataFrame
>>> processed_streaming_df = process_data(streaming_df)
>>>
>>> # Define the output sink (for example, console sink)
>>> query = (
>>>     processed_streaming_df.select(
>>>         col("key").alias("key"),
>>>         col("doubled_value").alias("doubled_value"),
>>>         col("op_type").alias("op_type"),
>>>         col("op_time").alias("op_time"))
>>>     .writeStream
>>>     .outputMode("append")
>>>     .format("console")
>>>     .start()
>>> )
>>>
>>> # Wait for the streaming query to terminate
>>> query.awaitTermination()
>>>
>>> HTH
>>>
>>> Mich Talebzadeh
>>>
>>