grundprinzip opened a new pull request, #46002:
URL: https://github.com/apache/spark/pull/46002

   ### What changes were proposed in this pull request?
   
   This patch adds support for registering custom dispatch handlers when 
serializing objects using the provided Cloudpickle library. This is necessary 
to provide compatibility when executing ForEachBatch functions in Structured 
Streaming.
   
   A typical example for this behavior is the following test case:
   
   ```python
   def curried_function(df):
       def inner(batch_df, batch_id):
           df.createOrReplaceTempView("updates")
           batch_df.createOrReplaceTempView("batch_updates")
   
       return inner
   
   df = spark.readStream.format("text").load("python/test_support/sql/streaming")
   other_df = spark.range(100)
   df.writeStream.foreachBatch(curried_function(other_df)).start()
   ```
   Here we curry a DataFrame into the function called during ForEachBatch, 
effectively passing state to it. Until now, serializing DataFrames and 
SparkSessions in Spark Connect was not possible because the SparkSession 
carries the open gRPC connection and the DataFrame itself overrides certain 
magic methods in a way that makes pickling fail.
   
   To make serializing SparkSessions possible, we register a custom session 
constructor during the serialization of the ForEachBatch function. The 
constructor simply returns the current active session. When the ForEachBatch 
worker starts execution, it already creates and registers an active 
SparkSession, so the constructor has a session to return. To serialize and 
reconstruct the DataFrame, we only have to pass in the session and the plan; 
the remaining attributes do not carry any permanent state.
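
The idea can be sketched with the standard library `pickle` module, whose 
`Pickler.dispatch_table` plays the same role as the dispatch handlers described 
above. This is a minimal sketch, not the code in this patch: `FakeSession`, 
`FakeDataFrame`, `get_active_session`, and `dumps_with_handlers` are 
hypothetical stand-ins, not PySpark or Cloudpickle APIs.

```python
import io
import pickle
import threading


class FakeSession:
    """Hypothetical stand-in for a Spark Connect session."""

    active = None  # the session registered as active in this process

    def __init__(self, name):
        self.name = name
        # A lock cannot be pickled; it stands in for the open connection.
        self.channel = threading.Lock()


class FakeDataFrame:
    """Hypothetical stand-in for a Spark Connect DataFrame."""

    def __init__(self, plan, session):
        self.plan = plan
        self.session = session


def get_active_session():
    # Custom "session constructor": ignores the pickled instance and
    # returns whatever session is active where the payload is loaded.
    return FakeSession.active


def _reduce_session(session):
    return get_active_session, ()


def _reduce_dataframe(df):
    # Only the plan and the session are needed to rebuild the DataFrame.
    return FakeDataFrame, (df.plan, df.session)


def dumps_with_handlers(obj):
    buf = io.BytesIO()
    pickler = pickle.Pickler(buf)
    # Per-pickler dispatch table: these handlers apply to this dump only.
    pickler.dispatch_table = {
        FakeSession: _reduce_session,
        FakeDataFrame: _reduce_dataframe,
    }
    pickler.dump(obj)
    return buf.getvalue()
```

In this sketch the handlers are attached to a single pickler instance rather 
than registered globally, so ordinary pickling elsewhere in the process is 
unaffected. Loading the payload in a process that has set `FakeSession.active` 
yields a `FakeDataFrame` bound to that process's session.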
   
   
   ### Why are the changes needed?
   Compatibility and Ease of Use
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added and updated tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

