grundprinzip opened a new pull request, #46002:
URL: https://github.com/apache/spark/pull/46002
### What changes were proposed in this pull request?
This patch adds support for registering custom dispatch handlers when
serializing objects with the Cloudpickle library bundled with PySpark. This is
necessary for compatibility when executing ForEachBatch functions in Structured
Streaming.
A typical ForEachBatch use case that requires this is the following test case:
```python
def curried_function(df):
    def inner(batch_df, batch_id):
        df.createOrReplaceTempView("updates")
        batch_df.createOrReplaceTempView("batch_updates")

    return inner


# Runs inside a test method, where self.spark is the test's SparkSession.
df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
other_df = self.spark.range(100)
df.writeStream.foreachBatch(curried_function(other_df)).start()
```
Here, a DataFrame is curried into the function invoked by ForEachBatch,
effectively passing state into it. Until now, serializing DataFrames and
SparkSessions in Spark Connect was not possible: the SparkSession holds the open
gRPC connection, and the DataFrame itself overrides certain magic methods that
make pickling fail.
To make SparkSessions serializable, we register a custom session constructor
during the serialization of the ForEachBatch function; when the function is
deserialized, this constructor simply returns the current active session. This
works because the ForEachBatch worker already creates and registers an active
SparkSession when it starts executing.
To serialize and reconstruct the DataFrame, we only need to pass in the session
and the plan; the remaining attributes do not carry persistent state.
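The two paragraphs above can be sketched with plain `pickle`, assuming a simplified model in which a session owns an unpicklable channel and a DataFrame's only persistent state is its session and plan. `Session`, `DataFrame`, `get_active_session`, and `set_active_session` are hypothetical stand-ins rather than the Spark Connect classes. The standard-library `dispatch_table` hook takes the place of Cloudpickle here.

```python
import copyreg
import io
import pickle
import threading
from typing import Optional


class Session:
    """Hypothetical stand-in for a session that owns an open channel."""

    def __init__(self, name: str):
        self.name = name
        self._channel = threading.Lock()  # unpicklable, like a live gRPC channel


_active_session: Optional[Session] = None  # hypothetical "active session" slot


def set_active_session(session: Session) -> None:
    global _active_session
    _active_session = session


def get_active_session() -> Session:
    if _active_session is None:
        raise RuntimeError("no active session")
    return _active_session


class DataFrame:
    """Hypothetical stand-in whose only persistent state is (session, plan)."""

    def __init__(self, session: Session, plan: str):
        self._session = session
        self._plan = plan


def _reduce_session(session: Session):
    # Never serialize the channel; on load, resolve the active session instead.
    return (get_active_session, ())


def _reduce_dataframe(df: DataFrame):
    # Rebuild from the session and the plan; the session itself is pickled
    # with the same dispatch table, so it goes through _reduce_session.
    return (DataFrame, (df._session, df._plan))


def dumps(obj) -> bytes:
    buf = io.BytesIO()
    pickler = pickle.Pickler(buf)
    pickler.dispatch_table = copyreg.dispatch_table.copy()
    pickler.dispatch_table[Session] = _reduce_session
    pickler.dispatch_table[DataFrame] = _reduce_dataframe
    pickler.dump(obj)
    return buf.getvalue()


# Driver side: serialize a DataFrame bound to the driver's session.
driver = Session("driver")
payload = dumps(DataFrame(driver, "Range(0, 100)"))

# Worker side: the worker registers its own active session before unpickling.
worker = Session("worker")
set_active_session(worker)
restored = pickle.loads(payload)
print(restored._plan, restored._session.name)  # Range(0, 100) worker
```

Because the session reducer returns `(get_active_session, ())`, the driver's session is never serialized. The restored DataFrame picks up whatever session is active on the worker at load time, which matches the constructor behavior described above.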
### Why are the changes needed?
Compatibility and Ease of Use
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added and updated tests
### Was this patch authored or co-authored using generative AI tooling?
No