adriangb opened a new pull request, #19267:
URL: https://github.com/apache/datafusion/pull/19267

   Refactor PhysicalExtensionCodec with &dyn dispatch (Alternative to #19234)
   
     Which issue does this PR close?
   
     Closes #18477
   
     Related to #19234 - this PR provides an alternative implementation approach.
   
     Rationale for this change
   
     DataFusion's protobuf serialization system currently has limited interception points. The existing
     PhysicalExtensionCodec trait only provides hooks for custom extensions (try_decode/try_encode) and
     unknown expressions (try_decode_expr/try_encode_expr), but users cannot intercept serialization of
     all plan and expression nodes.
   
     This limitation prevents important use cases:
     - Caching: Reusing previously deserialized expressions to avoid redundant parsing
     - Transformation: Modifying nodes during serialization/deserialization
     - Metadata injection: Preserving custom state not captured in the protobuf schema
     - Decorator patterns: Wrapping standard serialization with custom pre/post-processing
   
     PR #19234 addresses this by switching from &dyn PhysicalExtensionCodec to generics
     (&C where C: PhysicalExtensionCodec + ?Sized). However, a review comment
     (https://github.com/apache/datafusion/pull/19234#issuecomment-2869946159) raised concerns about
     FFI compatibility, since distributed systems and FFI boundaries often require dynamic dispatch.
   
     What changes are included in this PR?
   
     This PR takes an alternative approach: keep &dyn dispatch but add 4 new required methods that
     intercept every plan/expression node during serialization and deserialization:
   
   ```rust
     pub trait PhysicalExtensionCodec: Debug + Send + Sync {
         // ... existing methods unchanged ...
   
         fn deserialize_physical_plan(&self, proto: &PhysicalPlanNode, ctx: &TaskContext)
             -> Result<Arc<dyn ExecutionPlan>>;
   
         fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>)
             -> Result<PhysicalPlanNode>;
   
         fn deserialize_physical_expr(&self, proto: &PhysicalExprNode, ctx: &TaskContext, input_schema: &Schema)
             -> Result<Arc<dyn PhysicalExpr>>;
   
         fn serialize_physical_expr(&self, expr: &Arc<dyn PhysicalExpr>)
             -> Result<PhysicalExprNode>;
     }
   ```
   
     Key implementation details:
     - All recursive calls in from_proto.rs and to_proto.rs now go through the codec methods
     - Public default_* helper functions allow implementations to delegate to standard behavior
     - No default implementations - users must explicitly implement all methods (with documented examples)
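
To illustrate why routing the recursive calls through the codec matters, here is a minimal, self-contained sketch. It does not use the DataFusion API: `Expr`, `ExprNode`, `Codec`, `default_serialize_expr`, `PlainCodec`, and `CountingCodec` are simplified, hypothetical stand-ins for `Arc<dyn PhysicalExpr>`, `PhysicalExprNode`, `PhysicalExtensionCodec`, and the `default_*` helpers. The point it tries to show is that the helper serializes children by calling back into the codec, so an override observes every node in the tree and not only the root.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Toy expression tree (hypothetical stand-in for `Arc<dyn PhysicalExpr>`).
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    Add(Box<Expr>, Box<Expr>),
}

/// Toy serialized form (hypothetical stand-in for `PhysicalExprNode`).
#[derive(Debug, Clone, PartialEq)]
pub enum ExprNode {
    Column(String),
    Add(Box<ExprNode>, Box<ExprNode>),
}

/// Simplified codec trait with a single required hook, used via `&dyn`.
pub trait Codec: Debug + Send + Sync {
    fn serialize_expr(&self, expr: &Expr) -> ExprNode;
}

/// Standard behavior. Children are serialized through `codec`, not by
/// calling this function directly, so codec overrides see every node.
pub fn default_serialize_expr(expr: &Expr, codec: &dyn Codec) -> ExprNode {
    match expr {
        Expr::Column(name) => ExprNode::Column(name.clone()),
        Expr::Add(left, right) => ExprNode::Add(
            Box::new(codec.serialize_expr(left)),
            Box::new(codec.serialize_expr(right)),
        ),
    }
}

/// Codec with no custom interception: pure delegation.
#[derive(Debug, Default)]
pub struct PlainCodec;

impl Codec for PlainCodec {
    fn serialize_expr(&self, expr: &Expr) -> ExprNode {
        default_serialize_expr(expr, self)
    }
}

/// Decorator-style codec: counts each node, then delegates.
#[derive(Debug, Default)]
pub struct CountingCodec {
    pub seen: AtomicUsize,
}

impl Codec for CountingCodec {
    fn serialize_expr(&self, expr: &Expr) -> ExprNode {
        self.seen.fetch_add(1, Ordering::SeqCst);
        default_serialize_expr(expr, self)
    }
}
```

In this toy model, serializing `a + (b + c)` with `CountingCodec` records five nodes and produces the same output as `PlainCodec`. If the helper recursed into itself instead of into the codec, the counter would stop at one.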
   
     Comparison with PR #19234
   
     | Aspect                  | PR #19234                                           | This PR                                                   |
     |-------------------------|-----------------------------------------------------|-----------------------------------------------------------|
     | Dispatch                | Generic &C where C: PhysicalExtensionCodec + ?Sized | Keep &dyn PhysicalExtensionCodec                          |
     | Default implementations | Yes - methods have defaults calling helpers         | No - all methods required                                 |
     | FFI compatibility       | Requires concrete types at boundaries               | Fully compatible with dynamic dispatch                    |
     | Migration effort        | Minimal (defaults handle it)                        | Explicit (must add 4 methods, but with simple delegation) |
     | Monomorphization        | Yes - separate code paths per codec type            | No - single code path via vtable                          |
     | Compile time impact     | Potentially increased                               | None                                                      |
   
     Pros of this approach:
     - Maintains FFI compatibility for distributed systems using dynamic codec resolution
     - No monomorphization overhead - single code path for all codec implementations
     - Explicit is better than implicit - users acknowledge the new API surface
     - Preserves existing &dyn patterns used throughout the codebase
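
As a rough illustration of "dynamic codec resolution" with a single code path, the sketch below picks a codec by name at runtime and passes it to one non-generic function. Everything here is hypothetical and unrelated to the actual DataFusion types: `Codec`, `PlainCodec`, `TaggedCodec`, `encode_with`, and `registry` exist only for this example.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

/// Simplified, hypothetical codec trait (object safe, so usable as `dyn Codec`).
pub trait Codec: Debug + Send + Sync {
    fn encode(&self, payload: &str) -> Vec<u8>;
}

/// Emits the payload bytes unchanged.
#[derive(Debug)]
pub struct PlainCodec;

impl Codec for PlainCodec {
    fn encode(&self, payload: &str) -> Vec<u8> {
        payload.as_bytes().to_vec()
    }
}

/// Prefixes the payload with a one-byte tag.
#[derive(Debug)]
pub struct TaggedCodec {
    pub tag: u8,
}

impl Codec for TaggedCodec {
    fn encode(&self, payload: &str) -> Vec<u8> {
        let mut out = vec![self.tag];
        out.extend_from_slice(payload.as_bytes());
        out
    }
}

/// Non-generic entry point: compiled once and dispatched through the vtable,
/// whichever codec sits behind the trait object.
pub fn encode_with(codec: &dyn Codec, payload: &str) -> Vec<u8> {
    codec.encode(payload)
}

/// Codecs keyed by name, so the concrete type is chosen at runtime
/// (for example from a configuration value or a message header).
pub fn registry() -> HashMap<&'static str, Arc<dyn Codec>> {
    let mut codecs: HashMap<&'static str, Arc<dyn Codec>> = HashMap::new();
    codecs.insert("plain", Arc::new(PlainCodec));
    codecs.insert("tagged", Arc::new(TaggedCodec { tag: 7 }));
    codecs
}
```

A caller would look up a codec with something like `registry().get("tagged")` and hand the result to `encode_with`; no call site needs to name the concrete codec type.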
   
     Cons of this approach:
     - Breaking change requires updating all PhysicalExtensionCodec implementations
     - Users must add 4 method implementations (though simple delegation to default_* helpers works)
   
     Files changed
   
     - datafusion/proto/src/physical_plan/mod.rs - Added 4 new trait methods + helper functions
     - datafusion/proto/src/physical_plan/from_proto.rs - Recursive calls go through codec
     - datafusion/proto/src/physical_plan/to_proto.rs - Recursive calls go through codec
     - datafusion/ffi/src/proto/physical_extension_codec.rs - Updated FFI codec implementations
     - datafusion-examples/examples/proto/composed_extension_codec.rs - Updated example codecs
     - datafusion/proto/tests/cases/roundtrip_physical_plan.rs - Updated test codecs
     - docs/source/library-user-guide/upgrading.md - Added migration guide
   
     Migration example
   
     For existing implementations that don't need custom interception:
   
   ```rust
     use datafusion_proto::physical_plan::{
         default_deserialize_physical_plan, default_serialize_physical_plan,
         default_deserialize_physical_expr, default_serialize_physical_expr,
     };
   
     impl PhysicalExtensionCodec for MyCodec {
         // ... existing methods ...
   
         fn deserialize_physical_plan(&self, proto: &PhysicalPlanNode, ctx: &TaskContext)
             -> Result<Arc<dyn ExecutionPlan>> {
             default_deserialize_physical_plan(proto, ctx, self)
         }
   
         fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>)
             -> Result<PhysicalPlanNode> {
             default_serialize_physical_plan(plan, self)
         }
   
         fn deserialize_physical_expr(&self, proto: &PhysicalExprNode, ctx: &TaskContext, input_schema: &Schema)
             -> Result<Arc<dyn PhysicalExpr>> {
             default_deserialize_physical_expr(proto, ctx, input_schema, self)
         }
   
         fn serialize_physical_expr(&self, expr: &Arc<dyn PhysicalExpr>)
             -> Result<PhysicalExprNode> {
             default_serialize_physical_expr(expr, self)
         }
     }
   ```
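
For implementations that do want custom interception, the caching use case from the rationale could look roughly like the sketch below. It is a toy model under stated assumptions, not DataFusion code: `ExprNode`, `Expr`, `Codec`, `default_deserialize_expr`, and `CachingCodec` are hypothetical, and the cache is keyed on the whole serialized node, which a real codec might replace with a cheaper key.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

/// Toy serialized form (hypothetical stand-in for `PhysicalExprNode`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExprNode {
    Column(String),
    Not(Box<ExprNode>),
}

/// Toy in-memory expression (hypothetical stand-in for `dyn PhysicalExpr`).
#[derive(Debug, PartialEq)]
pub enum Expr {
    Column(String),
    Not(Arc<Expr>),
}

/// Simplified codec trait with one required deserialization hook.
pub trait Codec: Debug + Send + Sync {
    fn deserialize_expr(&self, proto: &ExprNode) -> Result<Arc<Expr>, String>;
}

/// Standard behavior; child nodes are decoded through the codec.
pub fn default_deserialize_expr(
    proto: &ExprNode,
    codec: &dyn Codec,
) -> Result<Arc<Expr>, String> {
    match proto {
        ExprNode::Column(name) if name.is_empty() => {
            Err("empty column name".to_string())
        }
        ExprNode::Column(name) => Ok(Arc::new(Expr::Column(name.clone()))),
        ExprNode::Not(inner) => {
            Ok(Arc::new(Expr::Not(codec.deserialize_expr(inner)?)))
        }
    }
}

/// Codec that reuses previously deserialized expressions.
#[derive(Debug, Default)]
pub struct CachingCodec {
    cache: Mutex<HashMap<ExprNode, Arc<Expr>>>,
}

impl Codec for CachingCodec {
    fn deserialize_expr(&self, proto: &ExprNode) -> Result<Arc<Expr>, String> {
        // The lock guard is a temporary dropped at the end of this statement,
        // so the recursive call below never runs while the lock is held.
        let hit = self.cache.lock().unwrap().get(proto).cloned();
        if let Some(expr) = hit {
            return Ok(expr);
        }
        let expr = default_deserialize_expr(proto, self)?;
        self.cache
            .lock()
            .unwrap()
            .insert(proto.clone(), Arc::clone(&expr));
        Ok(expr)
    }
}
```

Because children are also decoded through the codec, sub-expressions land in the cache too: decoding the same node twice returns the same `Arc`, and a later request for an inner node is served from the cache. Failed decodes are not cached in this sketch.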


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

