There is a Databricks blog post on this subject:

https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html

Having tested it, however, there seems to be a bug, which I also reported
on the Databricks forum (in answer to a user question).

I have come to the conclusion that this is a bug. In general, there appears
to be a problem in obtaining individual values from the dictionary: for
example, in the way Spark Structured Streaming populates the
processed_rows_per_second key (or any other key) within microbatch_data,
where microbatch_data = event.progress. I have tried various debugging
steps, and even though the key seems to exist, its value does not appear to
be set. Note that printing the dictionary itself shows all the elements
correctly. This concerns the method onQueryProgress(self, event) in class
MyListener(StreamingQueryListener).

For example, print(microbatch_data) prints everything, as below:

onQueryProgress
microbatch_data received
{
  "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
  "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
  "name" : null,
  "timestamp" : "2024-03-10T09:21:27.233Z",
  "batchId" : 21,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 5.347593582887701,
  "durationMs" : {
    "addBatch" : 37,
    "commitOffsets" : 41,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 5,
    "triggerExecution" : 187,
    "walCommit" : 104
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 20,
    "endOffset" : 21,
    "latestOffset" : 21,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 5.347593582887701
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
    "numOutputRows" : 1
  }
}
However, the observed behaviour is that processed_rows_per_second is either
None or not being updated correctly.
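One detail that may be worth ruling out before settling on a bug: the keys
in the output above are printed in camelCase (processedRowsPerSecond),
whereas the lookup uses the snake_case name processed_rows_per_second. The
sketch below is plain Python, not Spark. It assumes the printed text is valid
JSON and uses a shortened copy of the sample above; the variable names are
illustrative only. It shows how the two lookups differ on an ordinary
dictionary.

```python
import json

# Shortened copy of the progress output printed above (assumed valid JSON).
progress_json = """
{
  "batchId" : 21,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 5.347593582887701
}
"""

progress = json.loads(progress_json)

# snake_case key: does not appear in the printed output, so .get() gives None
snake_case_value = progress.get("processed_rows_per_second")

# camelCase key: spelled exactly as in the printed output
camel_case_value = progress.get("processedRowsPerSecond")

print("snake_case lookup:", snake_case_value)
print("camelCase lookup:", camel_case_value)
```

If the same pattern holds inside the listener, the None value would be
explained by the key spelling rather than by the value not being set, but
that is an assumption to test, not a confirmed diagnosis.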

The Spark version I used for my test is 3.4.

The sample code uses format("rate") to simulate a streaming source. You can
test the code yourself; it is all in one script:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.streaming import StreamingQueryListener
import uuid

def process_data(df):

    processed_df = df.withColumn("key", lit(str(uuid.uuid4()))).\
                      withColumn("doubled_value", col("value") * 2). \
                      withColumn("op_type", lit(1)). \
                      withColumn("op_time", current_timestamp())

    return processed_df

# Create a Spark session
appName = "testListener"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Define the schema for the streaming data
# (not used below: the rate source supplies its own timestamp and value columns)
schema = "key string, timestamp timestamp, value long"

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        print("microbatch_data received")  # Check if data is received
        print(microbatch_data)
        processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        if processed_rows_per_second is not None:  # Check if value exists
            print("processed_rows_per_second retrieved")
            print(f"Processed rows per second: {processed_rows_per_second}")
        else:
            print("processed_rows_per_second not retrieved!")
    def onQueryTerminated(self, event):
        print("onQueryTerminated")
        if event.exception:
            print(f"Query terminated with exception: {event.exception}")
        else:
            print("Query successfully terminated.")

# Add my listener.
listener_instance = MyListener()
spark.streams.addListener(listener_instance)


# Create a streaming DataFrame with the rate source
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Apply processing function to the streaming DataFrame
processed_streaming_df = process_data(streaming_df)

# Define the output sink (for example, console sink)
query = (
    processed_streaming_df.select(
        col("key"),
        col("doubled_value"),
        col("op_type"),
        col("op_time"),
    )
    .writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Wait for the streaming query to terminate
query.awaitTermination()
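
As a possible workaround to try, here is a minimal sketch of a helper that
reads one top-level metric from the progress object without calling .get()
on it. It rests on two assumptions that should be checked against your own
Spark version: that event.progress exposes camelCase attributes such as
processedRowsPerSecond, and that it has a json attribute holding the JSON
text shown earlier. The helper name get_progress_metric and the FakeProgress
stand-in are hypothetical and exist only so the sketch runs without Spark.

```python
import json


def get_progress_metric(progress, name):
    """Hypothetical helper: return one top-level metric from a progress object."""
    # First try an attribute with the same camelCase name,
    # e.g. progress.processedRowsPerSecond (assumed to exist).
    value = getattr(progress, name, None)
    if value is not None:
        return value
    # Otherwise fall back to parsing the JSON text, assuming a `json`
    # attribute holds the same text that print(progress) shows.
    raw = getattr(progress, "json", None)
    if isinstance(raw, str):
        return json.loads(raw).get(name)
    return None


class FakeProgress:
    """Stand-in for event.progress so the sketch runs without Spark."""
    json = '{"batchId": 21, "processedRowsPerSecond": 5.347593582887701}'


rate = get_progress_metric(FakeProgress(), "processedRowsPerSecond")
missing = get_progress_metric(FakeProgress(), "processed_rows_per_second")
print("processedRowsPerSecond:", rate)
print("processed_rows_per_second:", missing)
```

Inside onQueryProgress this would be called as
get_progress_metric(event.progress, "processedRowsPerSecond").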

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


