This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d72b0b8061 fix: preserve None projection semantics across FFI boundary
in ForeignTableProvider::scan (#20393)
d72b0b8061 is described below
commit d72b0b8061ead91f6c16248db3a5a93cd55d4e81
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Sat Mar 7 02:26:20 2026 +0800
fix: preserve None projection semantics across FFI boundary in
ForeignTableProvider::scan (#20393)
## Which issue does this PR close?
N/A (newly discovered bug)
This was originally found in apache/sedona-db while working on a custom
plan node:
https://github.com/apache/sedona-db/pull/611#discussion_r2814855320
## Rationale for this change
`ForeignTableProvider::scan()` converts a `None` projection (meaning
"return all columns") into an empty `RVec<usize>` before passing it
across the FFI boundary. On the receiving side, `scan_fn_wrapper` always
wraps the received `RVec` in `Some(...)`, passing `Some(&vec![])` to the
inner `TableProvider::scan()`. This means "project zero columns" — the
exact opposite of the intended "project all columns."
The root cause is that the `FFI_TableProvider::scan` function signature
uses `RVec<usize>` for the projections parameter. Since `RVec<usize>`
cannot represent `None`, the `None` vs. empty-vec distinction is lost at
the FFI boundary.
## What changes are included in this PR?
Three coordinated changes in `datafusion/ffi/src/table_provider.rs`:
1. **FFI struct definition**: Changed `scan` function pointer signature
from `RVec<usize>` to `ROption<RVec<usize>>` for the projections
parameter, matching how `limit` already uses `ROption<usize>` for the
same `None`-vs-value distinction.
2. **Receiver side** (`scan_fn_wrapper`): Converts
`ROption<RVec<usize>>` via `.into_option().map(...)` and passes
`projections.as_ref()` to the inner provider, preserving `None`
semantics.
3. **Sender side** (`ForeignTableProvider::scan`): Converts
`Option<&Vec<usize>>` to `ROption<RVec<usize>>` via `.into()` instead of
using `unwrap_or_default()`.
In addition, a new unit test,
`test_scan_with_none_projection_returns_all_columns`, directly
exercises the FFI round-trip with `projection=None` and verifies that all 3
columns are returned.
Also fixed the existing `test_aggregation` test to set
`library_marker_id = mock_foreign_marker_id` so it actually exercises
the FFI path instead of taking the local bypass.
## How are these changes tested?
- New test `test_scan_with_none_projection_returns_all_columns`: creates
a 3-column MemTable, wraps it through FFI with the foreign marker set,
calls `scan` with `projection=None`, and asserts that 3 columns are
returned (previously 0 columns were returned).
## Are these changes safe?
This is a **breaking FFI ABI change** to the `FFI_TableProvider::scan`
function pointer signature. However:
- The `abi_stable` crate's `#[derive(StableAbi)]` generates layout
checks at dylib load time, so mismatched dylibs will be caught at load
rather than causing silent corruption.
- All existing providers construct `FFI_TableProvider` via `::new()` or
`::new_with_ffi_codec()`, which internally wire up `scan_fn_wrapper` —
nobody constructs the `scan` function pointer manually.
---
datafusion/ffi/src/table_provider.rs | 77 ++++++++++++++++++++++++++++++++----
1 file changed, 69 insertions(+), 8 deletions(-)
diff --git a/datafusion/ffi/src/table_provider.rs
b/datafusion/ffi/src/table_provider.rs
index df8b648026..1559549e63 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -108,7 +108,7 @@ pub struct FFI_TableProvider {
scan: unsafe extern "C" fn(
provider: &Self,
session: FFI_SessionRef,
- projections: RVec<usize>,
+ projections: ROption<RVec<usize>>,
filters_serialized: RVec<u8>,
limit: ROption<usize>,
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
@@ -232,7 +232,7 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
unsafe extern "C" fn scan_fn_wrapper(
provider: &FFI_TableProvider,
session: FFI_SessionRef,
- projections: RVec<usize>,
+ projections: ROption<RVec<usize>>,
filters_serialized: RVec<u8>,
limit: ROption<usize>,
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
@@ -269,11 +269,12 @@ unsafe extern "C" fn scan_fn_wrapper(
}
};
- let projections: Vec<_> = projections.into_iter().collect();
+ let projections: Option<Vec<usize>> =
+ projections.into_option().map(|p| p.into_iter().collect());
let plan = rresult_return!(
internal_provider
- .scan(session, Some(&projections), &filters, limit.into())
+ .scan(session, projections.as_ref(), &filters, limit.into())
.await
);
@@ -461,8 +462,9 @@ impl TableProvider for ForeignTableProvider {
) -> Result<Arc<dyn ExecutionPlan>> {
let session = FFI_SessionRef::new(session, None,
self.0.logical_codec.clone());
- let projections: Option<RVec<usize>> =
- projection.map(|p| p.iter().map(|v| v.to_owned()).collect());
+ let projections: ROption<RVec<usize>> = projection
+ .map(|p| p.iter().map(|v| v.to_owned()).collect())
+ .into();
let codec: Arc<dyn LogicalExtensionCodec> =
(&self.0.logical_codec).into();
let filter_list = LogicalExprList {
@@ -474,7 +476,7 @@ impl TableProvider for ForeignTableProvider {
let maybe_plan = (self.0.scan)(
&self.0,
session,
- projections.unwrap_or_default(),
+ projections,
filters_serialized,
limit.into(),
)
@@ -658,8 +660,9 @@ mod tests {
let provider = Arc::new(MemTable::try_new(schema,
vec![vec![batch1]])?);
- let ffi_provider =
+ let mut ffi_provider =
FFI_TableProvider::new(provider, true, None, task_ctx_provider,
None);
+ ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
let foreign_table_provider: Arc<dyn TableProvider> =
(&ffi_provider).into();
@@ -712,4 +715,62 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_scan_with_none_projection_returns_all_columns() ->
Result<()> {
+ use arrow::datatypes::Field;
+ use datafusion::arrow::array::Float32Array;
+ use datafusion::arrow::datatypes::DataType;
+ use datafusion::arrow::record_batch::RecordBatch;
+ use datafusion::datasource::MemTable;
+ use datafusion::physical_plan::collect;
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Float32, false),
+ Field::new("b", DataType::Float32, false),
+ Field::new("c", DataType::Float32, false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Float32Array::from(vec![1.0, 2.0])),
+ Arc::new(Float32Array::from(vec![3.0, 4.0])),
+ Arc::new(Float32Array::from(vec![5.0, 6.0])),
+ ],
+ )?;
+
+ let provider =
+ Arc::new(MemTable::try_new(Arc::clone(&schema),
vec![vec![batch]])?);
+
+ let ctx = Arc::new(SessionContext::new());
+ let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn
TaskContextProvider>;
+ let task_ctx_provider =
FFI_TaskContextProvider::from(&task_ctx_provider);
+
+ // Wrap in FFI and force the foreign path (not local bypass)
+ let mut ffi_provider =
+ FFI_TableProvider::new(provider, true, None, task_ctx_provider,
None);
+ ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
+
+ let foreign_table_provider: Arc<dyn TableProvider> =
(&ffi_provider).into();
+
+ // Call scan with projection=None, meaning "return all columns"
+ let plan = foreign_table_provider
+ .scan(&ctx.state(), None, &[], None)
+ .await?;
+ assert_eq!(
+ plan.schema().fields().len(),
+ 3,
+ "scan(projection=None) should return all columns; got {}",
+ plan.schema().fields().len()
+ );
+
+ // Also verify we can execute and get correct data
+ let batches = collect(plan, ctx.task_ctx()).await?;
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_columns(), 3);
+ assert_eq!(batches[0].num_rows(), 2);
+
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]