This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d63ced04a Implement FFI table provider factory (#20326)
0d63ced04a is described below

commit 0d63ced04a6d961b1e33016a5d604172314d4aae
Author: Paul J. Davis <[email protected]>
AuthorDate: Sat Feb 21 06:36:30 2026 -0600

    Implement FFI table provider factory (#20326)
    
    > ## Which issue does this PR close?
    > * Closes [expose TableProviderFactory via
    FFI #17942](https://github.com/apache/datafusion/issues/17942)
    >
    
    This PR is re-opening PR #17994 and updating it to match the current FFI
    approach (I.e., I made it look like the FFI_TableProvider in various
    places).
    
    > ## Rationale for this change
    > Expose `TableProviderFactory` via FFI to enable external languages
    (e.g., Python) to implement custom table provider factories and extend
    DataFusion with new data source types.
    >
    > ## What changes are included in this PR?
    > * Added `datafusion/ffi/src/table_provider_factory.rs` with:
    >
    > * `FFI_TableProviderFactory`: Stable C ABI struct with function
    pointers for `create`, `clone`, `release`, and `version`
    > * `ForeignTableProviderFactory`: Wrapper implementing
    `TableProviderFactory` trait
    >
    > ## Are these changes tested?
    > Yes
    >
    
    I've also added the integration tests as requested in the original PR.
    
    > ## Are there any user-facing changes?
    > Yes - new FFI API that enables custom `TableProviderFactory`
    implementations in foreign languages. This is an additive change with no
    breaking changes to existing APIs.
    
    Also, I'd like to thank @Weijun-H for the initial version of this PR as
    it simplified getting up to speed on the serialization logic that I
    hadn't encountered yet.
    
    ---------
    
    Co-authored-by: Weijun-H <[email protected]>
---
 datafusion/ffi/src/lib.rs                          |   1 +
 datafusion/ffi/src/table_provider_factory.rs       | 429 +++++++++++++++++++++
 datafusion/ffi/src/tests/mod.rs                    |  19 +-
 datafusion/ffi/src/tests/table_provider_factory.rs |  58 +++
 datafusion/ffi/tests/ffi_integration.rs            |  46 ++-
 5 files changed, 550 insertions(+), 3 deletions(-)

diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 2ca9b8f6f4..5eb3626db1 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -39,6 +39,7 @@ pub mod record_batch_stream;
 pub mod schema_provider;
 pub mod session;
 pub mod table_provider;
+pub mod table_provider_factory;
 pub mod table_source;
 pub mod udaf;
 pub mod udf;
diff --git a/datafusion/ffi/src/table_provider_factory.rs 
b/datafusion/ffi/src/table_provider_factory.rs
new file mode 100644
index 0000000000..15789eeab0
--- /dev/null
+++ b/datafusion/ffi/src/table_provider_factory.rs
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, sync::Arc};
+
+use abi_stable::{
+    StableAbi,
+    std_types::{RResult, RString, RVec},
+};
+use async_ffi::{FfiFuture, FutureExt};
+use async_trait::async_trait;
+use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion_common::error::{DataFusionError, Result};
+use datafusion_execution::TaskContext;
+use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
+use datafusion_proto::logical_plan::{
+    AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
+use datafusion_proto::protobuf::LogicalPlanNode;
+use prost::Message;
+use tokio::runtime::Handle;
+
+use crate::execution::FFI_TaskContextProvider;
+use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use crate::session::{FFI_SessionRef, ForeignSession};
+use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
+use crate::{df_result, rresult_return};
+
+/// A stable struct for sharing [`TableProviderFactory`] across FFI boundaries.
+///
+/// Similar to [`FFI_TableProvider`], this struct uses the FFI-safe pattern 
where:
+/// - The `FFI_*` struct exposes stable function pointers
+/// - Private data is stored as an opaque pointer
+/// - The `Foreign*` wrapper is used by consumers on the other side of the FFI 
boundary
+///
+/// [`FFI_TableProvider`]: crate::table_provider::FFI_TableProvider
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+pub struct FFI_TableProviderFactory {
+    /// Create a TableProvider with the given command.
+    ///
+    /// # Arguments
+    ///
+    /// * `factory` - the table provider factory
+    /// * `session_config` - session configuration
+    /// * `cmd_serialized` - a ['CreateExternalTable`] encoded as a 
[`LogicalPlanNode`] protobuf message serialized into bytes
+    ///   to pass across the FFI boundary.
+    create: unsafe extern "C" fn(
+        factory: &Self,
+        session: FFI_SessionRef,
+        cmd_serialized: RVec<u8>,
+    ) -> FfiFuture<RResult<FFI_TableProvider, RString>>,
+
+    logical_codec: FFI_LogicalExtensionCodec,
+
+    /// Used to create a clone of the factory. This should only need to be 
called
+    /// by the receiver of the factory.
+    clone: unsafe extern "C" fn(factory: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    release: unsafe extern "C" fn(factory: &mut Self),
+
+    /// Return the major DataFusion version number of this factory.
+    version: unsafe extern "C" fn() -> u64,
+
+    /// Internal data. This is only to be accessed by the provider of the 
factory.
+    /// A [`ForeignTableProviderFactory`] should never attempt to access this 
data.
+    private_data: *mut c_void,
+
+    /// Utility to identify when FFI objects are accessed locally through
+    /// the foreign interface. See [`crate::get_library_marker_id`] and
+    /// the crate's `README.md` for more information.
+    library_marker_id: extern "C" fn() -> usize,
+}
+
+unsafe impl Send for FFI_TableProviderFactory {}
+unsafe impl Sync for FFI_TableProviderFactory {}
+
+struct FactoryPrivateData {
+    factory: Arc<dyn TableProviderFactory + Send>,
+    runtime: Option<Handle>,
+}
+
+impl FFI_TableProviderFactory {
+    /// Creates a new [`FFI_TableProviderFactory`] wrapping `factory`.
+    pub fn new(
+        factory: Arc<dyn TableProviderFactory + Send>,
+        runtime: Option<Handle>,
+        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
+        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
+    ) -> Self {
+        let task_ctx_provider = task_ctx_provider.into();
+        let logical_codec =
+            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
+        let logical_codec = FFI_LogicalExtensionCodec::new(
+            logical_codec,
+            runtime.clone(),
+            task_ctx_provider,
+        );
+        Self::new_with_ffi_codec(factory, runtime, logical_codec)
+    }
+    /// Like [`Self::new`], but takes an already-built [`FFI_LogicalExtensionCodec`].
+    pub fn new_with_ffi_codec(
+        factory: Arc<dyn TableProviderFactory + Send>,
+        runtime: Option<Handle>,
+        logical_codec: FFI_LogicalExtensionCodec,
+    ) -> Self {
+        let private_data = Box::new(FactoryPrivateData { factory, runtime });
+
+        Self {
+            create: create_fn_wrapper,
+            logical_codec,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            version: super::version,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+            library_marker_id: crate::get_library_marker_id,
+        }
+    }
+
+    fn inner(&self) -> &Arc<dyn TableProviderFactory + Send> {
+        let private_data = self.private_data as *const FactoryPrivateData;
+        unsafe { &(*private_data).factory } // SAFETY: valid until `release_fn_wrapper`
+    }
+
+    fn runtime(&self) -> &Option<Handle> {
+        let private_data = self.private_data as *const FactoryPrivateData;
+        unsafe { &(*private_data).runtime } // SAFETY: valid until `release_fn_wrapper`
+    }
+
+    /// Decodes `cmd_serialized`, a protobuf [`LogicalPlanNode`], back into the
+    /// [`CreateExternalTable`] it carries, using this factory's logical codec.
+    /// Any other plan type is rejected with an internal error.
+    fn deserialize_cmd(&self, cmd_serialized: &RVec<u8>) -> Result<CreateExternalTable> {
+        let task_ctx: Arc<TaskContext> =
+            (&self.logical_codec.task_ctx_provider).try_into()?;
+        let logical_codec: Arc<dyn LogicalExtensionCodec> = (&self.logical_codec).into();
+
+        let plan = LogicalPlanNode::decode(cmd_serialized.as_ref())
+            .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?;
+        match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? {
+            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
+            _ => Err(DataFusionError::Internal(
+                "Invalid logical plan in FFI_TableProviderFactory.".to_owned(),
+            )),
+        }
+    }
+}
+
+impl Clone for FFI_TableProviderFactory {
+    fn clone(&self) -> Self {
+        unsafe { (self.clone)(self) } // SAFETY: `clone` is provided by the owning library
+    }
+}
+
+impl Drop for FFI_TableProviderFactory {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) } // SAFETY: called once, on drop
+    }
+}
+/// Same-library factories are unwrapped; others become a [`ForeignTableProviderFactory`].
+impl From<&FFI_TableProviderFactory> for Arc<dyn TableProviderFactory> {
+    fn from(factory: &FFI_TableProviderFactory) -> Self {
+        if (factory.library_marker_id)() == crate::get_library_marker_id() {
+            Arc::clone(factory.inner()) as Arc<dyn TableProviderFactory>
+        } else {
+            Arc::new(ForeignTableProviderFactory(factory.clone()))
+        }
+    }
+}
+/// FFI entry point for `create`; runs [`create_fn_wrapper_impl`] as an [`FfiFuture`].
+unsafe extern "C" fn create_fn_wrapper(
+    factory: &FFI_TableProviderFactory,
+    session: FFI_SessionRef,
+    cmd_serialized: RVec<u8>,
+) -> FfiFuture<RResult<FFI_TableProvider, RString>> {
+    let factory = factory.clone(); // owned copy, moved into the future below
+
+    async move {
+        let provider = rresult_return!(
+            create_fn_wrapper_impl(factory, session, cmd_serialized).await
+        );
+        RResult::ROk(provider)
+    }
+    .into_ffi()
+}
+/// Decodes the command, runs the wrapped factory, and exposes the result over FFI.
+async fn create_fn_wrapper_impl(
+    factory: FFI_TableProviderFactory,
+    session: FFI_SessionRef,
+    cmd_serialized: RVec<u8>,
+) -> Result<FFI_TableProvider, DataFusionError> {
+    let runtime = factory.runtime().clone();
+    let ffi_logical_codec = factory.logical_codec.clone();
+    let internal_factory = Arc::clone(factory.inner());
+    let cmd = factory.deserialize_cmd(&cmd_serialized)?;
+    // Use a session from this library directly; otherwise adapt it via `ForeignSession`.
+    let foreign_session;
+    let session: &(dyn Session + Send + Sync) = match session.as_local() {
+        Some(local) => local,
+        None => {
+            foreign_session = ForeignSession::try_from(&session)?;
+            &foreign_session
+        }
+    };
+
+    let provider = internal_factory.create(session, &cmd).await?;
+    Ok(FFI_TableProvider::new_with_ffi_codec(
+        provider,
+        true,
+        runtime,
+        ffi_logical_codec,
+    ))
+}
+/// FFI `clone`: a new struct built from clones of the inner `Arc`, runtime, and codec.
+unsafe extern "C" fn clone_fn_wrapper(
+    factory: &FFI_TableProviderFactory,
+) -> FFI_TableProviderFactory {
+    let runtime = factory.runtime().clone();
+    let old_factory = Arc::clone(factory.inner());
+
+    let private_data = Box::into_raw(Box::new(FactoryPrivateData {
+        factory: old_factory,
+        runtime,
+    })) as *mut c_void;
+
+    FFI_TableProviderFactory {
+        create: create_fn_wrapper,
+        logical_codec: factory.logical_codec.clone(),
+        clone: clone_fn_wrapper,
+        release: release_fn_wrapper,
+        version: super::version,
+        private_data,
+        library_marker_id: crate::get_library_marker_id,
+    }
+}
+/// FFI `release`: frees the boxed [`FactoryPrivateData`] and nulls `private_data`.
+unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) {
+    unsafe {
+        debug_assert!(!factory.private_data.is_null());
+        let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData);
+        drop(private_data);
+        factory.private_data = std::ptr::null_mut();
+    }
+}
+
+/// This wrapper struct exists on the receiver side of the FFI interface, so it has
+/// no guarantees about being able to access the data in `private_data`. Any functions
+/// defined on this struct must only use the stable functions provided in
+/// [`FFI_TableProviderFactory`] to interact with the foreign table provider factory.
+#[derive(Debug)]
+pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory);
+
+impl ForeignTableProviderFactory {
+    /// Encodes `cmd` as a serialized [`LogicalPlanNode`] for the foreign `create` call.
+    /// The command is wrapped in a DDL plan so the shared logical codec can encode it;
+    /// `FFI_TableProviderFactory::deserialize_cmd` reverses this in the owning library.
+    fn serialize_cmd(&self, cmd: CreateExternalTable) -> Result<RVec<u8>> {
+        let logical_codec: Arc<dyn LogicalExtensionCodec> =
+            (&self.0.logical_codec).into();
+
+        let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd));
+        let plan: LogicalPlanNode =
+            AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?;
+
+        let mut buf: Vec<u8> = Vec::new();
+        plan.try_encode(&mut buf)?;
+
+        Ok(buf.into())
+    }
+}
+
+// No `unsafe impl Send`/`Sync` is needed here: both are derived automatically from the
+// wrapped `FFI_TableProviderFactory`, which already provides them.
+
+#[async_trait]
+impl TableProviderFactory for ForeignTableProviderFactory {
+    /// Calls the foreign `create` and wraps the result in a [`ForeignTableProvider`].
+    async fn create(
+        &self,
+        session: &dyn Session,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
+        let cmd = self.serialize_cmd(cmd.clone())?;
+
+        // SAFETY: `create` is a function pointer supplied by the library that owns the
+        // factory, and it is called with the very struct it was installed on.
+        let maybe_provider = unsafe { (self.0.create)(&self.0, session, cmd) }.await;
+        let ffi_provider = df_result!(maybe_provider)?;
+        let provider = ForeignTableProvider(ffi_provider);
+
+        Ok(Arc::new(provider))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Schema;
+    use datafusion::prelude::SessionContext;
+    use datafusion_common::{TableReference, ToDFSchema};
+    use datafusion_execution::TaskContextProvider;
+    use std::collections::HashMap;
+
+    use super::*;
+
+    #[derive(Debug)]
+    struct TestTableProviderFactory {}
+
+    #[async_trait]
+    impl TableProviderFactory for TestTableProviderFactory {
+        async fn create(
+            &self,
+            _session: &dyn Session,
+            _cmd: &CreateExternalTable,
+        ) -> Result<Arc<dyn TableProvider>> {
+            use arrow::datatypes::Field;
+            use datafusion::arrow::array::Float32Array;
+            use datafusion::arrow::datatypes::DataType;
+            use datafusion::arrow::record_batch::RecordBatch;
+            use datafusion::datasource::MemTable;
+
+            let schema =
+                Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
+
+            let batch1 = RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
+            )?;
+            let batch2 = RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Float32Array::from(vec![64.0]))],
+            )?;
+
+            Ok(Arc::new(MemTable::try_new(
+                schema,
+                vec![vec![batch1], vec![batch2]],
+            )?))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_round_trip_ffi_table_provider_factory() -> Result<()> {
+        let ctx = Arc::new(SessionContext::new());
+        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
+        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);
+
+        let factory = Arc::new(TestTableProviderFactory {});
+        let mut ffi_factory =
+            FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None);
+        // Mark the factory as foreign so `create` uses its FFI function pointers.
+        ffi_factory.library_marker_id = crate::mock_foreign_marker_id;
+        let factory: Arc<dyn TableProviderFactory> = (&ffi_factory).into();
+
+        let cmd = CreateExternalTable {
+            schema: Schema::empty().to_dfschema_ref()?,
+            name: TableReference::bare("test_table"),
+            location: "test".to_string(),
+            file_type: "test".to_string(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            or_replace: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options: HashMap::new(),
+            constraints: Default::default(),
+            column_defaults: HashMap::new(),
+        };
+
+        let provider = factory.create(&ctx.state(), &cmd).await?;
+
+        assert_eq!(provider.schema().fields().len(), 1);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ffi_table_provider_factory_clone() -> Result<()> {
+        let ctx = Arc::new(SessionContext::new());
+        let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
+        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);
+
+        let factory = Arc::new(TestTableProviderFactory {});
+        let ffi_factory =
+            FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None);
+        // Clone and mark the clone as foreign so `create` uses its FFI function pointers.
+        let mut cloned_factory = ffi_factory.clone();
+        cloned_factory.library_marker_id = crate::mock_foreign_marker_id;
+        let factory: Arc<dyn TableProviderFactory> = (&cloned_factory).into();
+
+        let cmd = CreateExternalTable {
+            schema: Schema::empty().to_dfschema_ref()?,
+            name: TableReference::bare("cloned_test"),
+            location: "test".to_string(),
+            file_type: "test".to_string(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            or_replace: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options: HashMap::new(),
+            constraints: Default::default(),
+            column_defaults: HashMap::new(),
+        };
+
+        let provider = factory.create(&ctx.state(), &cmd).await?;
+        assert_eq!(provider.schema().fields().len(), 1);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 9bcd7e0031..c936330668 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -34,19 +34,21 @@ use udf_udaf_udwf::{
     create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
 };
 
-use super::table_provider::FFI_TableProvider;
-use super::udf::FFI_ScalarUDF;
 use crate::catalog_provider::FFI_CatalogProvider;
 use crate::catalog_provider_list::FFI_CatalogProviderList;
 use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use crate::table_provider::FFI_TableProvider;
+use crate::table_provider_factory::FFI_TableProviderFactory;
 use crate::tests::catalog::create_catalog_provider_list;
 use crate::udaf::FFI_AggregateUDF;
+use crate::udf::FFI_ScalarUDF;
 use crate::udtf::FFI_TableFunction;
 use crate::udwf::FFI_WindowUDF;
 
 mod async_provider;
 pub mod catalog;
 mod sync_provider;
+mod table_provider_factory;
 mod udf_udaf_udwf;
 pub mod utils;
 
@@ -71,6 +73,10 @@ pub struct ForeignLibraryModule {
         codec: FFI_LogicalExtensionCodec,
     ) -> FFI_TableProvider,
 
+    /// Constructs the table provider factory
+    pub create_table_factory:
+        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> 
FFI_TableProviderFactory,
+
     /// Create a scalar UDF
     pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,
 
@@ -128,6 +134,14 @@ extern "C" fn construct_table_provider(
     }
 }
 
+/// Here we only wish to create a simple table provider factory as an example.
+/// It wraps `TestTableProviderFactory` in its FFI counterpart using `codec`.
+extern "C" fn construct_table_provider_factory(
+    codec: FFI_LogicalExtensionCodec,
+) -> FFI_TableProviderFactory {
+    table_provider_factory::create(codec)
+}
+
 #[export_root_module]
 /// This defines the entry point for using the module.
 pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
@@ -135,6 +149,7 @@ pub fn get_foreign_library_module() -> 
ForeignLibraryModuleRef {
         create_catalog: create_catalog_provider,
         create_catalog_list: create_catalog_provider_list,
         create_table: construct_table_provider,
+        create_table_factory: construct_table_provider_factory,
         create_scalar_udf: create_ffi_abs_func,
         create_nullary_udf: create_ffi_random_func,
         create_table_function: create_ffi_table_func,
diff --git a/datafusion/ffi/src/tests/table_provider_factory.rs 
b/datafusion/ffi/src/tests/table_provider_factory.rs
new file mode 100644
index 0000000000..29af6aacf6
--- /dev/null
+++ b/datafusion/ffi/src/tests/table_provider_factory.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_catalog::{MemTable, Session, TableProvider, 
TableProviderFactory};
+use datafusion_common::Result;
+use datafusion_expr::CreateExternalTable;
+
+use super::{create_record_batch, create_test_schema};
+use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use crate::table_provider_factory::FFI_TableProviderFactory;
+
+/// Test [`TableProviderFactory`] that ignores its session and command arguments and
+/// always returns a [`MemTable`] holding three batches from `create_record_batch`.
+#[derive(Debug)]
+pub struct TestTableProviderFactory {}
+
+#[async_trait]
+impl TableProviderFactory for TestTableProviderFactory {
+    async fn create(
+        &self,
+        _session: &dyn Session,
+        _cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let schema = create_test_schema();
+
+        // It is useful to create these as multiple record batches
+        // so that we can demonstrate the FFI stream.
+        let batches = vec![
+            create_record_batch(1, 5),
+            create_record_batch(6, 1),
+            create_record_batch(7, 5),
+        ];
+
+        Ok(Arc::new(MemTable::try_new(schema, vec![batches])?))
+    }
+}
+/// Wraps a [`TestTableProviderFactory`] in an [`FFI_TableProviderFactory`] using `codec`.
+pub(crate) fn create(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory {
+    let factory = TestTableProviderFactory {};
+    FFI_TableProviderFactory::new_with_ffi_codec(Arc::new(factory), None, codec)
+}
diff --git a/datafusion/ffi/tests/ffi_integration.rs 
b/datafusion/ffi/tests/ffi_integration.rs
index 2d18679cb0..80538d4f92 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -21,10 +21,15 @@ mod utils;
 /// when the feature integration-tests is built
 #[cfg(feature = "integration-tests")]
 mod tests {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
-    use datafusion::catalog::TableProvider;
+    use arrow::datatypes::Schema;
+    use datafusion::catalog::{TableProvider, TableProviderFactory};
     use datafusion::error::{DataFusionError, Result};
+    use datafusion_common::TableReference;
+    use datafusion_common::ToDFSchema;
+    use datafusion_expr::CreateExternalTable;
     use datafusion_ffi::tests::create_record_batch;
     use datafusion_ffi::tests::utils::get_module;
 
@@ -69,4 +74,43 @@ mod tests {
     async fn sync_test_table_provider() -> Result<()> {
         test_table_provider(true).await
     }
+
+    #[tokio::test]
+    async fn test_table_provider_factory() -> Result<()> {
+        let table_provider_module = get_module()?;
+        let (ctx, codec) = super::utils::ctx_and_codec();
+
+        let ffi_table_provider_factory = table_provider_module
+            .create_table_factory()
+            .ok_or(DataFusionError::NotImplemented(
+                "External table provider factory failed to implement create".to_string(),
+            ))?(codec);
+
+        let foreign_table_provider_factory: Arc<dyn TableProviderFactory> =
+            (&ffi_table_provider_factory).into();
+
+        let cmd = CreateExternalTable {
+            schema: Schema::empty().to_dfschema_ref()?,
+            name: TableReference::bare("factory_test"),
+            location: "test".to_string(),
+            file_type: "test".to_string(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            or_replace: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options: HashMap::new(),
+            constraints: Default::default(),
+            column_defaults: HashMap::new(),
+        };
+
+        let provider = foreign_table_provider_factory
+            .create(&ctx.state(), &cmd)
+            .await?;
+        assert_eq!(provider.schema().fields().len(), 2);
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to