This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0d63ced04a Implement FFI table provider factory (#20326)
0d63ced04a is described below
commit 0d63ced04a6d961b1e33016a5d604172314d4aae
Author: Paul J. Davis <[email protected]>
AuthorDate: Sat Feb 21 06:36:30 2026 -0600
Implement FFI table provider factory (#20326)
> ## Which issue does this PR close?
> * Closes [expose TableProviderFactory via
FFI #17942](https://github.com/apache/datafusion/issues/17942)
>
This PR re-opens PR #17994 and updates it to match the current FFI
approach (i.e., I made it look like `FFI_TableProvider` in various
places).
> ## Rationale for this change
> Expose `TableProviderFactory` via FFI to enable external languages
(e.g., Python) to implement custom table provider factories and extend
DataFusion with new data source types.
>
> ## What changes are included in this PR?
> * Added `datafusion/ffi/src/table_provider_factory.rs` with:
>
> * `FFI_TableProviderFactory`: Stable C ABI struct with function
pointers for `create`, `clone`, `release`, and `version`
> * `ForeignTableProviderFactory`: Wrapper implementing
`TableProviderFactory` trait
>
> ## Are these changes tested?
> Yes
>
I've also added the integration tests as requested in the original PR.
> ## Are there any user-facing changes?
> Yes - new FFI API that enables custom `TableProviderFactory`
implementations in foreign languages. This is an additive change with no
breaking changes to existing APIs.
Also, I'd like to thank @Weijun-H for the initial version of this PR as
it simplified getting up to speed on the serialization logic that I
hadn't encountered yet.
---------
Co-authored-by: Weijun-H <[email protected]>
---
datafusion/ffi/src/lib.rs | 1 +
datafusion/ffi/src/table_provider_factory.rs | 429 +++++++++++++++++++++
datafusion/ffi/src/tests/mod.rs | 19 +-
datafusion/ffi/src/tests/table_provider_factory.rs | 58 +++
datafusion/ffi/tests/ffi_integration.rs | 46 ++-
5 files changed, 550 insertions(+), 3 deletions(-)
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 2ca9b8f6f4..5eb3626db1 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -39,6 +39,7 @@ pub mod record_batch_stream;
pub mod schema_provider;
pub mod session;
pub mod table_provider;
+pub mod table_provider_factory;
pub mod table_source;
pub mod udaf;
pub mod udf;
diff --git a/datafusion/ffi/src/table_provider_factory.rs
b/datafusion/ffi/src/table_provider_factory.rs
new file mode 100644
index 0000000000..15789eeab0
--- /dev/null
+++ b/datafusion/ffi/src/table_provider_factory.rs
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, sync::Arc};
+
+use abi_stable::{
+ StableAbi,
+ std_types::{RResult, RString, RVec},
+};
+use async_ffi::{FfiFuture, FutureExt};
+use async_trait::async_trait;
+use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion_common::error::{DataFusionError, Result};
+use datafusion_execution::TaskContext;
+use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
+use datafusion_proto::logical_plan::{
+ AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
+use datafusion_proto::protobuf::LogicalPlanNode;
+use prost::Message;
+use tokio::runtime::Handle;
+
+use crate::execution::FFI_TaskContextProvider;
+use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use crate::session::{FFI_SessionRef, ForeignSession};
+use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
+use crate::{df_result, rresult_return};
+
+/// A stable struct for sharing [`TableProviderFactory`] across FFI boundaries.
+///
+/// Similar to [`FFI_TableProvider`], this struct uses the FFI-safe pattern
where:
+/// - The `FFI_*` struct exposes stable function pointers
+/// - Private data is stored as an opaque pointer
+/// - The `Foreign*` wrapper is used by consumers on the other side of the FFI
boundary
+///
+/// [`FFI_TableProvider`]: crate::table_provider::FFI_TableProvider
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+pub struct FFI_TableProviderFactory {
+ /// Create a TableProvider with the given command.
+ ///
+ /// # Arguments
+ ///
+ /// * `factory` - the table provider factory
+ /// * `session_config` - session configuration
+ /// * `cmd_serialized` - a ['CreateExternalTable`] encoded as a
[`LogicalPlanNode`] protobuf message serialized into bytes
+ /// to pass across the FFI boundary.
+ create: unsafe extern "C" fn(
+ factory: &Self,
+ session: FFI_SessionRef,
+ cmd_serialized: RVec<u8>,
+ ) -> FfiFuture<RResult<FFI_TableProvider, RString>>,
+
+ logical_codec: FFI_LogicalExtensionCodec,
+
+ /// Used to create a clone of the factory. This should only need to be
called
+ /// by the receiver of the factory.
+ clone: unsafe extern "C" fn(factory: &Self) -> Self,
+
+ /// Release the memory of the private data when it is no longer being used.
+ release: unsafe extern "C" fn(factory: &mut Self),
+
+ /// Return the major DataFusion version number of this factory.
+ version: unsafe extern "C" fn() -> u64,
+
+ /// Internal data. This is only to be accessed by the provider of the
factory.
+ /// A [`ForeignTableProviderFactory`] should never attempt to access this
data.
+ private_data: *mut c_void,
+
+ /// Utility to identify when FFI objects are accessed locally through
+ /// the foreign interface. See [`crate::get_library_marker_id`] and
+ /// the crate's `README.md` for more information.
+ library_marker_id: extern "C" fn() -> usize,
+}
+
+// `private_data` is a raw pointer, so `Send` and `Sync` are not derived
+// automatically and are asserted by hand here.
+// NOTE(review): soundness presumably relies on the boxed factory and the FFI
+// codec being safe to share between threads — confirm.
+unsafe impl Send for FFI_TableProviderFactory {}
+unsafe impl Sync for FFI_TableProviderFactory {}
+
+/// Data owned by the side of the FFI boundary that produced the factory. It is
+/// boxed and stored behind `FFI_TableProviderFactory::private_data`.
+struct FactoryPrivateData {
+ /// The wrapped factory implementation.
+ factory: Arc<dyn TableProviderFactory + Send>,
+ /// Optional Tokio runtime handle; it is forwarded to the table providers
+ /// created through this factory.
+ runtime: Option<Handle>,
+}
+
+impl FFI_TableProviderFactory {
+    /// Creates a new [`FFI_TableProviderFactory`].
+    ///
+    /// # Arguments
+    ///
+    /// * `factory` - the factory implementation to expose over FFI
+    /// * `runtime` - optional Tokio runtime handle, passed on to the FFI codec
+    ///   and to the table providers this factory creates
+    /// * `task_ctx_provider` - used to build the FFI logical extension codec
+    /// * `logical_codec` - codec used to (de)serialize logical plans; when
+    ///   `None`, a [`DefaultLogicalExtensionCodec`] is used
+    pub fn new(
+        factory: Arc<dyn TableProviderFactory + Send>,
+        runtime: Option<Handle>,
+        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
+        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
+    ) -> Self {
+        let task_ctx_provider = task_ctx_provider.into();
+        let logical_codec =
+            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
+        let logical_codec = FFI_LogicalExtensionCodec::new(
+            logical_codec,
+            runtime.clone(),
+            task_ctx_provider.clone(),
+        );
+        Self::new_with_ffi_codec(factory, runtime, logical_codec)
+    }
+
+    /// Creates a new [`FFI_TableProviderFactory`] from a codec that has already
+    /// been wrapped for FFI.
+    ///
+    /// The factory and runtime handle are boxed into the private data; the
+    /// allocation is handed back to Rust and freed by the `release` callback.
+    pub fn new_with_ffi_codec(
+        factory: Arc<dyn TableProviderFactory + Send>,
+        runtime: Option<Handle>,
+        logical_codec: FFI_LogicalExtensionCodec,
+    ) -> Self {
+        let private_data = Box::new(FactoryPrivateData { factory, runtime });
+
+        Self {
+            create: create_fn_wrapper,
+            logical_codec,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            version: super::version,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+            library_marker_id: crate::get_library_marker_id,
+        }
+    }
+
+    /// Returns the wrapped factory stored in the private data.
+    ///
+    /// Must only be called on a factory whose `private_data` was produced by
+    /// this library (see `new_with_ffi_codec` and `clone_fn_wrapper`).
+    fn inner(&self) -> &Arc<dyn TableProviderFactory + Send> {
+        let private_data = self.private_data as *const FactoryPrivateData;
+        // SAFETY: for locally created factories `private_data` points at a live
+        // `FactoryPrivateData` allocated with `Box::into_raw`.
+        unsafe { &(*private_data).factory }
+    }
+
+    /// Returns the optional runtime handle stored in the private data.
+    ///
+    /// The same restriction as for `inner` applies.
+    fn runtime(&self) -> &Option<Handle> {
+        let private_data = self.private_data as *const FactoryPrivateData;
+        // SAFETY: see `inner`.
+        unsafe { &(*private_data).runtime }
+    }
+
+    /// Decodes the bytes received over FFI back into a [`CreateExternalTable`].
+    ///
+    /// The bytes are parsed as a [`LogicalPlanNode`] and converted into a
+    /// logical plan using the task context and logical codec held by
+    /// `logical_codec`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an internal error when the bytes are not a valid protobuf
+    /// message or when the decoded plan is not a `CREATE EXTERNAL TABLE`
+    /// statement. Errors from obtaining the task context or from the plan
+    /// conversion are propagated.
+    fn deserialize_cmd(
+        &self,
+        cmd_serialized: &RVec<u8>,
+    ) -> Result<CreateExternalTable, DataFusionError> {
+        let task_ctx: Arc<TaskContext> =
+            (&self.logical_codec.task_ctx_provider).try_into()?;
+        let logical_codec: Arc<dyn LogicalExtensionCodec> = (&self.logical_codec).into();
+
+        let plan = LogicalPlanNode::decode(cmd_serialized.as_ref())
+            .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?;
+        match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? {
+            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
+            _ => Err(DataFusionError::Internal(
+                "Invalid logical plan in FFI_TableProviderFactory.".to_owned(),
+            )),
+        }
+    }
+}
+
+impl Clone for FFI_TableProviderFactory {
+ // Cloning is routed through the `clone` function pointer stored in the
+ // struct instead of copying the fields directly.
+ fn clone(&self) -> Self {
+ unsafe { (self.clone)(self) }
+ }
+}
+
+impl Drop for FFI_TableProviderFactory {
+ // Dropping calls the `release` function pointer stored in the struct,
+ // passing this value so the callback can free the private data.
+ fn drop(&mut self) {
+ unsafe { (self.release)(self) }
+ }
+}
+
+impl From<&FFI_TableProviderFactory> for Arc<dyn TableProviderFactory> {
+    /// Converts the FFI struct into a usable [`TableProviderFactory`].
+    ///
+    /// When the library marker shows that the factory was created by this very
+    /// library, the wrapped factory is returned directly; otherwise a clone of
+    /// the FFI struct is wrapped in a [`ForeignTableProviderFactory`].
+    fn from(factory: &FFI_TableProviderFactory) -> Self {
+        let is_local = (factory.library_marker_id)() == crate::get_library_marker_id();
+        if !is_local {
+            return Arc::new(ForeignTableProviderFactory(factory.clone()));
+        }
+
+        Arc::clone(factory.inner()) as Arc<dyn TableProviderFactory>
+    }
+}
+
+/// `create` callback installed in [`FFI_TableProviderFactory`].
+///
+/// Clones the factory so the returned future owns everything it needs, then
+/// runs `create_fn_wrapper_impl` and converts the outcome into an FFI-safe
+/// future.
+unsafe extern "C" fn create_fn_wrapper(
+ factory: &FFI_TableProviderFactory,
+ session: FFI_SessionRef,
+ cmd_serialized: RVec<u8>,
+) -> FfiFuture<RResult<FFI_TableProvider, RString>> {
+ // The future is `async move`, so it needs its own copy of the factory.
+ let factory = factory.clone();
+
+ async move {
+ // NOTE(review): `rresult_return!` presumably returns early with an
+ // `RErr` when the inner call fails — confirm against the macro.
+ let provider = rresult_return!(
+ create_fn_wrapper_impl(factory, session, cmd_serialized).await
+ );
+ RResult::ROk(provider)
+ }
+ .into_ffi()
+}
+
+/// Does the actual work behind the `create` callback using ordinary
+/// `Result`-based error handling.
+///
+/// Decodes the serialized command, resolves the session (local or foreign),
+/// asks the wrapped factory for a provider and wraps that provider for FFI.
+///
+/// # Errors
+///
+/// Propagates failures from decoding the command, from building a
+/// [`ForeignSession`] and from the wrapped factory's `create`.
+async fn create_fn_wrapper_impl(
+    factory: FFI_TableProviderFactory,
+    session: FFI_SessionRef,
+    cmd_serialized: RVec<u8>,
+) -> Result<FFI_TableProvider, DataFusionError> {
+    let runtime = factory.runtime().clone();
+    let ffi_logical_codec = factory.logical_codec.clone();
+    let internal_factory = Arc::clone(factory.inner());
+    let cmd = factory.deserialize_cmd(&cmd_serialized)?;
+
+    // Use the session directly when it is local; otherwise build a
+    // `ForeignSession`. It is stored in `foreign_session`, declared outside the
+    // closure, so the returned reference outlives the closure.
+    let mut foreign_session = None;
+    let session = session
+        .as_local()
+        .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
+        .unwrap_or_else(|| {
+            foreign_session = Some(ForeignSession::try_from(&session)?);
+            Ok(foreign_session.as_ref().unwrap())
+        })?;
+
+    let provider = internal_factory.create(session, &cmd).await?;
+    // `runtime` is already an owned clone and is not used again, so it is
+    // moved rather than cloned a second time.
+    Ok(FFI_TableProvider::new_with_ffi_codec(
+        provider,
+        true,
+        runtime,
+        ffi_logical_codec,
+    ))
+}
+
+/// `clone` callback installed in [`FFI_TableProviderFactory`].
+///
+/// Builds a new FFI struct with its own boxed private data; the runtime handle,
+/// the wrapped factory `Arc` and the logical codec are cloned from `factory`.
+unsafe extern "C" fn clone_fn_wrapper(
+    factory: &FFI_TableProviderFactory,
+) -> FFI_TableProviderFactory {
+    let cloned_data = Box::new(FactoryPrivateData {
+        runtime: factory.runtime().clone(),
+        factory: Arc::clone(factory.inner()),
+    });
+
+    FFI_TableProviderFactory {
+        create: create_fn_wrapper,
+        logical_codec: factory.logical_codec.clone(),
+        clone: clone_fn_wrapper,
+        release: release_fn_wrapper,
+        version: super::version,
+        private_data: Box::into_raw(cloned_data) as *mut c_void,
+        library_marker_id: crate::get_library_marker_id,
+    }
+}
+
+/// `release` callback installed in [`FFI_TableProviderFactory`].
+///
+/// Takes back ownership of the boxed private data, drops it and nulls the
+/// pointer. A null pointer still trips the debug assertion, but in builds
+/// without debug assertions it is ignored instead of being passed to
+/// `Box::from_raw`.
+unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) {
+    debug_assert!(!factory.private_data.is_null());
+    if factory.private_data.is_null() {
+        // Nothing left to free; `Box::from_raw` on a null pointer is undefined
+        // behaviour.
+        return;
+    }
+
+    // SAFETY: a non-null `private_data` was produced by `Box::into_raw` on a
+    // `FactoryPrivateData` and is nulled below, so it is reclaimed only once.
+    let private_data =
+        unsafe { Box::from_raw(factory.private_data as *mut FactoryPrivateData) };
+    drop(private_data);
+    factory.private_data = std::ptr::null_mut();
+}
+
+/// This wrapper struct exists on the receiver side of the FFI interface, so it has
+/// no guarantees about being able to access the data in `private_data`. Any functions
+/// defined on this struct must only use the stable functions provided in
+/// [`FFI_TableProviderFactory`] to interact with the foreign table provider factory.
+#[derive(Debug)]
+pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory);
+
+impl ForeignTableProviderFactory {
+    /// Encodes a [`CreateExternalTable`] command into bytes that can cross the
+    /// FFI boundary.
+    ///
+    /// The command is wrapped in a DDL logical plan, converted into a
+    /// [`LogicalPlanNode`] with the factory's logical codec and then encoded
+    /// into a byte buffer.
+    ///
+    /// # Errors
+    ///
+    /// Propagates failures from converting the plan to protobuf and from
+    /// encoding the message.
+    fn serialize_cmd(
+        &self,
+        cmd: CreateExternalTable,
+    ) -> Result<RVec<u8>, DataFusionError> {
+        let logical_codec: Arc<dyn LogicalExtensionCodec> =
+            (&self.0.logical_codec).into();
+
+        let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd));
+        let plan: LogicalPlanNode =
+            AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?;
+
+        let mut buf: Vec<u8> = Vec::new();
+        plan.try_encode(&mut buf)?;
+
+        Ok(buf.into())
+    }
+}
+
+// NOTE(review): the only field is an `FFI_TableProviderFactory`, which already
+// has manual `Send`/`Sync` impls in this file, so these impls look redundant —
+// confirm before removing.
+unsafe impl Send for ForeignTableProviderFactory {}
+unsafe impl Sync for ForeignTableProviderFactory {}
+
+#[async_trait]
+impl TableProviderFactory for ForeignTableProviderFactory {
+    /// Creates a table provider by calling the `create` function pointer of the
+    /// wrapped FFI factory.
+    ///
+    /// The session is wrapped in an [`FFI_SessionRef`] and the command is
+    /// serialized to bytes before the call; the returned FFI provider is
+    /// wrapped in a [`ForeignTableProvider`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when the command cannot be serialized or when the
+    /// call across the FFI boundary reports a failure.
+    async fn create(
+        &self,
+        session: &dyn Session,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
+        let cmd = self.serialize_cmd(cmd.clone())?;
+
+        // Only the call through the function pointer is unsafe; awaiting the
+        // returned future and converting the result are ordinary safe code.
+        // SAFETY: `create` is invoked with the FFI struct it belongs to.
+        let maybe_provider = unsafe { (self.0.create)(&self.0, session, cmd) }.await;
+        let ffi_provider = df_result!(maybe_provider)?;
+
+        Ok(Arc::new(ForeignTableProvider(ffi_provider)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Schema;
+    use datafusion::prelude::SessionContext;
+    use datafusion_common::{TableReference, ToDFSchema};
+    use datafusion_execution::TaskContextProvider;
+    use std::collections::HashMap;
+
+    use super::*;
+
+    /// Factory that ignores its inputs and always returns the same in-memory
+    /// table with a single `Float32` column.
+    #[derive(Debug)]
+    struct TestTableProviderFactory {}
+
+    #[async_trait]
+    impl TableProviderFactory for TestTableProviderFactory {
+        async fn create(
+            &self,
+            _session: &dyn Session,
+            _cmd: &CreateExternalTable,
+        ) -> Result<Arc<dyn TableProvider>> {
+            use arrow::datatypes::Field;
+            use datafusion::arrow::array::Float32Array;
+            use datafusion::arrow::datatypes::DataType;
+            use datafusion::arrow::record_batch::RecordBatch;
+            use datafusion::datasource::MemTable;
+
+            let schema =
+                Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
+
+            let batch1 = RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
+            )?;
+            let batch2 = RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Float32Array::from(vec![64.0]))],
+            )?;
+
+            Ok(Arc::new(MemTable::try_new(
+                schema,
+                vec![vec![batch1], vec![batch2]],
+            )?))
+        }
+    }
+
+    /// Wraps a [`TestTableProviderFactory`] for FFI, using `ctx` as the task
+    /// context provider and the default logical codec.
+    fn ffi_test_factory(ctx: &Arc<SessionContext>) -> FFI_TableProviderFactory {
+        let task_ctx_provider = Arc::clone(ctx) as Arc<dyn TaskContextProvider>;
+        let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);
+
+        let factory = Arc::new(TestTableProviderFactory {});
+        FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None)
+    }
+
+    /// Builds a minimal `CREATE EXTERNAL TABLE` command for a table called
+    /// `name` with an empty schema.
+    fn create_external_table_cmd(name: &str) -> Result<CreateExternalTable> {
+        Ok(CreateExternalTable {
+            schema: Schema::empty().to_dfschema_ref()?,
+            name: TableReference::bare(name),
+            location: "test".to_string(),
+            file_type: "test".to_string(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            or_replace: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options: HashMap::new(),
+            constraints: Default::default(),
+            column_defaults: HashMap::new(),
+        })
+    }
+
+    #[tokio::test]
+    async fn test_round_trip_ffi_table_provider_factory() -> Result<()> {
+        let ctx = Arc::new(SessionContext::new());
+
+        // Pretend the factory comes from another library so the conversion
+        // below goes through `ForeignTableProviderFactory`.
+        let mut ffi_factory = ffi_test_factory(&ctx);
+        ffi_factory.library_marker_id = crate::mock_foreign_marker_id;
+
+        let factory: Arc<dyn TableProviderFactory> = (&ffi_factory).into();
+
+        let cmd = create_external_table_cmd("test_table")?;
+        let provider = factory.create(&ctx.state(), &cmd).await?;
+
+        assert_eq!(provider.schema().fields().len(), 1);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ffi_table_provider_factory_clone() -> Result<()> {
+        let ctx = Arc::new(SessionContext::new());
+        let ffi_factory = ffi_test_factory(&ctx);
+
+        // Test that we can clone the factory
+        let cloned_factory = ffi_factory.clone();
+        let factory: Arc<dyn TableProviderFactory> = (&cloned_factory).into();
+
+        let cmd = create_external_table_cmd("cloned_test")?;
+        let provider = factory.create(&ctx.state(), &cmd).await?;
+        assert_eq!(provider.schema().fields().len(), 1);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 9bcd7e0031..c936330668 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -34,19 +34,21 @@ use udf_udaf_udwf::{
create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
};
-use super::table_provider::FFI_TableProvider;
-use super::udf::FFI_ScalarUDF;
use crate::catalog_provider::FFI_CatalogProvider;
use crate::catalog_provider_list::FFI_CatalogProviderList;
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use crate::table_provider::FFI_TableProvider;
+use crate::table_provider_factory::FFI_TableProviderFactory;
use crate::tests::catalog::create_catalog_provider_list;
use crate::udaf::FFI_AggregateUDF;
+use crate::udf::FFI_ScalarUDF;
use crate::udtf::FFI_TableFunction;
use crate::udwf::FFI_WindowUDF;
mod async_provider;
pub mod catalog;
mod sync_provider;
+mod table_provider_factory;
mod udf_udaf_udwf;
pub mod utils;
@@ -71,6 +73,10 @@ pub struct ForeignLibraryModule {
codec: FFI_LogicalExtensionCodec,
) -> FFI_TableProvider,
+ /// Constructs the table provider factory
+ pub create_table_factory:
+ extern "C" fn(codec: FFI_LogicalExtensionCodec) ->
FFI_TableProviderFactory,
+
/// Create a scalar UDF
pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,
@@ -128,6 +134,14 @@ extern "C" fn construct_table_provider(
}
}
+/// Here we only wish to create a simple table provider factory as an example.
+/// Construction is delegated to `table_provider_factory::create`, which
+/// returns the factory as its FFI counterpart.
+extern "C" fn construct_table_provider_factory(
+ codec: FFI_LogicalExtensionCodec,
+) -> FFI_TableProviderFactory {
+ table_provider_factory::create(codec)
+}
+
#[export_root_module]
/// This defines the entry point for using the module.
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
@@ -135,6 +149,7 @@ pub fn get_foreign_library_module() ->
ForeignLibraryModuleRef {
create_catalog: create_catalog_provider,
create_catalog_list: create_catalog_provider_list,
create_table: construct_table_provider,
+ create_table_factory: construct_table_provider_factory,
create_scalar_udf: create_ffi_abs_func,
create_nullary_udf: create_ffi_random_func,
create_table_function: create_ffi_table_func,
diff --git a/datafusion/ffi/src/tests/table_provider_factory.rs
b/datafusion/ffi/src/tests/table_provider_factory.rs
new file mode 100644
index 0000000000..29af6aacf6
--- /dev/null
+++ b/datafusion/ffi/src/tests/table_provider_factory.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_catalog::{MemTable, Session, TableProvider,
TableProviderFactory};
+use datafusion_common::Result;
+use datafusion_expr::CreateExternalTable;
+
+use super::{create_record_batch, create_test_schema};
+use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use crate::table_provider_factory::FFI_TableProviderFactory;
+
+/// Example factory exposed by the test library; its `create` ignores the
+/// session and command and returns an in-memory table.
+#[derive(Debug)]
+pub struct TestTableProviderFactory {}
+
+#[async_trait]
+impl TableProviderFactory for TestTableProviderFactory {
+    /// Returns an in-memory table built from the shared test schema and three
+    /// record batches, regardless of the session and command passed in.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error from `MemTable::try_new` instead of panicking.
+    async fn create(
+        &self,
+        _session: &dyn Session,
+        _cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let schema = create_test_schema();
+
+        // It is useful to create these as multiple record batches
+        // so that we can demonstrate the FFI stream.
+        let batches = vec![
+            create_record_batch(1, 5),
+            create_record_batch(6, 1),
+            create_record_batch(7, 5),
+        ];
+
+        let table_provider = MemTable::try_new(schema, vec![batches])?;
+
+        Ok(Arc::new(table_provider))
+    }
+}
+
+/// Builds the FFI wrapper around a [`TestTableProviderFactory`], using the
+/// supplied FFI logical codec and no runtime handle.
+pub(crate) fn create(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory {
+    let factory = TestTableProviderFactory {};
+    FFI_TableProviderFactory::new_with_ffi_codec(Arc::new(factory), None, codec)
+}
diff --git a/datafusion/ffi/tests/ffi_integration.rs
b/datafusion/ffi/tests/ffi_integration.rs
index 2d18679cb0..80538d4f92 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -21,10 +21,15 @@ mod utils;
/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {
+ use std::collections::HashMap;
use std::sync::Arc;
- use datafusion::catalog::TableProvider;
+ use arrow::datatypes::Schema;
+ use datafusion::catalog::{TableProvider, TableProviderFactory};
use datafusion::error::{DataFusionError, Result};
+ use datafusion_common::TableReference;
+ use datafusion_common::ToDFSchema;
+ use datafusion_expr::CreateExternalTable;
use datafusion_ffi::tests::create_record_batch;
use datafusion_ffi::tests::utils::get_module;
@@ -69,4 +74,43 @@ mod tests {
async fn sync_test_table_provider() -> Result<()> {
test_table_provider(true).await
}
+
+    /// Loads the foreign library module, obtains its table provider factory
+    /// and checks that a provider created through it exposes two fields.
+    #[tokio::test]
+    async fn test_table_provider_factory() -> Result<()> {
+        let table_provider_module = get_module()?;
+        let (ctx, codec) = super::utils::ctx_and_codec();
+
+        // `ok_or_else` builds the error message only when the module does not
+        // export the constructor.
+        let ffi_table_provider_factory = table_provider_module
+            .create_table_factory()
+            .ok_or_else(|| {
+                DataFusionError::NotImplemented(
+                    "External table provider factory failed to implement create"
+                        .to_string(),
+                )
+            })?(codec);
+
+        let foreign_table_provider_factory: Arc<dyn TableProviderFactory> =
+            (&ffi_table_provider_factory).into();
+
+        let cmd = CreateExternalTable {
+            schema: Schema::empty().to_dfschema_ref()?,
+            name: TableReference::bare("cloned_test"),
+            location: "test".to_string(),
+            file_type: "test".to_string(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            or_replace: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options: HashMap::new(),
+            constraints: Default::default(),
+            column_defaults: HashMap::new(),
+        };
+
+        let provider = foreign_table_provider_factory
+            .create(&ctx.state(), &cmd)
+            .await?;
+        assert_eq!(provider.schema().fields().len(), 2);
+
+        Ok(())
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]