This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new c876761 feat(r/sedonadb): Add FFI support for ScalarUDF and TableProvider (#214)
c876761 is described below
commit c8767616297263d4436160894aa9d9a352923073
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Oct 16 15:09:40 2025 -0500
feat(r/sedonadb): Add FFI support for ScalarUDF and TableProvider (#214)
---
Cargo.lock | 1 +
r/sedonadb/NAMESPACE | 2 +
r/sedonadb/R/000-wrappers.R | 32 +++++++++-
r/sedonadb/R/context.R | 17 ++++++
r/sedonadb/R/dataframe.R | 15 ++++-
r/sedonadb/man/sd_register_udf.Rd | 20 +++++++
r/sedonadb/src/init.c | 45 +++++++++++++--
r/sedonadb/src/rust/Cargo.toml | 1 +
r/sedonadb/src/rust/api.h | 9 ++-
r/sedonadb/src/rust/src/context.rs | 47 ++++++++++-----
r/sedonadb/src/rust/src/dataframe.rs | 31 +++++++++-
r/sedonadb/src/rust/src/ffi.rs | 93 ++++++++++++++++++++++++++++++
r/sedonadb/src/rust/src/lib.rs | 1 +
r/sedonadb/tests/testthat/test-context.R | 12 ++++
r/sedonadb/tests/testthat/test-dataframe.R | 15 +++++
15 files changed, 316 insertions(+), 25 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 1eb1bc3..94550ec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5223,6 +5223,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datafusion-expr",
+ "datafusion-ffi",
"savvy",
"savvy-ffi",
"sedona",
diff --git a/r/sedonadb/NAMESPACE b/r/sedonadb/NAMESPACE
index 5e78149..73ebd6d 100644
--- a/r/sedonadb/NAMESPACE
+++ b/r/sedonadb/NAMESPACE
@@ -5,6 +5,7 @@ S3method("[[<-",savvy_sedonadb__sealed)
S3method(as.data.frame,sedonadb_dataframe)
S3method(as_nanoarrow_array_stream,sedonadb_dataframe)
S3method(as_sedonadb_dataframe,data.frame)
+S3method(as_sedonadb_dataframe,datafusion_table_provider)
S3method(as_sedonadb_dataframe,nanoarrow_array)
S3method(as_sedonadb_dataframe,nanoarrow_array_stream)
S3method(as_sedonadb_dataframe,sedonadb_dataframe)
@@ -24,6 +25,7 @@ export(sd_count)
export(sd_drop_view)
export(sd_preview)
export(sd_read_parquet)
+export(sd_register_udf)
export(sd_sql)
export(sd_to_view)
export(sd_view)
diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R
index 15ca659..c93fe0b 100644
--- a/r/sedonadb/R/000-wrappers.R
+++ b/r/sedonadb/R/000-wrappers.R
@@ -77,6 +77,12 @@ NULL
}
}
+`InternalContext_data_frame_from_table_provider` <- function(self) {
+ function(`provider_xptr`) {
+
.savvy_wrap_InternalDataFrame(.Call(savvy_InternalContext_data_frame_from_table_provider__impl,
`self`, `provider_xptr`))
+ }
+}
+
`InternalContext_deregister_table` <- function(self) {
function(`table_ref`) {
invisible(.Call(savvy_InternalContext_deregister_table__impl, `self`,
`table_ref`))
@@ -89,6 +95,18 @@ NULL
}
}
+`InternalContext_register_scalar_udf` <- function(self) {
+ function(`scalar_udf_xptr`) {
+ invisible(.Call(savvy_InternalContext_register_scalar_udf__impl, `self`,
`scalar_udf_xptr`))
+ }
+}
+
+`InternalContext_scalar_udf_xptr` <- function(self) {
+ function(`name`) {
+ .Call(savvy_InternalContext_scalar_udf_xptr__impl, `self`, `name`)
+ }
+}
+
`InternalContext_sql` <- function(self) {
function(`query`) {
.savvy_wrap_InternalDataFrame(.Call(savvy_InternalContext_sql__impl,
`self`, `query`))
@@ -105,8 +123,11 @@ NULL
e <- new.env(parent = emptyenv())
e$.ptr <- ptr
e$`data_frame_from_array_stream` <-
`InternalContext_data_frame_from_array_stream`(ptr)
+ e$`data_frame_from_table_provider` <-
`InternalContext_data_frame_from_table_provider`(ptr)
e$`deregister_table` <- `InternalContext_deregister_table`(ptr)
e$`read_parquet` <- `InternalContext_read_parquet`(ptr)
+ e$`register_scalar_udf` <- `InternalContext_register_scalar_udf`(ptr)
+ e$`scalar_udf_xptr` <- `InternalContext_scalar_udf_xptr`(ptr)
e$`sql` <- `InternalContext_sql`(ptr)
e$`view` <- `InternalContext_view`(ptr)
@@ -179,8 +200,8 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
}
`InternalDataFrame_to_arrow_stream` <- function(self) {
- function(`out`) {
- invisible(.Call(savvy_InternalDataFrame_to_arrow_stream__impl, `self`,
`out`))
+ function(`out`, `requested_schema_xptr`) {
+ invisible(.Call(savvy_InternalDataFrame_to_arrow_stream__impl, `self`,
`out`, `requested_schema_xptr`))
}
}
@@ -191,6 +212,12 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
}
}
+`InternalDataFrame_to_provider` <- function(self) {
+ function() {
+ .Call(savvy_InternalDataFrame_to_provider__impl, `self`)
+ }
+}
+
`InternalDataFrame_to_view` <- function(self) {
function(`ctx`, `table_ref`, `overwrite`) {
`ctx` <- .savvy_extract_ptr(`ctx`, "InternalContext")
@@ -210,6 +237,7 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
e$`to_arrow_schema` <- `InternalDataFrame_to_arrow_schema`(ptr)
e$`to_arrow_stream` <- `InternalDataFrame_to_arrow_stream`(ptr)
e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr)
+ e$`to_provider` <- `InternalDataFrame_to_provider`(ptr)
e$`to_view` <- `InternalDataFrame_to_view`(ptr)
class(e) <- c("InternalDataFrame", "savvy_sedonadb__sealed")
diff --git a/r/sedonadb/R/context.R b/r/sedonadb/R/context.R
index 86a55f8..2c921b9 100644
--- a/r/sedonadb/R/context.R
+++ b/r/sedonadb/R/context.R
@@ -80,6 +80,23 @@ sd_view <- function(table_ref) {
new_sedonadb_dataframe(ctx, df)
}
+#' Register a user-defined function
+#'
+#' Several types of user-defined functions can be registered into a session
+#' context. Currently, the only implemented variety is an external pointer
+#' to a Rust `FFI_ScalarUDF`, an example of which is available from the
+#' [DataFusion Python documentation](https://github.com/apache/datafusion-python/blob/6f3b1cab75cfaa0cdf914f9b6fa023cb9afccd7d/examples/datafusion-ffi-example/src/scalar_udf.rs).
+#'
+#' @param udf An object of class 'datafusion_scalar_udf'
+#'
+#' @returns NULL, invisibly
+#' @export
+#'
+sd_register_udf <- function(udf) {
+ ctx <- ctx()
+ ctx$register_scalar_udf(udf)
+}
+
# We use just one context for now. In theory we could support multiple
# contexts with a shared runtime, which would scope the registration
# of various components more cleanly from the runtime.
diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R
index 0e5b511..4dabad5 100644
--- a/r/sedonadb/R/dataframe.R
+++ b/r/sedonadb/R/dataframe.R
@@ -67,6 +67,13 @@ as_sedonadb_dataframe.nanoarrow_array_stream <- function(x, ..., schema = NULL,
as_sedonadb_dataframe(new_sedonadb_dataframe(ctx, df), schema = schema)
}
+#' @export
+as_sedonadb_dataframe.datafusion_table_provider <- function(x, ..., schema =
NULL) {
+ ctx <- ctx()
+ df <- ctx$data_frame_from_table_provider(x)
+ new_sedonadb_dataframe(ctx, df)
+}
+
#' Count rows in a DataFrame
#'
#' @param .data A sedonadb_dataframe
@@ -297,9 +304,13 @@ infer_nanoarrow_schema.sedonadb_dataframe <- function(x, ...) {
#' @importFrom nanoarrow as_nanoarrow_array_stream
#' @export
-as_nanoarrow_array_stream.sedonadb_dataframe <- function(x, ...) {
+as_nanoarrow_array_stream.sedonadb_dataframe <- function(x, ..., schema =
NULL) {
+ if (!is.null(schema)) {
+ schema <- nanoarrow::as_nanoarrow_schema(schema)
+ }
+
stream <- nanoarrow::nanoarrow_allocate_array_stream()
- x$df$to_arrow_stream(stream)
+ x$df$to_arrow_stream(stream, schema)
stream
}
diff --git a/r/sedonadb/man/sd_register_udf.Rd b/r/sedonadb/man/sd_register_udf.Rd
new file mode 100644
index 0000000..345a69a
--- /dev/null
+++ b/r/sedonadb/man/sd_register_udf.Rd
@@ -0,0 +1,20 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/context.R
+\name{sd_register_udf}
+\alias{sd_register_udf}
+\title{Register a user-defined function}
+\usage{
+sd_register_udf(udf)
+}
+\arguments{
+\item{udf}{An object of class 'datafusion_scalar_udf'}
+}
+\value{
+NULL, invisibly
+}
+\description{
+Several types of user-defined functions can be registered into a session
+context. Currently, the only implemented variety is an external pointer
+to a Rust \code{FFI_ScalarUDF}, an example of which is available from the
+\href{https://github.com/apache/datafusion-python/blob/6f3b1cab75cfaa0cdf914f9b6fa023cb9afccd7d/examples/datafusion-ffi-example/src/scalar_udf.rs}{DataFusion Python documentation}.
+}
diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c
index 6988665..f72a3ca 100644
--- a/r/sedonadb/src/init.c
+++ b/r/sedonadb/src/init.c
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-#include <Rinternals.h>
#include <stdint.h>
+#include <Rinternals.h>
+
#include <R_ext/Parse.h>
#include "rust/api.h"
@@ -83,6 +84,13 @@ SEXP savvy_InternalContext_data_frame_from_array_stream__impl(
return handle_result(res);
}
+SEXP savvy_InternalContext_data_frame_from_table_provider__impl(
+ SEXP self__, SEXP c_arg__provider_xptr) {
+ SEXP res = savvy_InternalContext_data_frame_from_table_provider__ffi(
+ self__, c_arg__provider_xptr);
+ return handle_result(res);
+}
+
SEXP savvy_InternalContext_deregister_table__impl(SEXP self__,
SEXP c_arg__table_ref) {
SEXP res =
@@ -100,6 +108,19 @@ SEXP savvy_InternalContext_read_parquet__impl(SEXP self__, SEXP c_arg__paths) {
return handle_result(res);
}
+SEXP savvy_InternalContext_register_scalar_udf__impl(
+ SEXP self__, SEXP c_arg__scalar_udf_xptr) {
+ SEXP res = savvy_InternalContext_register_scalar_udf__ffi(
+ self__, c_arg__scalar_udf_xptr);
+ return handle_result(res);
+}
+
+SEXP savvy_InternalContext_scalar_udf_xptr__impl(SEXP self__,
+ SEXP c_arg__name) {
+ SEXP res = savvy_InternalContext_scalar_udf_xptr__ffi(self__, c_arg__name);
+ return handle_result(res);
+}
+
SEXP savvy_InternalContext_sql__impl(SEXP self__, SEXP c_arg__query) {
SEXP res = savvy_InternalContext_sql__ffi(self__, c_arg__query);
return handle_result(res);
@@ -149,9 +170,10 @@ SEXP savvy_InternalDataFrame_to_arrow_schema__impl(SEXP self__,
return handle_result(res);
}
-SEXP savvy_InternalDataFrame_to_arrow_stream__impl(SEXP self__,
- SEXP c_arg__out) {
- SEXP res = savvy_InternalDataFrame_to_arrow_stream__ffi(self__, c_arg__out);
+SEXP savvy_InternalDataFrame_to_arrow_stream__impl(
+ SEXP self__, SEXP c_arg__out, SEXP c_arg__requested_schema_xptr) {
+ SEXP res = savvy_InternalDataFrame_to_arrow_stream__ffi(
+ self__, c_arg__out, c_arg__requested_schema_xptr);
return handle_result(res);
}
@@ -166,6 +188,11 @@ SEXP savvy_InternalDataFrame_to_parquet__impl(
return handle_result(res);
}
+SEXP savvy_InternalDataFrame_to_provider__impl(SEXP self__) {
+ SEXP res = savvy_InternalDataFrame_to_provider__ffi(self__);
+ return handle_result(res);
+}
+
SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx,
SEXP c_arg__table_ref,
SEXP c_arg__overwrite) {
@@ -183,12 +210,18 @@ static const R_CallMethodDef CallEntries[] = {
(DL_FUNC)&savvy_sedonadb_adbc_init_func__impl, 0},
{"savvy_InternalContext_data_frame_from_array_stream__impl",
(DL_FUNC)&savvy_InternalContext_data_frame_from_array_stream__impl, 3},
+ {"savvy_InternalContext_data_frame_from_table_provider__impl",
+ (DL_FUNC)&savvy_InternalContext_data_frame_from_table_provider__impl, 2},
{"savvy_InternalContext_deregister_table__impl",
(DL_FUNC)&savvy_InternalContext_deregister_table__impl, 2},
{"savvy_InternalContext_new__impl",
(DL_FUNC)&savvy_InternalContext_new__impl, 0},
{"savvy_InternalContext_read_parquet__impl",
(DL_FUNC)&savvy_InternalContext_read_parquet__impl, 2},
+ {"savvy_InternalContext_register_scalar_udf__impl",
+ (DL_FUNC)&savvy_InternalContext_register_scalar_udf__impl, 2},
+ {"savvy_InternalContext_scalar_udf_xptr__impl",
+ (DL_FUNC)&savvy_InternalContext_scalar_udf_xptr__impl, 2},
{"savvy_InternalContext_sql__impl",
(DL_FUNC)&savvy_InternalContext_sql__impl, 2},
{"savvy_InternalContext_view__impl",
@@ -208,9 +241,11 @@ static const R_CallMethodDef CallEntries[] = {
{"savvy_InternalDataFrame_to_arrow_schema__impl",
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_schema__impl, 2},
{"savvy_InternalDataFrame_to_arrow_stream__impl",
- (DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 2},
+ (DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 3},
{"savvy_InternalDataFrame_to_parquet__impl",
(DL_FUNC)&savvy_InternalDataFrame_to_parquet__impl, 8},
+ {"savvy_InternalDataFrame_to_provider__impl",
+ (DL_FUNC)&savvy_InternalDataFrame_to_provider__impl, 1},
{"savvy_InternalDataFrame_to_view__impl",
(DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4},
{NULL, NULL, 0}};
diff --git a/r/sedonadb/src/rust/Cargo.toml b/r/sedonadb/src/rust/Cargo.toml
index 98dc548..ced8c24 100644
--- a/r/sedonadb/src/rust/Cargo.toml
+++ b/r/sedonadb/src/rust/Cargo.toml
@@ -29,6 +29,7 @@ arrow-array = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
+datafusion-ffi = { workspace = true }
savvy = "*"
savvy-ffi = "*"
sedona = { path = "../../../../rust/sedona" }
diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h
index e3ba8a1..268c046 100644
--- a/r/sedonadb/src/rust/api.h
+++ b/r/sedonadb/src/rust/api.h
@@ -26,10 +26,15 @@ SEXP savvy_sedonadb_adbc_init_func__ffi(void);
// methods and associated functions for InternalContext
SEXP savvy_InternalContext_data_frame_from_array_stream__ffi(
SEXP self__, SEXP c_arg__stream_xptr, SEXP c_arg__collect_now);
+SEXP savvy_InternalContext_data_frame_from_table_provider__ffi(
+ SEXP self__, SEXP c_arg__provider_xptr);
SEXP savvy_InternalContext_deregister_table__ffi(SEXP self__,
SEXP c_arg__table_ref);
SEXP savvy_InternalContext_new__ffi(void);
SEXP savvy_InternalContext_read_parquet__ffi(SEXP self__, SEXP c_arg__paths);
+SEXP savvy_InternalContext_register_scalar_udf__ffi(
+ SEXP self__, SEXP c_arg__scalar_udf_xptr);
+SEXP savvy_InternalContext_scalar_udf_xptr__ffi(SEXP self__, SEXP c_arg__name);
SEXP savvy_InternalContext_sql__ffi(SEXP self__, SEXP c_arg__query);
SEXP savvy_InternalContext_view__ffi(SEXP self__, SEXP c_arg__table_ref);
@@ -43,11 +48,13 @@ SEXP savvy_InternalDataFrame_show__ffi(SEXP self__, SEXP c_arg__ctx,
SEXP c_arg__width_chars,
SEXP c_arg__ascii, SEXP c_arg__limit);
SEXP savvy_InternalDataFrame_to_arrow_schema__ffi(SEXP self__, SEXP
c_arg__out);
-SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(SEXP self__, SEXP
c_arg__out);
+SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(
+ SEXP self__, SEXP c_arg__out, SEXP c_arg__requested_schema_xptr);
SEXP savvy_InternalDataFrame_to_parquet__ffi(
SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version);
+SEXP savvy_InternalDataFrame_to_provider__ffi(SEXP self__);
SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx,
SEXP c_arg__table_ref,
SEXP c_arg__overwrite);
diff --git a/r/sedonadb/src/rust/src/context.rs b/r/sedonadb/src/rust/src/context.rs
index 67f52a3..a8c2c0e 100644
--- a/r/sedonadb/src/rust/src/context.rs
+++ b/r/sedonadb/src/rust/src/context.rs
@@ -16,13 +16,11 @@
// under the License.
use std::sync::Arc;
-use arrow_array::{
- ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream},
- RecordBatchReader,
-};
+use arrow_array::RecordBatchReader;
use arrow_schema::ArrowError;
use datafusion::catalog::{MemTable, TableProvider};
-use savvy::{savvy, savvy_err, Result};
+use datafusion_ffi::udf::FFI_ScalarUDF;
+use savvy::{savvy, savvy_err, IntoExtPtrSexp, Result};
use sedona::{context::SedonaContext,
record_batch_reader_provider::RecordBatchReaderProvider};
use sedona_geoparquet::provider::GeoParquetReadOptions;
@@ -30,6 +28,7 @@ use tokio::runtime::Runtime;
use crate::{
dataframe::{new_data_frame, InternalDataFrame},
+ ffi::{import_array_stream, import_scalar_udf, import_table_provider,
FFIScalarUdfR},
runtime::wait_for_future_captured_r,
};
@@ -94,14 +93,7 @@ impl InternalContext {
stream_xptr: savvy::Sexp,
collect_now: bool,
) -> savvy::Result<InternalDataFrame> {
- let ffi_stream =
- unsafe { savvy_ffi::R_ExternalPtrAddr(stream_xptr.0) as *mut
FFI_ArrowArrayStream };
- if ffi_stream.is_null() {
- return Err(savvy_err!("external pointer to null in
to_arrow_schema()"));
- }
-
- let stream = unsafe { FFI_ArrowArrayStream::from_raw(ffi_stream as _)
};
- let stream_reader = ArrowArrayStreamReader::try_new(stream)?;
+ let stream_reader = import_array_stream(stream_xptr)?;
// Some readers are sensitive to being collected on the R thread or
not, so
// provide the option to collect everything immediately.
@@ -117,8 +109,37 @@ impl InternalContext {
Ok(new_data_frame(inner, self.runtime.clone()))
}
+ pub fn data_frame_from_table_provider(
+ &self,
+ provider_xptr: savvy::Sexp,
+ ) -> Result<InternalDataFrame> {
+ let provider = import_table_provider(provider_xptr)?;
+ let inner = self.inner.ctx.read_table(provider)?;
+ Ok(new_data_frame(inner, self.runtime.clone()))
+ }
+
pub fn deregister_table(&self, table_ref: &str) -> savvy::Result<()> {
self.inner.ctx.deregister_table(table_ref)?;
Ok(())
}
+
+ pub fn scalar_udf_xptr(&self, name: &str) -> savvy::Result<savvy::Sexp> {
+ if let Some(udf) = self.inner.ctx.state().scalar_functions().get(name)
{
+ let ffi_scalar_udf: FFI_ScalarUDF = udf.clone().into();
+ let mut ffi_xptr =
FFIScalarUdfR(ffi_scalar_udf).into_external_pointer();
+ unsafe { savvy_ffi::Rf_protect(ffi_xptr.0) };
+ ffi_xptr.set_class(vec!["datafusion_scalar_udf"])?;
+ unsafe { savvy_ffi::Rf_unprotect(1) };
+
+ Ok(ffi_xptr)
+ } else {
+ Err(savvy_err!("Scalar UDF '{name}' was not found"))
+ }
+ }
+
+ pub fn register_scalar_udf(&self, scalar_udf_xptr: savvy::Sexp) ->
savvy::Result<()> {
+ let scalar_udf = import_scalar_udf(scalar_udf_xptr)?;
+ self.inner.ctx.register_udf(scalar_udf);
+ Ok(())
+ }
}
diff --git a/r/sedonadb/src/rust/src/dataframe.rs b/r/sedonadb/src/rust/src/dataframe.rs
index 909b2d7..a28fddc 100644
--- a/r/sedonadb/src/rust/src/dataframe.rs
+++ b/r/sedonadb/src/rust/src/dataframe.rs
@@ -24,7 +24,8 @@ use datafusion::catalog::MemTable;
use datafusion::{logical_expr::SortExpr, prelude::DataFrame};
use datafusion_common::Column;
use datafusion_expr::Expr;
-use savvy::{savvy, savvy_err, Result};
+use datafusion_ffi::table_provider::FFI_TableProvider;
+use savvy::{savvy, savvy_err, IntoExtPtrSexp, Result};
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
use sedona::reader::SedonaStreamReader;
use sedona::show::{DisplayMode, DisplayTableOptions};
@@ -33,6 +34,7 @@ use sedona_schema::schema::SedonaSchema;
use tokio::runtime::Runtime;
use crate::context::InternalContext;
+use crate::ffi::{import_schema, FFITableProviderR};
use crate::runtime::wait_for_future_captured_r;
#[savvy]
@@ -86,12 +88,22 @@ impl InternalDataFrame {
Ok(())
}
- fn to_arrow_stream(&self, out: savvy::Sexp) -> Result<()> {
+ fn to_arrow_stream(&self, out: savvy::Sexp, requested_schema_xptr:
savvy::Sexp) -> Result<()> {
let out_void = unsafe { savvy_ffi::R_ExternalPtrAddr(out.0) };
if out_void.is_null() {
return Err(savvy_err!("external pointer to null in
to_arrow_stream()"));
}
+ let maybe_requested_schema = if requested_schema_xptr.is_null() {
+ None
+ } else {
+ Some(import_schema(requested_schema_xptr))
+ };
+
+ if maybe_requested_schema.is_some() {
+ return Err(savvy_err!("Requested schema is not supported"));
+ }
+
let inner = self.inner.clone();
let stream =
wait_for_future_captured_r(
@@ -109,6 +121,21 @@ impl InternalDataFrame {
Ok(())
}
+ fn to_provider(&self) -> Result<savvy::Sexp> {
+ let provider = self.inner.clone().into_view();
+ // Literal true is because the TableProvider that wraps this DataFrame
+ // can support filters being pushed down.
+ let ffi_provider =
+ FFI_TableProvider::new(provider, true,
Some(self.runtime.handle().clone()));
+
+ let mut ffi_xptr =
FFITableProviderR(ffi_provider).into_external_pointer();
+ unsafe { savvy_ffi::Rf_protect(ffi_xptr.0) };
+ ffi_xptr.set_class(vec!["datafusion_table_provider"])?;
+ unsafe { savvy_ffi::Rf_unprotect(1) };
+
+ Ok(ffi_xptr)
+ }
+
fn compute(&self, ctx: &InternalContext) -> Result<InternalDataFrame> {
let schema = self.inner.schema();
let batches =
diff --git a/r/sedonadb/src/rust/src/ffi.rs b/r/sedonadb/src/rust/src/ffi.rs
new file mode 100644
index 0000000..4275e26
--- /dev/null
+++ b/r/sedonadb/src/rust/src/ffi.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{
+ ffi::FFI_ArrowSchema,
+ ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream},
+};
+use arrow_schema::Schema;
+use datafusion::catalog::TableProvider;
+use datafusion_expr::ScalarUDF;
+use datafusion_ffi::{
+ table_provider::{FFI_TableProvider, ForeignTableProvider},
+ udf::{FFI_ScalarUDF, ForeignScalarUDF},
+};
+use savvy::{savvy_err, IntoExtPtrSexp};
+
+pub fn import_schema(mut xptr: savvy::Sexp) -> savvy::Result<Schema> {
+ let ffi_schema: &FFI_ArrowSchema = import_xptr(&mut xptr,
"nanoarrow_schema")?;
+ let schema = Schema::try_from(ffi_schema)?;
+ Ok(schema)
+}
+
+pub fn import_array_stream(mut xptr: savvy::Sexp) ->
savvy::Result<ArrowArrayStreamReader> {
+ let ffi_stream: &mut FFI_ArrowArrayStream = import_xptr(&mut xptr,
"nanoarrow_array_stream")?;
+ let reader = unsafe { ArrowArrayStreamReader::from_raw(ffi_stream as _)? };
+ Ok(reader)
+}
+
+pub fn import_table_provider(
+ mut provider_xptr: savvy::Sexp,
+) -> savvy::Result<Arc<dyn TableProvider>> {
+ let ffi_provider: &FFI_TableProvider =
+ import_xptr(&mut provider_xptr, "datafusion_table_provider")?;
+ let provider_impl = ForeignTableProvider::from(ffi_provider);
+ Ok(Arc::new(provider_impl))
+}
+
+pub fn import_scalar_udf(mut scalar_udf_xptr: savvy::Sexp) ->
savvy::Result<ScalarUDF> {
+ let ffi_scalar_udf_ref: &FFI_ScalarUDF =
+ import_xptr(&mut scalar_udf_xptr, "datafusion_scalar_udf")?;
+ let scalar_udf_impl = ForeignScalarUDF::try_from(ffi_scalar_udf_ref)?;
+ Ok(scalar_udf_impl.into())
+}
+
+fn import_xptr<'a, T>(xptr: &'a mut savvy::Sexp, cls: &str) ->
savvy::Result<&'a mut T> {
+ if !xptr.is_external_pointer() {
+ return Err(savvy_err!(
+ "Expected external pointer with class {cls} but got a different R
object"
+ ));
+ }
+
+ if !xptr
+ .get_class()
+ .map(|classes| classes.contains(&cls))
+ .unwrap_or(false)
+ {
+ return Err(savvy_err!(
+ "Expected external pointer of class {cls} but got external pointer
with classes {:?}",
+ xptr.get_class()
+ ));
+ }
+
+ let typed_ptr = unsafe { savvy_ffi::R_ExternalPtrAddr(xptr.0) as *mut T };
+ if let Some(type_ref) = unsafe { typed_ptr.as_mut() } {
+ Ok(type_ref)
+ } else {
+ Err(savvy_err!("external pointer with class {cls} is null"))
+ }
+}
+
+#[repr(C)]
+pub struct FFIScalarUdfR(pub FFI_ScalarUDF);
+impl IntoExtPtrSexp for FFIScalarUdfR {}
+
+#[repr(C)]
+pub struct FFITableProviderR(pub FFI_TableProvider);
+impl IntoExtPtrSexp for FFITableProviderR {}
diff --git a/r/sedonadb/src/rust/src/lib.rs b/r/sedonadb/src/rust/src/lib.rs
index 727c36b..07c6f31 100644
--- a/r/sedonadb/src/rust/src/lib.rs
+++ b/r/sedonadb/src/rust/src/lib.rs
@@ -27,6 +27,7 @@ use sedona_proj::register::{configure_global_proj_engine, ProjCrsEngineBuilder};
mod context;
mod dataframe;
mod error;
+mod ffi;
mod runtime;
#[savvy]
diff --git a/r/sedonadb/tests/testthat/test-context.R b/r/sedonadb/tests/testthat/test-context.R
index d0e1554..3050e09 100644
--- a/r/sedonadb/tests/testthat/test-context.R
+++ b/r/sedonadb/tests/testthat/test-context.R
@@ -40,6 +40,18 @@ test_that("views can be created and dropped", {
expect_error(sd_view("foofy"), "No table named 'foofy'")
})
+test_that("scalar udfs can be registered", {
+ udf <- ctx()$scalar_udf_xptr("st_point")
+ expect_s3_class(udf, "datafusion_scalar_udf")
+
+ sd_register_udf(udf)
+ df <- sd_sql("SELECT ST_Point(0, 1) as geom") |> sd_collect()
+ expect_identical(
+ wk::as_wkt(df$geom),
+ wk::wkt("POINT (0 1)")
+ )
+})
+
test_that("configure_proj() errors for invalid inputs", {
expect_error(
sd_configure_proj("not a preset"),
diff --git a/r/sedonadb/tests/testthat/test-dataframe.R b/r/sedonadb/tests/testthat/test-dataframe.R
index a74eab3..0bc3c4c 100644
--- a/r/sedonadb/tests/testthat/test-dataframe.R
+++ b/r/sedonadb/tests/testthat/test-dataframe.R
@@ -53,6 +53,16 @@ test_that("dataframe can be created from nanoarrow objects", {
expect_identical(sd_collect(df, ptype = r_df), r_df)
})
+test_that("dataframe can be created from an FFI table provider", {
+ df <- as_sedonadb_dataframe(data.frame(one = 1, two = "two"))
+ provider <- df$df$to_provider()
+ df2 <- as_sedonadb_dataframe(provider)
+ expect_identical(
+ sd_collect(df2),
+ data.frame(one = 1, two = "two")
+ )
+})
+
test_that("dataframe property accessors work", {
df <- sd_sql("SELECT ST_Point(0, 1) as pt")
expect_identical(ncol(df), 1L)
@@ -108,6 +118,11 @@ test_that("dataframe can be converted to an array stream", {
as.data.frame(stream),
data.frame(one = 1, two = "two")
)
+
+ expect_error(
+ nanoarrow::as_nanoarrow_array_stream(df, schema = nanoarrow::na_int32()),
+ "Requested schema is not supported"
+ )
})
test_that("dataframe can be printed", {