Thanks for the clarification. That makes sense. In the code below, we can see
def onQueryProgress(self, event):
    print("onQueryProgress")
    # Access micro-batch data
    microbatch_data = event.progress
    # print("microbatch_data received")  # Check if data is received
    # print(microbatch_data)
    print(f"Type of microbatch_data is {type(microbatch_data)}")
    # processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")  # incorrect
    processedRowsPerSecond = microbatch_data.processedRowsPerSecond
    if processedRowsPerSecond is not None:  # Check if value exists
        print("processedRowsPerSecond retrieved")
        print(f"Processed rows per second is -> {processedRowsPerSecond}")
    else:
        print("processedRowsPerSecond not retrieved!")
The output
onQueryProgress
Type of microbatch_data is <class 'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599
So we are dealing with an attribute of the class and NOT a dictionary.
The line (processedRowsPerSecond =
microbatch_data.get("processedRowsPerSecond")) fails because it uses the
.get() method, which only works on dictionaries, while the second line
(processedRowsPerSecond = microbatch_data.processedRowsPerSecond) accesses
the attribute directly.
In short, they need to ensure that *event.progress* returns a *dictionary*.
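
For anyone who needs code that copes with both behaviours, a minimal
sketch along these lines should work (the helper name is only illustrative):

def get_processed_rows_per_second(progress):
    # event.progress gives a StreamingQueryProgress object -> attribute access
    # query.lastProgress gives a plain dict -> .get() access
    if isinstance(progress, dict):
        return progress.get("processedRowsPerSecond")
    return getattr(progress, "processedRowsPerSecond", None)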
Cheers
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Tue, 12 Mar 2024 at 04:04, 刘唯 <[email protected]> wrote:
> Oh, I see where the confusion comes from.
>
> microbatch_data = event.progress
>
> means that microbatch_data is a StreamingQueryProgress instance, not a
> dictionary, so you should use `microbatch_data.processedRowsPerSecond`
> instead of the `get` method, which is used for dictionaries.
>
> But weirdly, query.lastProgress and query.recentProgress should return
> StreamingQueryProgress, but instead they return a dict, so the `get`
> method works there.
>
> I think PySpark should improve on this part.
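>
> For example, a minimal sketch of the two access patterns, assuming the
> Spark 3.4 behaviour described above (query here is assumed to be the
> running StreamingQuery handle):
>
> # inside StreamingQueryListener.onQueryProgress(self, event):
> rate = event.progress.processedRowsPerSecond  # object attribute
>
> # on the query handle, once it has made progress:
> last = query.lastProgress  # a plain dict here
> rate = last.get("processedRowsPerSecond") if last else None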
>
> Mich Talebzadeh <[email protected]> wrote on Mon, 11 Mar 2024 at 05:51:
>
>> Hi,
>>
>> Thank you for your advice
>>
>> This is the amended code
>>
>> def onQueryProgress(self, event):
>>     print("onQueryProgress")
>>     # Access micro-batch data
>>     microbatch_data = event.progress
>>     # print("microbatch_data received")  # Check if data is received
>>     # print(microbatch_data)
>>     # processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>     processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
>>     print("CPC", processed_rows_per_second)
>>     if processed_rows_per_second is not None:  # Check if value exists
>>         print("processed_rows_per_second retrieved")
>>         print(f"Processed rows per second: {processed_rows_per_second}")
>>     else:
>>         print("processed_rows_per_second not retrieved!")
>>
>> This is the output
>>
>> onQueryStarted
>> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
>> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
>> SLF4J: Defaulting to no-operation MDCAdapter implementation.
>> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for
>> further details.
>> -------------------------------------------
>> Batch: 0
>> -------------------------------------------
>> +---+-------------+-------+-------+
>> |key|doubled_value|op_type|op_time|
>> +---+-------------+-------+-------+
>> +---+-------------+-------+-------+
>>
>> onQueryProgress
>> -------------------------------------------
>> Batch: 1
>> -------------------------------------------
>> +--------------------+-------------+-------+--------------------+
>> | key|doubled_value|op_type| op_time|
>> +--------------------+-------------+-------+--------------------+
>> |a960f663-d13a-49c...| 0| 1|2024-03-11 12:17:...|
>> +--------------------+-------------+-------+--------------------+
>>
>> onQueryProgress
>> -------------------------------------------
>> Batch: 2
>> -------------------------------------------
>> +--------------------+-------------+-------+--------------------+
>> | key|doubled_value|op_type| op_time|
>> +--------------------+-------------+-------+--------------------+
>> |a960f663-d13a-49c...| 2| 1|2024-03-11 12:17:...|
>> +--------------------+-------------+-------+--------------------+
>>
>> I am afraid it is not working; it is not even printing anything.
>>
>> Cheers
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, "one test result is worth one-thousand expert
>> opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>>
>> On Mon, 11 Mar 2024 at 05:07, 刘唯 <[email protected]> wrote:
>>
>>> *now -> not
>>>
>>> 刘唯 <[email protected]> wrote on Sun, 10 Mar 2024 at 22:04:
>>>
>>>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>>>> Camel case now snake case
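>>>>
>>>> That is, assuming microbatch_data behaves like a dict here, the key is
>>>> spelled in camelCase:
>>>>
>>>> microbatch_data.get("processedRowsPerSecond")     # camelCase key
>>>> # not:
>>>> microbatch_data.get("processed_rows_per_second")  # snake_case returns None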
>>>>
>>>> Mich Talebzadeh <[email protected]> wrote on Sun, 10 Mar 2024 at 11:46:
>>>>
>>>>>
>>>>> There is a blog post from Databricks on this subject:
>>>>>
>>>>>
>>>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>>>
>>>>> But having tested it, there seems to be a bug there, which I also reported
>>>>> on the Databricks forum (in answer to a user question).
>>>>>
>>>>> I have come to the conclusion that this is a bug. In general there is a
>>>>> bug in obtaining individual values from the dictionary. For example, a
>>>>> bug in the way Spark Streaming is populating the processed_rows_per_second
>>>>> key within the microbatch_data = event.progress dictionary, or any other
>>>>> key. I have explored various debugging steps, and even though the key
>>>>> seems to exist, the value might not be getting set. Note that the
>>>>> dictionary itself prints the elements correctly. This is with regard to
>>>>> the method onQueryProgress(self, event) in class
>>>>> MyListener(StreamingQueryListener):
>>>>>
>>>>> For example, with print(microbatch_data), everything prints as below:
>>>>>
>>>>> onQueryProgress
>>>>> microbatch_data received
>>>>> {
>>>>> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>>>> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>>>> "name" : null,
>>>>> "timestamp" : "2024-03-10T09:21:27.233Z",
>>>>> "batchId" : 21,
>>>>> "numInputRows" : 1,
>>>>> "inputRowsPerSecond" : 100.0,
>>>>> "processedRowsPerSecond" : 5.347593582887701,
>>>>> "durationMs" : {
>>>>> "addBatch" : 37,
>>>>> "commitOffsets" : 41,
>>>>> "getBatch" : 0,
>>>>> "latestOffset" : 0,
>>>>> "queryPlanning" : 5,
>>>>> "triggerExecution" : 187,
>>>>> "walCommit" : 104
>>>>> },
>>>>> "stateOperators" : [ ],
>>>>> "sources" : [ {
>>>>> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
>>>>> numPartitions=default",
>>>>> "startOffset" : 20,
>>>>> "endOffset" : 21,
>>>>> "latestOffset" : 21,
>>>>> "numInputRows" : 1,
>>>>> "inputRowsPerSecond" : 100.0,
>>>>> "processedRowsPerSecond" : 5.347593582887701
>>>>> } ],
>>>>> "sink" : {
>>>>> "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>>>> "numOutputRows" : 1
>>>>> }
>>>>> }
>>>>> However, the observed behaviour is that processed_rows_per_second is
>>>>> either None or not being updated correctly.
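>>>>>
>>>>> As a workaround sketch (not a fix), the printed form can be turned back
>>>>> into a real dictionary so that .get() works; this assumes, as the output
>>>>> above shows, that printing the progress object yields JSON:
>>>>>
>>>>> import json
>>>>>
>>>>> def onQueryProgress(self, event):
>>>>>     # str(event.progress) renders as JSON (see the output above),
>>>>>     # so parse it into a plain dict and use .get() on that
>>>>>     progress_dict = json.loads(str(event.progress))
>>>>>     print(progress_dict.get("processedRowsPerSecond"))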
>>>>>
>>>>> The Spark version I used for my test is 3.4.
>>>>>
>>>>> The sample code uses format="rate" to simulate a streaming process. You
>>>>> can test the code yourself; it is all in one file:
>>>>> from pyspark.sql import SparkSession
>>>>> from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
>>>>> from pyspark.sql.functions import col, round, current_timestamp, lit
>>>>> import uuid
>>>>>
>>>>> def process_data(df):
>>>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))) \
>>>>>         .withColumn("doubled_value", col("value") * 2) \
>>>>>         .withColumn("op_type", lit(1)) \
>>>>>         .withColumn("op_time", current_timestamp())
>>>>>     return processed_df
>>>>>
>>>>> # Create a Spark session
>>>>> appName = "testListener"
>>>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>>>
>>>>> # Define the schema for the streaming data
>>>>> schema = "key string, timestamp timestamp, value long"
>>>>>
>>>>> # Define my listener.
>>>>> class MyListener(StreamingQueryListener):
>>>>>     def onQueryStarted(self, event):
>>>>>         print("onQueryStarted")
>>>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>>>     def onQueryProgress(self, event):
>>>>>         print("onQueryProgress")
>>>>>         # Access micro-batch data
>>>>>         microbatch_data = event.progress
>>>>>         print("microbatch_data received")  # Check if data is received
>>>>>         print(microbatch_data)
>>>>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>>>>         if processed_rows_per_second is not None:  # Check if value exists
>>>>>             print("processed_rows_per_second retrieved")
>>>>>             print(f"Processed rows per second: {processed_rows_per_second}")
>>>>>         else:
>>>>>             print("processed_rows_per_second not retrieved!")
>>>>>     def onQueryTerminated(self, event):
>>>>>         print("onQueryTerminated")
>>>>>         if event.exception:
>>>>>             print(f"Query terminated with exception: {event.exception}")
>>>>>         else:
>>>>>             print("Query successfully terminated.")
>>>>>
>>>>> # Add my listener.
>>>>> listener_instance = MyListener()
>>>>> spark.streams.addListener(listener_instance)
>>>>>
>>>>> # Create a streaming DataFrame with the rate source
>>>>> streaming_df = (
>>>>>     spark.readStream
>>>>>     .format("rate")
>>>>>     .option("rowsPerSecond", 1)
>>>>>     .load()
>>>>> )
>>>>>
>>>>> # Apply the processing function to the streaming DataFrame
>>>>> processed_streaming_df = process_data(streaming_df)
>>>>>
>>>>> # Define the output sink (for example, console sink)
>>>>> query = (
>>>>>     processed_streaming_df.select(
>>>>>         col("key").alias("key"),
>>>>>         col("doubled_value").alias("doubled_value"),
>>>>>         col("op_type").alias("op_type"),
>>>>>         col("op_time").alias("op_time"))
>>>>>     .writeStream
>>>>>     .outputMode("append")
>>>>>     .format("console")
>>>>>     .start()
>>>>> )
>>>>>
>>>>> # Wait for the streaming query to terminate
>>>>> query.awaitTermination()
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>> view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>>> that, as with any advice, "one test result is worth one-thousand
>>>>> expert opinions" (Wernher von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>>
>>>>