adriangb opened a new pull request, #19267:
URL: https://github.com/apache/datafusion/pull/19267
Refactor PhysicalExtensionCodec with &dyn dispatch (Alternative to #19234)
Which issue does this PR close?
Closes #18477
Related to #19234: this PR provides an alternative implementation approach.
Rationale for this change
DataFusion's protobuf serialization system currently has limited interception points. The existing PhysicalExtensionCodec trait only provides hooks for custom extensions (try_decode/try_encode) and unknown expressions (try_decode_expr/try_encode_expr), so users cannot intercept the serialization of every plan and expression node.
This limitation rules out several important use cases:
- Caching: reusing previously deserialized expressions to avoid redundant parsing
- Transformation: modifying nodes during serialization/deserialization
- Metadata injection: preserving custom state not captured in the protobuf schema
- Decorator patterns: wrapping standard serialization with custom pre/post-processing
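To make the caching use case concrete, here is a std-only sketch. ExprNode, Expr, ExprCodec, default_deserialize_expr and CachingCodec are hypothetical stand-ins (not DataFusion APIs) for PhysicalExprNode, Arc<dyn PhysicalExpr>, PhysicalExtensionCodec and a default_* helper. It assumes the node type can be hashed; a real codec might key its cache on the encoded protobuf bytes instead.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for PhysicalExprNode (the protobuf message).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExprNode(pub String);

// Hypothetical stand-in for a deserialized Arc<dyn PhysicalExpr>.
#[derive(Debug)]
pub struct Expr {
    pub text: String,
}

// Hypothetical, trimmed-down codec trait with a single interception hook.
pub trait ExprCodec: Send + Sync {
    fn deserialize_expr(&self, node: &ExprNode) -> Arc<Expr>;
}

// Stand-in for a default_* helper: the normal (possibly expensive) decoding path.
pub fn default_deserialize_expr(node: &ExprNode) -> Arc<Expr> {
    Arc::new(Expr { text: node.0.clone() })
}

// Returns the same Arc for identical nodes, so each distinct node is decoded once.
#[derive(Default)]
pub struct CachingCodec {
    cache: Mutex<HashMap<ExprNode, Arc<Expr>>>,
    // Number of cache misses, i.e. calls that fell through to the default path.
    pub misses: AtomicUsize,
}

impl ExprCodec for CachingCodec {
    fn deserialize_expr(&self, node: &ExprNode) -> Arc<Expr> {
        let mut cache = self.cache.lock().unwrap();
        if let Some(hit) = cache.get(node) {
            return Arc::clone(hit);
        }
        self.misses.fetch_add(1, Ordering::SeqCst);
        let expr = default_deserialize_expr(node);
        cache.insert(node.clone(), Arc::clone(&expr));
        expr
    }
}
```

Because the hook is meant to see every node, a cache like this would apply to nested subexpressions as well as to the root.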
PR #19234 addresses this by switching from &dyn PhysicalExtensionCodec to generics (&C where C: PhysicalExtensionCodec + ?Sized). However, a review comment on that PR (https://github.com/apache/datafusion/pull/19234#issuecomment-2869946159) raised concerns about FFI compatibility, since distributed systems and FFI boundaries often require dynamic dispatch.
What changes are included in this PR?
This PR takes an alternative approach: it keeps &dyn dispatch but adds 4 new required methods that intercept every plan and expression node during serialization and deserialization:
```rust
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    // ... existing methods unchanged ...

    // Called for every physical plan node during deserialization.
    fn deserialize_physical_plan(
        &self,
        proto: &PhysicalPlanNode,
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    // Called for every physical plan node during serialization.
    fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>) -> Result<PhysicalPlanNode>;

    // Called for every physical expression during deserialization.
    fn deserialize_physical_expr(
        &self,
        proto: &PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    // Called for every physical expression during serialization.
    fn serialize_physical_expr(&self, expr: &Arc<dyn PhysicalExpr>) -> Result<PhysicalExprNode>;
}
```
Key implementation details:
- All recursive calls in from_proto.rs and to_proto.rs now go through the codec methods
- Public default_* helper functions allow implementations to delegate to the standard behavior
- No default implementations: users must explicitly implement all methods (with documented examples)
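Why routing recursion through the codec matters can be shown with a std-only sketch. Expr, ExprNode, Codec, default_serialize_expr and CountingCodec are hypothetical simplifications, not the real DataFusion types. The helper handles a single level and hands every child back to the codec through &dyn dispatch, so a codec that overrides the hook and then delegates still sees each nested node, not only the root.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical, simplified stand-in for Arc<dyn PhysicalExpr>.
#[derive(Debug)]
pub enum Expr {
    Column(String),
    Binary { left: Box<Expr>, op: String, right: Box<Expr> },
}

// Hypothetical, simplified stand-in for PhysicalExprNode.
#[derive(Debug, PartialEq)]
pub enum ExprNode {
    Column(String),
    Binary { left: Box<ExprNode>, op: String, right: Box<ExprNode> },
}

pub trait Codec {
    fn serialize_expr(&self, expr: &Expr) -> ExprNode;
}

// Stand-in for a default_* helper: it encodes one level and sends each child
// back through `codec`, so overrides are invoked for nested nodes too.
pub fn default_serialize_expr(expr: &Expr, codec: &dyn Codec) -> ExprNode {
    match expr {
        Expr::Column(name) => ExprNode::Column(name.clone()),
        Expr::Binary { left, op, right } => ExprNode::Binary {
            left: Box::new(codec.serialize_expr(left)),
            op: op.clone(),
            right: Box::new(codec.serialize_expr(right)),
        },
    }
}

// Counts every node it is asked to serialize, then delegates to the helper.
#[derive(Default)]
pub struct CountingCodec {
    pub visited: AtomicUsize,
}

impl Codec for CountingCodec {
    fn serialize_expr(&self, expr: &Expr) -> ExprNode {
        self.visited.fetch_add(1, Ordering::SeqCst);
        default_serialize_expr(expr, self)
    }
}
```

Serializing `a > b` with CountingCodec visits three nodes (the binary expression and both columns). If the helper recursed by calling itself directly, the override would only see the root.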
Comparison with PR #19234
| Aspect | PR #19234 | This PR |
|-------------------------|-----------------------------------------------------|-----------------------------------------------------------|
| Dispatch | Generic &C where C: PhysicalExtensionCodec + ?Sized | Keep &dyn PhysicalExtensionCodec |
| Default implementations | Yes - methods have defaults calling helpers | No - all methods required |
| FFI compatibility | Requires concrete types at boundaries | Fully compatible with dynamic dispatch |
| Migration effort | Minimal (defaults handle it) | Explicit (must add 4 methods, but with simple delegation) |
| Monomorphization | Yes - separate code paths per codec type | No - single code path via vtable |
| Compile time impact | Potentially increased | None |
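The Dispatch and Monomorphization rows can be illustrated with a small std-only sketch. The Codec trait, Plain, Upper, encode_dyn and registry are hypothetical and not part of DataFusion. The point is that a function taking &dyn Codec is compiled once and can be handed a codec that is only chosen at runtime, for example by name from a registry.

```rust
use std::collections::HashMap;

// Hypothetical, object-safe codec trait (no generic methods), usable as `dyn Codec`.
pub trait Codec: Send + Sync {
    fn name(&self) -> &str;
    fn encode(&self, value: &str) -> Vec<u8>;
}

pub struct Plain;
pub struct Upper;

impl Codec for Plain {
    fn name(&self) -> &str {
        "plain"
    }
    fn encode(&self, value: &str) -> Vec<u8> {
        value.as_bytes().to_vec()
    }
}

impl Codec for Upper {
    fn name(&self) -> &str {
        "upper"
    }
    fn encode(&self, value: &str) -> Vec<u8> {
        value.to_uppercase().into_bytes()
    }
}

// Not generic: a single compiled copy, with each call going through the vtable.
pub fn encode_dyn(codec: &dyn Codec, value: &str) -> Vec<u8> {
    codec.encode(value)
}

// Codecs looked up by a name that is only known at runtime.
pub fn registry() -> HashMap<String, Box<dyn Codec>> {
    let codecs: Vec<Box<dyn Codec>> = vec![Box::new(Plain), Box::new(Upper)];
    let mut map = HashMap::new();
    for codec in codecs {
        map.insert(codec.name().to_string(), codec);
    }
    map
}
```

A generic `fn encode<C: Codec>(codec: &C, ...)` would instead be instantiated once per concrete codec type used at a call site.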
Pros of this approach:
- Maintains FFI compatibility for distributed systems using dynamic codec resolution
- No monomorphization overhead: a single code path for all codec implementations
- Explicit is better than implicit: users acknowledge the new API surface
- Preserves existing &dyn patterns used throughout the codebase
Cons of this approach:
- Breaking change: all PhysicalExtensionCodec implementations must be updated
- Users must add 4 method implementations (though simple delegation to the default_* helpers works)
Files changed
- datafusion/proto/src/physical_plan/mod.rs - Added 4 new trait methods + helper functions
- datafusion/proto/src/physical_plan/from_proto.rs - Recursive calls go through codec
- datafusion/proto/src/physical_plan/to_proto.rs - Recursive calls go through codec
- datafusion/ffi/src/proto/physical_extension_codec.rs - Updated FFI codec implementations
- datafusion-examples/examples/proto/composed_extension_codec.rs - Updated example codecs
- datafusion/proto/tests/cases/roundtrip_physical_plan.rs - Updated test codecs
- docs/source/library-user-guide/upgrading.md - Added migration guide
Migration example
For existing implementations that don't need custom interception:
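```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::{
    default_deserialize_physical_expr, default_deserialize_physical_plan,
    default_serialize_physical_expr, default_serialize_physical_plan,
    PhysicalExtensionCodec,
};
use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};

impl PhysicalExtensionCodec for MyCodec {
    // ... existing methods (try_decode, try_encode, ...) ...

    // Each new hook delegates to the standard behavior, passing `self`
    // so that nested nodes are routed back through this codec.
    fn deserialize_physical_plan(
        &self,
        proto: &PhysicalPlanNode,
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        default_deserialize_physical_plan(proto, ctx, self)
    }

    fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>) -> Result<PhysicalPlanNode> {
        default_serialize_physical_plan(plan, self)
    }

    fn deserialize_physical_expr(
        &self,
        proto: &PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        default_deserialize_physical_expr(proto, ctx, input_schema, self)
    }

    fn serialize_physical_expr(&self, expr: &Arc<dyn PhysicalExpr>) -> Result<PhysicalExprNode> {
        default_serialize_physical_expr(expr, self)
    }
}
```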
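Beyond plain delegation, the new hooks are meant to enable the decorator and metadata-injection patterns listed in the rationale. The following std-only sketch uses a hypothetical two-method Codec trait over strings (not the real PhysicalExtensionCodec signatures); DefaultCodec and TaggingCodec are likewise illustrative. TaggingCodec adds a tag on the way out, strips it on the way in, and otherwise forwards to a wrapped Arc<dyn Codec>.

```rust
use std::sync::Arc;

// Hypothetical, simplified codec trait standing in for PhysicalExtensionCodec.
pub trait Codec: Send + Sync {
    fn serialize_expr(&self, expr: &str) -> String;
    fn deserialize_expr(&self, node: &str) -> String;
}

// Baseline behavior, standing in for the default_* helpers.
pub struct DefaultCodec;

impl Codec for DefaultCodec {
    fn serialize_expr(&self, expr: &str) -> String {
        expr.to_string()
    }
    fn deserialize_expr(&self, node: &str) -> String {
        node.to_string()
    }
}

// Decorator: injects a tag on serialization and removes it on deserialization,
// delegating the actual work to the wrapped codec.
pub struct TaggingCodec {
    pub tag: String,
    pub inner: Arc<dyn Codec>,
}

impl Codec for TaggingCodec {
    fn serialize_expr(&self, expr: &str) -> String {
        format!("{}|{}", self.tag, self.inner.serialize_expr(expr))
    }
    fn deserialize_expr(&self, node: &str) -> String {
        // Strip the tag only if it is present; untagged input passes through unchanged.
        let prefix = format!("{}|", self.tag);
        let body = node.strip_prefix(prefix.as_str()).unwrap_or(node);
        self.inner.deserialize_expr(body)
    }
}
```

One design consideration with the real trait: if a decorator forwards a node to its inner codec's hook, and that codec then delegates to the default_* helpers with its own `self`, nested nodes would presumably be routed through the inner codec rather than the decorator. Calling the default_* helpers with the decorator's `self` appears to be the way to keep intercepting the whole tree.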