This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d72b0b8061 fix: preserve None projection semantics across FFI boundary 
in ForeignTableProvider::scan (#20393)
d72b0b8061 is described below

commit d72b0b8061ead91f6c16248db3a5a93cd55d4e81
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Sat Mar 7 02:26:20 2026 +0800

    fix: preserve None projection semantics across FFI boundary in 
ForeignTableProvider::scan (#20393)
    
    ## Which issue does this PR close?
    
    N/A (newly discovered bug)
    
    This was originally found in apache/sedona-db while working on a custom
    plan node:
    https://github.com/apache/sedona-db/pull/611#discussion_r2814855320
    
    ## Rationale for this change
    
    `ForeignTableProvider::scan()` converts a `None` projection (meaning
    "return all columns") into an empty `RVec<usize>` before passing it
    across the FFI boundary. On the receiving side, `scan_fn_wrapper` always
    wraps the received `RVec` in `Some(...)`, passing `Some(&vec![])` to the
    inner `TableProvider::scan()`. This means "project zero columns" — the
    exact opposite of the intended "project all columns."
    
    The root cause is that the `FFI_TableProvider::scan` function signature
    uses `RVec<usize>` for the projections parameter. Since `RVec<usize>`
    cannot represent `None`, the `None` vs. empty-vec distinction is lost at
    the FFI boundary.
    
    ## What changes are included in this PR?
    
    Three coordinated changes in `datafusion/ffi/src/table_provider.rs`:
    
    1. **FFI struct definition**: Changed `scan` function pointer signature
    from `RVec<usize>` to `ROption<RVec<usize>>` for the projections
    parameter, matching how `limit` already uses `ROption<usize>` for the
    same `None`-vs-value distinction.
    
    2. **Receiver side** (`scan_fn_wrapper`): Converts
    `ROption<RVec<usize>>` to `Option<Vec<usize>>` via
    `.into_option().map(...)` and passes `projections.as_ref()` to the inner
    provider, preserving `None` semantics.
    
    3. **Sender side** (`ForeignTableProvider::scan`): Converts
    `Option<&Vec<usize>>` to `ROption<RVec<usize>>` via `.into()` instead of
    collapsing `None` into an empty vector with `unwrap_or_default()`.
    
    Plus a new unit test
    `test_scan_with_none_projection_returns_all_columns` that directly
    exercises the FFI round-trip with `projection=None` and verifies all 3
    columns are returned.
    
    Also fixed the existing `test_aggregation` test to set
    `library_marker_id = mock_foreign_marker_id` so it actually exercises
    the FFI path instead of taking the local bypass.
    
    ## How are these changes tested?
    
    - New test `test_scan_with_none_projection_returns_all_columns`: creates
    a 3-column MemTable, wraps it through FFI with the foreign marker set,
    calls `scan(None)`, and asserts 3 columns are returned (previously
    returned 0).
    
    ## Are these changes safe?
    
    This is a **breaking FFI ABI change** to the `FFI_TableProvider::scan`
    function pointer signature. However:
    
    - The `abi_stable` crate's `#[derive(StableAbi)]` generates layout
    checks at dylib load time, so mismatched dylibs will be caught at load
    time rather than causing silent corruption.
    - All existing providers construct `FFI_TableProvider` via `::new()` or
    `::new_with_ffi_codec()`, which internally wire up `scan_fn_wrapper` —
    nobody constructs the `scan` function pointer manually.
---
 datafusion/ffi/src/table_provider.rs | 77 ++++++++++++++++++++++++++++++++----
 1 file changed, 69 insertions(+), 8 deletions(-)

diff --git a/datafusion/ffi/src/table_provider.rs 
b/datafusion/ffi/src/table_provider.rs
index df8b648026..1559549e63 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -108,7 +108,7 @@ pub struct FFI_TableProvider {
     scan: unsafe extern "C" fn(
         provider: &Self,
         session: FFI_SessionRef,
-        projections: RVec<usize>,
+        projections: ROption<RVec<usize>>,
         filters_serialized: RVec<u8>,
         limit: ROption<usize>,
     ) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
@@ -232,7 +232,7 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
 unsafe extern "C" fn scan_fn_wrapper(
     provider: &FFI_TableProvider,
     session: FFI_SessionRef,
-    projections: RVec<usize>,
+    projections: ROption<RVec<usize>>,
     filters_serialized: RVec<u8>,
     limit: ROption<usize>,
 ) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
@@ -269,11 +269,12 @@ unsafe extern "C" fn scan_fn_wrapper(
             }
         };
 
-        let projections: Vec<_> = projections.into_iter().collect();
+        let projections: Option<Vec<usize>> =
+            projections.into_option().map(|p| p.into_iter().collect());
 
         let plan = rresult_return!(
             internal_provider
-                .scan(session, Some(&projections), &filters, limit.into())
+                .scan(session, projections.as_ref(), &filters, limit.into())
                 .await
         );
 
@@ -461,8 +462,9 @@ impl TableProvider for ForeignTableProvider {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let session = FFI_SessionRef::new(session, None, 
self.0.logical_codec.clone());
 
-        let projections: Option<RVec<usize>> =
-            projection.map(|p| p.iter().map(|v| v.to_owned()).collect());
+        let projections: ROption<RVec<usize>> = projection
+            .map(|p| p.iter().map(|v| v.to_owned()).collect())
+            .into();
 
         let codec: Arc<dyn LogicalExtensionCodec> = 
(&self.0.logical_codec).into();
         let filter_list = LogicalExprList {
@@ -474,7 +476,7 @@ impl TableProvider for ForeignTableProvider {
             let maybe_plan = (self.0.scan)(
                 &self.0,
                 session,
-                projections.unwrap_or_default(),
+                projections,
                 filters_serialized,
                 limit.into(),
             )
@@ -658,8 +660,9 @@ mod tests {
 
         let provider = Arc::new(MemTable::try_new(schema, 
vec![vec![batch1]])?);
 
-        let ffi_provider =
+        let mut ffi_provider =
             FFI_TableProvider::new(provider, true, None, task_ctx_provider, 
None);
+        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
 
         let foreign_table_provider: Arc<dyn TableProvider> = 
(&ffi_provider).into();
 
@@ -712,4 +715,62 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_scan_with_none_projection_returns_all_columns() -> 
Result<()> {
+        use arrow::datatypes::Field;
+        use datafusion::arrow::array::Float32Array;
+        use datafusion::arrow::datatypes::DataType;
+        use datafusion::arrow::record_batch::RecordBatch;
+        use datafusion::datasource::MemTable;
+        use datafusion::physical_plan::collect;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Float32, false),
+            Field::new("b", DataType::Float32, false),
+            Field::new("c", DataType::Float32, false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Float32Array::from(vec![1.0, 2.0])),
+                Arc::new(Float32Array::from(vec![3.0, 4.0])),
+                Arc::new(Float32Array::from(vec![5.0, 6.0])),
+            ],
+        )?;
+
+        let provider =
+            Arc::new(MemTable::try_new(Arc::clone(&schema), 
vec![vec![batch]])?);
+
+        let ctx = Arc::new(SessionContext::new());
+        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn 
TaskContextProvider>;
+        let task_ctx_provider = 
FFI_TaskContextProvider::from(&task_ctx_provider);
+
+        // Wrap in FFI and force the foreign path (not local bypass)
+        let mut ffi_provider =
+            FFI_TableProvider::new(provider, true, None, task_ctx_provider, 
None);
+        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
+
+        let foreign_table_provider: Arc<dyn TableProvider> = 
(&ffi_provider).into();
+
+        // Call scan with projection=None, meaning "return all columns"
+        let plan = foreign_table_provider
+            .scan(&ctx.state(), None, &[], None)
+            .await?;
+        assert_eq!(
+            plan.schema().fields().len(),
+            3,
+            "scan(projection=None) should return all columns; got {}",
+            plan.schema().fields().len()
+        );
+
+        // Also verify we can execute and get correct data
+        let batches = collect(plan, ctx.task_ctx()).await?;
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_columns(), 3);
+        assert_eq!(batches[0].num_rows(), 2);
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to