This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 62776739 feat(python/sedonadb): Implement GDAL/OGR formats via pyogrio
(#283)
62776739 is described below
commit 627767393de7b1c041c04a0b59a0895442610a6d
Author: Dewey Dunnington <[email protected]>
AuthorDate: Sun Nov 23 21:08:03 2025 -0600
feat(python/sedonadb): Implement GDAL/OGR formats via pyogrio (#283)
Co-authored-by: Copilot <[email protected]>
---
Cargo.lock | 4 +
python/sedonadb/Cargo.toml | 2 +
python/sedonadb/python/sedonadb/context.py | 68 +++++
python/sedonadb/python/sedonadb/datasource.py | 194 +++++++++++++
python/sedonadb/src/context.rs | 21 ++
python/sedonadb/src/datasource.rs | 388 ++++++++++++++++++++++++++
python/sedonadb/src/lib.rs | 3 +
python/sedonadb/src/schema.rs | 15 +
python/sedonadb/tests/test_datasource.py | 135 +++++++++
rust/sedona-datasource/Cargo.toml | 2 +
rust/sedona-datasource/src/format.rs | 6 +-
rust/sedona-datasource/src/lib.rs | 1 +
rust/sedona-datasource/src/spec.rs | 156 ++++++++++-
rust/sedona-datasource/src/utility.rs | 145 ++++++++++
rust/sedona-expr/src/spatial_filter.rs | 96 ++++++-
rust/sedona-geometry/src/bounding_box.rs | 71 ++++-
rust/sedona-geometry/src/interval.rs | 270 +++++++++++++++++-
rust/sedona/Cargo.toml | 1 +
rust/sedona/src/context.rs | 167 ++++++++++-
19 files changed, 1719 insertions(+), 26 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 92dc407f..10f5f3b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4834,6 +4834,7 @@ dependencies = [
"parking_lot",
"rstest",
"sedona-common",
+ "sedona-datasource",
"sedona-expr",
"sedona-functions",
"sedona-geo",
@@ -4911,6 +4912,7 @@ dependencies = [
"datafusion-physical-plan",
"futures",
"object_store",
+ "regex",
"sedona-common",
"sedona-expr",
"sedona-schema",
@@ -5315,7 +5317,9 @@ dependencies = [
"pyo3",
"sedona",
"sedona-adbc",
+ "sedona-datasource",
"sedona-expr",
+ "sedona-geometry",
"sedona-geoparquet",
"sedona-proj",
"sedona-schema",
diff --git a/python/sedonadb/Cargo.toml b/python/sedonadb/Cargo.toml
index 16c75a3d..67a21e07 100644
--- a/python/sedonadb/Cargo.toml
+++ b/python/sedonadb/Cargo.toml
@@ -42,6 +42,8 @@ futures = { workspace = true }
pyo3 = { version = "0.25.1" }
sedona = { workspace = true }
sedona-adbc = { workspace = true }
+sedona-datasource = { workspace = true }
+sedona-geometry = { workspace = true }
sedona-expr = { workspace = true }
sedona-geoparquet = { workspace = true }
sedona-schema = { workspace = true }
diff --git a/python/sedonadb/python/sedonadb/context.py
b/python/sedonadb/python/sedonadb/context.py
index f1c48273..714d7333 100644
--- a/python/sedonadb/python/sedonadb/context.py
+++ b/python/sedonadb/python/sedonadb/context.py
@@ -152,6 +152,74 @@ class SedonaContext:
self.options,
)
+ def read_pyogrio(
+ self,
+ table_paths: Union[str, Path, Iterable[str]],
+ options: Optional[Dict[str, Any]] = None,
+ extension: str = "",
+ ) -> DataFrame:
+ """Read spatial file formats using GDAL/OGR via pyogrio
+
+ Creates a DataFrame from one or more paths or URLs to a file supported
by
+ [pyogrio](https://pyogrio.readthedocs.io/en/latest/), which is the
same package
+ that powers `geopandas.read_file()` by default. Some common formats
that can be
+ opened using GDAL/OGR are FlatGeoBuf, GeoPackage, Shapefile, GeoJSON,
and many,
+ many more. See <https://gdal.org/en/stable/drivers/vector/index.html>
for a list
+ of available vector drivers.
+
+ Like `read_parquet()`, globs and directories can be specified in
addition to
+ individual file paths. Paths ending in `.zip` are automatically
prepended with
+ `/vsizip/` (i.e., are automatically unzipped by GDAL). HTTP(s) URLs are
+ supported via `/vsicurl/`.
+
+ Args:
+ table_paths: A str, Path, or iterable of paths containing URLs or
+ paths. Globs (i.e., `path/*.gpkg`), directories, and zipped
+ versions of otherwise readable files are supported.
+ options: An optional mapping of key/value pairs (open options)
+ passed to GDAL/OGR.
+ extension: An optional file extension (e.g., `"fgb"`) used when
+ `table_paths` specifies one or more directories or a glob
+ that does not enforce a file extension.
+
+ Examples:
+
+ >>> import geopandas
+ >>> import tempfile
+ >>> sd = sedona.db.connect()
+ >>> df = geopandas.GeoDataFrame({
+ ... "geometry": geopandas.GeoSeries.from_wkt(["POINT (0 1)"],
crs=3857)
+ ... })
+ >>>
+ >>> with tempfile.TemporaryDirectory() as td:
+ ... df.to_file(f"{td}/df.fgb")
+ ... sd.read_pyogrio(f"{td}/df.fgb").show()
+ ...
+ ┌──────────────┐
+ │ wkb_geometry │
+ │ geometry │
+ ╞══════════════╡
+ │ POINT(0 1) │
+ └──────────────┘
+
+ """
+ from sedonadb.datasource import PyogrioFormatSpec
+
+ if isinstance(table_paths, (str, Path)):
+ table_paths = [table_paths]
+
+ spec = PyogrioFormatSpec(extension)
+ if options is not None:
+ spec = spec.with_options(options)
+
+ return DataFrame(
+ self._impl,
+ self._impl.read_external_format(
+ spec, [str(path) for path in table_paths], False
+ ),
+ self.options,
+ )
+
def sql(self, sql: str) -> DataFrame:
"""Create a [DataFrame][sedonadb.dataframe.DataFrame] by executing SQL
diff --git a/python/sedonadb/python/sedonadb/datasource.py
b/python/sedonadb/python/sedonadb/datasource.py
new file mode 100644
index 00000000..ea9bdded
--- /dev/null
+++ b/python/sedonadb/python/sedonadb/datasource.py
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Mapping
+
+from sedonadb._lib import PyExternalFormat, PyProjectedRecordBatchReader
+
+
+class ExternalFormatSpec:
+ """Python file format specification
+
+ This class defines an abstract "file format", which maps to the DataFusion
+ concept of a `FileFormat`. This is a layer on top of the `TableProvider`
that
+ provides standard support for querying collections of files using globs
+ or directories of files with compatible schemas. This abstraction allows
for
+ basic support for pruning and partial filter pushdown (e.g., a bounding box
+ is available if one was provided in the underlying query); however, data
+ providers with more advanced features may wish to implement a
`TableProvider`
+ in Rust to take advantage of a wider range of DataFusion features.
+
+ Implementations are only required to implement `open_reader()`; however, if
+ opening a reader is expensive and there is a more efficient way to infer a
+ schema from a given source, implementers may wish to also implement
+ `infer_schema()`.
+
+ This extension point is experimental and may evolve to serve the needs of
+ various file formats.
+ """
+
+ @property
+ def extension(self):
+ """A file extension for files that match this format
+
+ If this concept is not important for this format, returns an empty
string.
+ """
+ return ""
+
+ def with_options(self, options: Mapping[str, Any]):
+ """Clone this instance and return a new instance with options applied
+
+ Apply an arbitrary set of format-defined key/value options. It is
useful
+ to raise an error in this method if an option or value will later
result
+ in an error; however, implementations may defer the error until later if
+ required by the underlying producer.
+
+ The default implementation of this method errors for any attempt to
+ pass options.
+ """
+ raise NotImplementedError(
+ f"key/value options not supported by {type(self).__name__}"
+ )
+
+ def open_reader(self, args: Any):
+ """Open an ArrowArrayStream/RecordBatchReader of batches given input
information
+
+ Note that the output stream must take into account
`args.file_projection`, if one
+ exists (`PyProjectedRecordBatchReader` may be used to ensure a set of
output
+ columns or apply an output projection on an input stream).
+
+ The internals will keep a strong (Python) reference to the returned
object
+ for as long as batches are being produced.
+
+ Args:
+ args: An object with attributes
+ - `src`: An object/file abstraction. Currently, `.to_url()` is
the best way
+ to extract the underlying URL from the source.
+ - `filter`: An object representing the filter expression that
was pushed
+ down, if one exists. Currently,
`.bounding_box(column_index)` is the only
+ way to interact with this object.
+ - `file_schema`: An optional schema. If `None`, the
implementation must
+ infer the schema.
+ - `file_projection`: An optional list of integers of the
columns of
+ `file_schema` that must be produced by this implementation
(in the
+ exact order specified).
+ - `batch_size`: An optional integer specifying the number of
rows requested
+ for each output batch.
+
+ """
+ raise NotImplementedError()
+
+ def infer_schema(self, src):
+ """Infer the output schema
+
+ Implementations can leave this unimplemented, in which case the
internals will call
+ `open_reader()` and query the provided schema without pulling any
batches.
+
+ Args:
+ src: An object/file abstraction. Currently, `.to_url()` is the
best way
+ to extract the underlying URL from the source.
+ """
+ raise NotImplementedError()
+
+ def __sedona_external_format__(self):
+ return PyExternalFormat(self)
+
+
+class PyogrioFormatSpec(ExternalFormatSpec):
+ """An `ExternalFormatSpec` implementation wrapping GDAL/OGR via pyogrio"""
+
+ def __init__(self, extension=""):
+ self._extension = extension
+ self._options = {}
+
+ def with_options(self, options):
+ cloned = type(self)(self.extension)
+ cloned._options.update(options)
+ return cloned
+
+ @property
+ def extension(self) -> str:
+ return self._extension
+
+ def open_reader(self, args):
+ import pyogrio.raw
+
+ url = args.src.to_url()
+ if url is None:
+ raise ValueError(f"Can't convert {args.src} to OGR-openable
object")
+
+ if url.startswith("http://") or url.startswith("https://"):
+ ogr_src = f"/vsicurl/{url}"
+ elif url.startswith("file://"):
+ ogr_src = url.removeprefix("file://")
+ else:
+ raise ValueError(f"Can't open {url} with OGR")
+
+ if ogr_src.endswith(".zip"):
+ ogr_src = f"/vsizip/{ogr_src}"
+
+ if args.is_projected():
+ file_columns = args.file_schema.names
+ columns = [file_columns[i] for i in args.file_projection]
+ else:
+ columns = None
+
+ batch_size = args.batch_size if args.batch_size is not None else 0
+
+ if args.filter and args.file_schema is not None:
+ geometry_column_indices = args.file_schema.geometry_column_indices
+ if len(geometry_column_indices) == 1:
+ bbox = args.filter.bounding_box(geometry_column_indices[0])
+ else:
+ bbox = None
+ else:
+ bbox = None
+
+ return PyogrioReaderShelter(
+ pyogrio.raw.ogr_open_arrow(
+ ogr_src, {}, columns=columns, batch_size=batch_size, bbox=bbox
+ ),
+ columns,
+ )
+
+
+class PyogrioReaderShelter:
+ """Python object wrapper around the context manager returned by pyogrio
+
+ The pyogrio object returned by `pyogrio.raw.ogr_open_arrow()` is a context
+ manager; however, the internals can only manage Rust object references.
+ This object ensures that the context manager is closed when the object
+ is deleted (which occurs as soon as possible when the returned reader
+ is no longer required).
+ """
+
+ def __init__(self, inner, output_names=None):
+ self._inner = inner
+ self._output_names = output_names
+ self._meta, self._reader = self._inner.__enter__()
+
+ def __del__(self):
+ self._inner.__exit__(None, None, None)
+
+ def __arrow_c_stream__(self, requested_schema=None):
+ if self._output_names is None:
+ return self._reader.__arrow_c_stream__()
+ else:
+ projected = PyProjectedRecordBatchReader(
+ self._reader, None, self._output_names
+ )
+ return projected.__arrow_c_stream__()
diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs
index 4c480484..67ad8dcc 100644
--- a/python/sedonadb/src/context.rs
+++ b/python/sedonadb/src/context.rs
@@ -23,6 +23,7 @@ use tokio::runtime::Runtime;
use crate::{
dataframe::InternalDataFrame,
+ datasource::PyExternalFormat,
error::PySedonaError,
import_from::{import_ffi_scalar_udf, import_table_provider_from_any},
runtime::wait_for_future,
@@ -107,6 +108,26 @@ impl InternalContext {
Ok(InternalDataFrame::new(df, self.runtime.clone()))
}
+ pub fn read_external_format<'py>(
+ &self,
+ py: Python<'py>,
+ format_spec: Bound<PyAny>,
+ table_paths: Vec<String>,
+ check_extension: bool,
+ ) -> Result<InternalDataFrame, PySedonaError> {
+ let spec = format_spec
+ .call_method0("__sedona_external_format__")?
+ .extract::<PyExternalFormat>()?;
+ let df = wait_for_future(
+ py,
+ &self.runtime,
+ self.inner
+ .read_external_format(Arc::new(spec), table_paths, None,
check_extension),
+ )??;
+
+ Ok(InternalDataFrame::new(df, self.runtime.clone()))
+ }
+
pub fn sql<'py>(
&self,
py: Python<'py>,
diff --git a/python/sedonadb/src/datasource.rs
b/python/sedonadb/src/datasource.rs
new file mode 100644
index 00000000..496a3000
--- /dev/null
+++ b/python/sedonadb/src/datasource.rs
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, ffi::CString, sync::Arc};
+
+use arrow_array::{ffi_stream::FFI_ArrowArrayStream, RecordBatch,
RecordBatchReader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::{physical_expr::conjunction, physical_plan::PhysicalExpr};
+use datafusion_common::{DataFusionError, Result};
+use pyo3::{
+ exceptions::PyNotImplementedError, pyclass, pymethods, types::PyCapsule,
Bound, PyObject,
+ Python,
+};
+use sedona_datasource::{
+ spec::{ExternalFormatSpec, Object, OpenReaderArgs},
+ utility::ProjectedRecordBatchReader,
+};
+use sedona_expr::spatial_filter::SpatialFilter;
+use sedona_geometry::interval::IntervalTrait;
+
+use crate::{
+ error::PySedonaError,
+ import_from::{import_arrow_array_stream, import_arrow_schema},
+ schema::PySedonaSchema,
+};
+
+/// Python object that calls the methods of Python-level ExternalFormatSpec
+///
+/// The main purpose of this object is to implement [ExternalFormatSpec] such
+/// that it can be used by SedonaDB/DataFusion internals.
+#[pyclass]
+#[derive(Debug)]
+pub struct PyExternalFormat {
+ extension: String,
+ py_spec: PyObject,
+}
+
+impl Clone for PyExternalFormat {
+ fn clone(&self) -> Self {
+ Python::with_gil(|py| Self {
+ extension: self.extension.clone(),
+ py_spec: self.py_spec.clone_ref(py),
+ })
+ }
+}
+
+impl PyExternalFormat {
+ fn with_options_impl<'py>(
+ &self,
+ py: Python<'py>,
+ options: &HashMap<String, String>,
+ ) -> Result<Self, PySedonaError> {
+ let new_py_spec = self
+ .py_spec
+ .call_method(py, "with_options", (options.clone(),), None)?;
+ let new_extension = new_py_spec
+ .getattr(py, "extension")?
+ .extract::<String>(py)?;
+ Ok(Self {
+ extension: new_extension,
+ py_spec: new_py_spec,
+ })
+ }
+
+ fn infer_schema_impl<'py>(
+ &self,
+ py: Python<'py>,
+ object: &Object,
+ ) -> Result<Schema, PySedonaError> {
+ let maybe_schema = self.py_spec.call_method(
+ py,
+ "infer_schema",
+ (PyDataSourceObject {
+ inner: object.clone(),
+ },),
+ None,
+ );
+
+ match maybe_schema {
+ Ok(py_schema) => import_arrow_schema(py_schema.bind(py)),
+ Err(e) => {
+ if e.is_instance_of::<PyNotImplementedError>(py) {
+ // Fall back on the open_reader implementation, as for some
+ // external formats there is no other mechanism to infer a
schema
+ // other than to open a reader and query the schema at
that point.
+ let reader_args = OpenReaderArgs {
+ src: object.clone(),
+ batch_size: None,
+ file_schema: None,
+ file_projection: None,
+ filters: vec![],
+ };
+
+ let reader = self.open_reader_impl(py, &reader_args)?;
+ Ok(reader.schema().as_ref().clone())
+ } else {
+ Err(PySedonaError::from(e))
+ }
+ }
+ }
+ }
+
+ fn open_reader_impl<'py>(
+ &self,
+ py: Python<'py>,
+ args: &OpenReaderArgs,
+ ) -> Result<Box<dyn RecordBatchReader + Send>, PySedonaError> {
+ let reader_obj = self.py_spec.call_method(
+ py,
+ "open_reader",
+ (PyOpenReaderArgs {
+ inner: args.clone(),
+ },),
+ None,
+ )?;
+
+ let reader = import_arrow_array_stream(py, reader_obj.bind(py), None)?;
+ let wrapped_reader = WrappedRecordBatchReader {
+ inner: reader,
+ shelter: Some(reader_obj),
+ };
+ Ok(Box::new(wrapped_reader))
+ }
+}
+
+#[pymethods]
+impl PyExternalFormat {
+ #[new]
+ fn new<'py>(py: Python<'py>, py_spec: PyObject) -> Result<Self,
PySedonaError> {
+ let extension = py_spec.getattr(py,
"extension")?.extract::<String>(py)?;
+ Ok(Self { extension, py_spec })
+ }
+}
+
+#[async_trait]
+impl ExternalFormatSpec for PyExternalFormat {
+ fn extension(&self) -> &str {
+ &self.extension
+ }
+
+ fn with_options(
+ &self,
+ options: &HashMap<String, String>,
+ ) -> Result<Arc<dyn ExternalFormatSpec>> {
+ let new_external_format = Python::with_gil(|py|
self.with_options_impl(py, options))
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ Ok(Arc::new(new_external_format))
+ }
+
+ async fn infer_schema(&self, location: &Object) -> Result<Schema> {
+ let schema = Python::with_gil(|py| self.infer_schema_impl(py,
location))
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+ Ok(schema)
+ }
+
+ async fn open_reader(
+ &self,
+ args: &OpenReaderArgs,
+ ) -> Result<Box<dyn RecordBatchReader + Send>> {
+ let reader = Python::with_gil(|py| self.open_reader_impl(py, args))
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+ Ok(reader)
+ }
+}
+
+/// Wrapper around the [Object] such that the [PyExternalFormat] can pass
+/// required information into Python method calls
+///
+/// Currently this only exposes `to_url()`; however, we can and should expose
+/// the ability to read portions of files using the underlying object_store.
+#[pyclass]
+#[derive(Clone, Debug)]
+pub struct PyDataSourceObject {
+ pub inner: Object,
+}
+
+#[pymethods]
+impl PyDataSourceObject {
+ fn to_url(&self) -> Option<String> {
+ self.inner.to_url_string()
+ }
+}
+
+/// Wrapper around the [OpenReaderArgs] such that the [PyExternalFormat]
can pass
+/// required information into Python method calls
+#[pyclass]
+#[derive(Clone, Debug)]
+pub struct PyOpenReaderArgs {
+ pub inner: OpenReaderArgs,
+}
+
+#[pymethods]
+impl PyOpenReaderArgs {
+ #[getter]
+ fn src(&self) -> PyDataSourceObject {
+ PyDataSourceObject {
+ inner: self.inner.src.clone(),
+ }
+ }
+
+ #[getter]
+ fn batch_size(&self) -> Option<usize> {
+ self.inner.batch_size
+ }
+
+ #[getter]
+ fn file_schema(&self) -> Option<PySedonaSchema> {
+ self.inner
+ .file_schema
+ .as_ref()
+ .map(|schema| PySedonaSchema::new(schema.as_ref().clone()))
+ }
+
+ #[getter]
+ fn file_projection(&self) -> Option<Vec<usize>> {
+ self.inner.file_projection.clone()
+ }
+
+ #[getter]
+ fn filters(&self) -> Vec<PyFilter> {
+ self.inner
+ .filters
+ .iter()
+ .map(|f| PyFilter { inner: f.clone() })
+ .collect()
+ }
+
+ #[getter]
+ fn filter(&self) -> Option<PyFilter> {
+ if self.inner.filters.is_empty() {
+ None
+ } else {
+ Some(PyFilter {
+ inner: conjunction(self.inner.filters.iter().cloned()),
+ })
+ }
+ }
+
+ fn is_projected(&self) -> Result<bool, PySedonaError> {
+ match (&self.inner.file_projection, &self.inner.file_schema) {
+ (None, None) | (None, Some(_)) => Ok(false),
+ (Some(projection), Some(schema)) => {
+ let seq_along_schema =
(0..schema.fields().len()).collect::<Vec<_>>();
+ Ok(&seq_along_schema != projection)
+ }
+ (Some(_), None) => Err(PySedonaError::SedonaPython(
+ "Can't check projection for OpenReaderArgs with no
schema".to_string(),
+ )),
+ }
+ }
+}
+
+/// Wrapper around a PhysicalExpr such that the [PyExternalFormat] can pass
+/// required information into Python method calls
+///
+/// This currently only exposes `bounding_box()`, but in the future could
expose
+/// various ways to serialize the expression (SQL, DataFusion ProtoBuf,
Substrait).
+#[pyclass]
+#[derive(Debug)]
+pub struct PyFilter {
+ inner: Arc<dyn PhysicalExpr>,
+}
+
+#[pymethods]
+impl PyFilter {
+ fn bounding_box(
+ &self,
+ column_index: usize,
+ ) -> Result<Option<(f64, f64, f64, f64)>, PySedonaError> {
+ let filter = SpatialFilter::try_from_expr(&self.inner)?;
+ let filter_bbox = filter.filter_bbox(column_index);
+ if filter_bbox.x().is_full() || filter_bbox.y().is_full() {
+ Ok(None)
+ } else {
+ Ok(Some((
+ filter_bbox.x().lo(),
+ filter_bbox.y().lo(),
+ filter_bbox.x().hi(),
+ filter_bbox.y().hi(),
+ )))
+ }
+ }
+
+ fn __repr__(&self) -> String {
+ format!("{self:?}")
+ }
+}
+
+/// RecordBatchReader utility that helps ensure projected output
+///
+/// Because the output of `open_reader()` is required to take into account
+/// the projection, we need to provide a utility to ensure this is taken into
account.
+/// This wrapper is a thin wrapper around the [ProjectedRecordBatchReader]
that allows
+/// it to be constructed from Python using either a set of indices or a set of
names.
+#[pyclass]
+pub struct PyProjectedRecordBatchReader {
+ inner_object: PyObject,
+ projection_indices: Option<Vec<usize>>,
+ projection_names: Option<Vec<String>>,
+}
+
+#[pymethods]
+impl PyProjectedRecordBatchReader {
+ #[new]
+ fn new(
+ inner_object: PyObject,
+ projection_indices: Option<Vec<usize>>,
+ projection_names: Option<Vec<String>>,
+ ) -> Self {
+ Self {
+ inner_object,
+ projection_indices,
+ projection_names,
+ }
+ }
+
+ #[pyo3(signature = (requested_schema=None))]
+ fn __arrow_c_stream__<'py>(
+ &self,
+ py: Python<'py>,
+ #[allow(unused_variables)] requested_schema: Option<Bound<'py,
PyCapsule>>,
+ ) -> Result<Bound<'py, PyCapsule>, PySedonaError> {
+ let inner = import_arrow_array_stream(py, self.inner_object.bind(py),
None)?;
+
+ let reader = match (&self.projection_indices, &self.projection_names) {
+ (None, None) | (Some(_), Some(_)) => {
+ return
Err(PySedonaError::SedonaPython("PyProjectedRecordBatchReader must be specified
by one of projection_indices or projection_names".to_string()))
+ }
+ (Some(indices), None) => {
+ ProjectedRecordBatchReader::from_projection(inner,
indices.clone())?
+ }
+ (None, Some(names)) => {
+ ProjectedRecordBatchReader::from_output_names(inner,
&names.iter().map(|s| s.as_str()).collect::<Vec<&str>>())?
+ }
+ };
+
+ let ffi_stream = FFI_ArrowArrayStream::new(Box::new(reader));
+ let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
+ Ok(PyCapsule::new(py, ffi_stream, Some(stream_capsule_name))?)
+ }
+}
+
+/// Helper to ensure a Python object stays in scope for the duration of a
+/// [RecordBatchReader]'s output.
+///
+/// Some Python frameworks require that some parent object outlive a returned
+/// ArrowArrayStream/RecordBatchReader (e.g., the pyogrio context manager, or
+/// an ADBC statement/cursor).
+struct WrappedRecordBatchReader {
+ pub inner: Box<dyn RecordBatchReader + Send>,
+ pub shelter: Option<PyObject>,
+}
+
+impl RecordBatchReader for WrappedRecordBatchReader {
+ fn schema(&self) -> SchemaRef {
+ self.inner.schema()
+ }
+}
+
+impl Iterator for WrappedRecordBatchReader {
+ type Item = Result<RecordBatch, ArrowError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if let Some(item) = self.inner.next() {
+ Some(item)
+ } else {
+ self.shelter = None;
+ None
+ }
+ }
+}
diff --git a/python/sedonadb/src/lib.rs b/python/sedonadb/src/lib.rs
index 62a0caba..6110144a 100644
--- a/python/sedonadb/src/lib.rs
+++ b/python/sedonadb/src/lib.rs
@@ -22,6 +22,7 @@ use std::ffi::c_void;
mod context;
mod dataframe;
+mod datasource;
mod error;
mod import_from;
mod reader;
@@ -94,6 +95,8 @@ fn _lib(py: Python<'_>, m: &Bound<'_, PyModule>) ->
PyResult<()> {
m.add_class::<context::InternalContext>()?;
m.add_class::<dataframe::InternalDataFrame>()?;
+ m.add_class::<datasource::PyExternalFormat>()?;
+ m.add_class::<datasource::PyProjectedRecordBatchReader>()?;
m.add("SedonaError", py.get_type::<error::SedonaError>())?;
m.add_class::<schema::PySedonaSchema>()?;
m.add_class::<schema::PySedonaField>()?;
diff --git a/python/sedonadb/src/schema.rs b/python/sedonadb/src/schema.rs
index d261043c..d9ead708 100644
--- a/python/sedonadb/src/schema.rs
+++ b/python/sedonadb/src/schema.rs
@@ -22,6 +22,7 @@ use pyo3::exceptions::{PyIndexError, PyKeyError, PyTypeError};
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use sedona_schema::datatypes::SedonaType;
+use sedona_schema::schema::SedonaSchema;
use crate::error::PySedonaError;
@@ -59,6 +60,20 @@ impl PySedonaSchema {
#[pymethods]
impl PySedonaSchema {
+ #[getter]
+ fn names(&self) -> Vec<String> {
+ self.inner
+ .fields()
+ .iter()
+ .map(|f| f.name().to_string())
+ .collect()
+ }
+
+ #[getter]
+ fn geometry_column_indices(&self) -> Result<Vec<usize>, PySedonaError> {
+ Ok(self.inner.geometry_column_indices()?)
+ }
+
fn field<'py>(
&self,
py: Python<'py>,
diff --git a/python/sedonadb/tests/test_datasource.py
b/python/sedonadb/tests/test_datasource.py
new file mode 100644
index 00000000..64e5b0b8
--- /dev/null
+++ b/python/sedonadb/tests/test_datasource.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pathlib import Path
+import tempfile
+
+import geopandas
+import geopandas.testing
+import pandas as pd
+import pytest
+import shapely
+import sedonadb
+
+
+def test_read_ogr_projection(con):
+ n = 1024
+ series = geopandas.GeoSeries.from_xy(
+ list(range(n)), list(range(1, n + 1)), crs="EPSG:3857"
+ )
+ gdf = geopandas.GeoDataFrame({"idx": list(range(n)), "wkb_geometry":
series})
+ gdf = gdf.set_geometry(gdf["wkb_geometry"])
+
+ with tempfile.TemporaryDirectory() as td:
+ temp_fgb_path = f"{td}/temp.fgb"
+ gdf.to_file(temp_fgb_path)
+ con.read_pyogrio(temp_fgb_path).to_view("test_fgb", overwrite=True)
+
+ # With no projection
+ geopandas.testing.assert_geodataframe_equal(
+ con.sql("SELECT * FROM test_fgb ORDER BY idx").to_pandas(), gdf
+ )
+
+ # With only the non-geometry column selected
+ pd.testing.assert_frame_equal(
+ con.sql("SELECT idx FROM test_fgb ORDER BY idx").to_pandas(),
+ gdf.filter(["idx"]),
+ )
+
+ # With reversed columns
+ pd.testing.assert_frame_equal(
+ con.sql("SELECT wkb_geometry, idx FROM test_fgb ORDER BY
idx").to_pandas(),
+ gdf.filter(["wkb_geometry", "idx"]),
+ )
+
+
+def test_read_ogr_multi_file(con):
+ n = 1024 * 16
+ partitions = ["part_{c}" for c in "abcdefghijklmnop"]
+ series = geopandas.GeoSeries.from_xy(
+ list(range(n)), list(range(1, n + 1)), crs="EPSG:3857"
+ )
+ gdf = geopandas.GeoDataFrame(
+ {
+ "idx": list(range(n)),
+ "partition": [partitions[i % len(partitions)] for i in range(n)],
+ "wkb_geometry": series,
+ }
+ )
+ gdf = gdf.set_geometry(gdf["wkb_geometry"])
+
+ with tempfile.TemporaryDirectory() as td:
+ # Create partitioned files by writing Parquet first and translating
+ # one file at a time
+ con.create_data_frame(gdf).to_parquet(td, partition_by="partition")
+ for parquet_path in Path(td).rglob("*.parquet"):
+ fgb_path = str(parquet_path).replace(".parquet", ".fgb")
+ con.read_parquet(parquet_path).to_pandas().to_file(fgb_path)
+
+ # Reading a directory while specifying the extension should work
+ con.read_pyogrio(f"{td}", extension="fgb").to_view(
+ "gdf_from_dir", overwrite=True
+ )
+ geopandas.testing.assert_geodataframe_equal(
+ con.sql("SELECT * FROM gdf_from_dir ORDER BY idx").to_pandas(),
+ gdf.filter(["idx", "wkb_geometry"]),
+ )
+
+ # Reading using a glob without specifying the extension should work
+ con.read_pyogrio(f"{td}/**/*.fgb").to_view("gdf_from_glob",
overwrite=True)
+ geopandas.testing.assert_geodataframe_equal(
+ con.sql("SELECT * FROM gdf_from_glob ORDER BY idx").to_pandas(),
+ gdf.filter(["idx", "wkb_geometry"]),
+ )
+
+
+def test_read_ogr_filter(con):
+ n = 1024
+ series = geopandas.GeoSeries.from_xy(
+ list(range(n)), list(range(1, n + 1)), crs="EPSG:3857"
+ )
+ gdf = geopandas.GeoDataFrame({"idx": list(range(n)), "wkb_geometry":
series})
+ gdf = gdf.set_geometry(gdf["wkb_geometry"])
+
+ with tempfile.TemporaryDirectory() as td:
+ temp_fgb_path = f"{td}/temp.fgb"
+ gdf.to_file(temp_fgb_path)
+ con.read_pyogrio(temp_fgb_path).to_view("test_fgb", overwrite=True)
+
+ # With something that should trigger a bounding box filter
+ geopandas.testing.assert_geodataframe_equal(
+ con.sql(
+ """
+ SELECT * FROM test_fgb
+ WHERE ST_Equals(wkb_geometry, ST_SetSRID(ST_Point(1, 2), 3857))
+ """
+ ).to_pandas(),
+ gdf[gdf.geometry.geom_equals(shapely.Point(1,
2))].reset_index(drop=True),
+ )
+
+
+def test_read_ogr_file_not_found(con):
+ with pytest.raises(
+ sedonadb._lib.SedonaError, match="Can't infer schema for zero objects"
+ ):
+ con.read_pyogrio("this/is/not/a/directory")
+
+ with tempfile.TemporaryDirectory() as td:
+ with pytest.raises(
+ sedonadb._lib.SedonaError, match="Can't infer schema for zero
objects"
+ ):
+ con.read_pyogrio(Path(td) / "file_does_not_exist")
diff --git a/rust/sedona-datasource/Cargo.toml
b/rust/sedona-datasource/Cargo.toml
index b939ff41..c7273de8 100644
--- a/rust/sedona-datasource/Cargo.toml
+++ b/rust/sedona-datasource/Cargo.toml
@@ -29,6 +29,7 @@ rust-version.workspace = true
default = []
[dev-dependencies]
+object_store = { workspace = true, features = ["http"] }
url = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
@@ -45,6 +46,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
object_store = { workspace = true }
+regex = { workspace = true }
sedona-common = { workspace = true }
sedona-expr = { workspace = true }
sedona-schema = { workspace = true }
diff --git a/rust/sedona-datasource/src/format.rs
b/rust/sedona-datasource/src/format.rs
index 5e6d5315..69a2bc9f 100644
--- a/rust/sedona-datasource/src/format.rs
+++ b/rust/sedona-datasource/src/format.rs
@@ -31,7 +31,7 @@ use datafusion::{
},
};
use datafusion_catalog::{memory::DataSourceExec, Session};
-use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result,
Statistics};
+use datafusion_common::{not_impl_err, plan_err, DataFusionError, GetExt,
Result, Statistics};
use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr};
use datafusion_physical_plan::{
filter_pushdown::{FilterPushdownPropagation, PushedDown},
@@ -124,6 +124,10 @@ impl FileFormat for ExternalFileFormat {
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
+ if objects.is_empty() {
+ return plan_err!("Can't infer schema for zero objects. Does the
input path exist?");
+ }
+
let mut schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| async move {
let schema = self
diff --git a/rust/sedona-datasource/src/lib.rs
b/rust/sedona-datasource/src/lib.rs
index 4bdc5969..88ccd95e 100644
--- a/rust/sedona-datasource/src/lib.rs
+++ b/rust/sedona-datasource/src/lib.rs
@@ -18,3 +18,4 @@
pub mod format;
pub mod provider;
pub mod spec;
+pub mod utility;
diff --git a/rust/sedona-datasource/src/spec.rs
b/rust/sedona-datasource/src/spec.rs
index a3abf8d6..d9b8f1af 100644
--- a/rust/sedona-datasource/src/spec.rs
+++ b/rust/sedona-datasource/src/spec.rs
@@ -15,7 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::{collections::HashMap, fmt::Debug, sync::Arc};
+use std::{
+ collections::HashMap,
+ fmt::{Debug, Display},
+ sync::Arc,
+};
use arrow_array::RecordBatchReader;
use arrow_schema::{Schema, SchemaRef};
@@ -26,6 +30,7 @@ use datafusion_common::{Result, Statistics};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::PhysicalExpr;
use object_store::{ObjectMeta, ObjectStore};
+use regex::Regex;
/// Simple file format specification
///
@@ -51,11 +56,6 @@ pub trait ExternalFormatSpec: Debug + Send + Sync {
async fn open_reader(&self, args: &OpenReaderArgs)
-> Result<Box<dyn RecordBatchReader + Send>>;
- /// A file extension or `""` if this concept does not apply
- fn extension(&self) -> &str {
- ""
- }
-
/// Compute a clone of self but with the key/value options specified
///
/// Implementations should error for invalid key/value input that does
@@ -65,6 +65,11 @@ pub trait ExternalFormatSpec: Debug + Send + Sync {
options: &HashMap<String, String>,
) -> Result<Arc<dyn ExternalFormatSpec>>;
+ /// A file extension or `""` if this concept does not apply
+ fn extension(&self) -> &str {
+ ""
+ }
+
/// Fill in default options from [TableOptions]
///
/// The TableOptions are a DataFusion concept that provide a means by which
@@ -182,16 +187,145 @@ impl Object {
// GDAL to be able to translate.
let object_store_debug = format!("{:?}",
self.store).to_lowercase();
if object_store_debug.contains("http") {
- Some(format!("https://{}", meta.location))
- } else if object_store_debug.contains("local") {
+ let pattern = r#"host:
some\(domain\("([A-Za-z0-9.-]+)"\)\)"#;
+ let re = Regex::new(pattern).ok()?;
+ if let Some(caps) = re.captures(&object_store_debug) {
+ caps.get(1)
+ .map(|host| format!("https://{}/{}",
host.as_str(), meta.location))
+ } else {
+ None
+ }
+ } else if object_store_debug.contains("localfilesystem") {
Some(format!("file:///{}", meta.location))
} else {
None
}
}
- (Some(url), None) => Some(url.to_string()),
- (Some(url), Some(meta)) => Some(format!("{url}/{}",
meta.location)),
- (None, None) => None,
+ (Some(url), Some(meta)) => Some(format!("{url}{}", meta.location)),
+ (Some(_), None) | (None, None) => None,
+ }
+ }
+}
+
+impl Display for Object {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ if let Some(url) = self.to_url_string() {
+ write!(f, "{url}")
+ } else if let Some(meta) = &self.meta {
+ write!(f, "<object store> {}", meta.location)
+ } else {
+ write!(f, "<object store> <unknown location>")
}
}
}
+
+#[cfg(test)]
+mod test {
+ use object_store::{http::HttpBuilder, local::LocalFileSystem};
+
+ use super::*;
+
+ #[test]
+ fn http_object() {
+ let url_string = "https://foofy.foof/path/to/file.ext";
+
+ let store =
Arc::new(HttpBuilder::new().with_url(url_string).build().unwrap());
+
+ let url = ObjectStoreUrl::parse("https://foofy.foof").unwrap();
+
+ let meta = ObjectMeta {
+ location: "/path/to/file.ext".into(),
+ last_modified: Default::default(),
+ size: 0,
+ e_tag: None,
+ version: None,
+ };
+
+ // Should be able to reconstruct the url with ObjectStoreUrl + meta
+ let obj = Object {
+ store: None,
+ url: Some(url.clone()),
+ meta: Some(meta.clone()),
+ range: None,
+ };
+ assert_eq!(obj.to_url_string().unwrap(), url_string);
+
+ // Should be able to reconstruct the url with the ObjectStore + meta
+ let obj = Object {
+ store: Some(store),
+ url: None,
+ meta: Some(meta.clone()),
+ range: None,
+ };
+ assert_eq!(obj.to_url_string().unwrap(), url_string);
+
+ // With only Meta, this should fail to compute a url
+ let obj = Object {
+ store: None,
+ url: None,
+ meta: Some(meta.clone()),
+ range: None,
+ };
+ assert!(obj.to_url_string().is_none());
+
+ // With only ObjectStoreUrl, this should fail to compute a url
+ let obj = Object {
+ store: None,
+ url: Some(url),
+ meta: None,
+ range: None,
+ };
+ assert!(obj.to_url_string().is_none());
+ }
+
+ #[test]
+ fn filesystem_object() {
+ let store = Arc::new(LocalFileSystem::new());
+
+ let url = ObjectStoreUrl::parse("file://").unwrap();
+
+ let meta = ObjectMeta {
+ location: "/path/to/file.ext".into(),
+ last_modified: Default::default(),
+ size: 0,
+ e_tag: None,
+ version: None,
+ };
+
+ // Should be able to reconstruct the url with ObjectStoreUrl + meta
+ let obj = Object {
+ store: None,
+ url: Some(url.clone()),
+ meta: Some(meta.clone()),
+ range: None,
+ };
+ assert_eq!(obj.to_url_string().unwrap(), "file:///path/to/file.ext");
+
+ // Should be able to reconstruct the url with the ObjectStore + meta
+ let obj = Object {
+ store: Some(store),
+ url: None,
+ meta: Some(meta.clone()),
+ range: None,
+ };
+ assert_eq!(obj.to_url_string().unwrap(), "file:///path/to/file.ext");
+
+ // With only Meta, this should fail to compute a url
+ let obj = Object {
+ store: None,
+ url: None,
+ meta: Some(meta.clone()),
+ range: None,
+ };
+ assert!(obj.to_url_string().is_none());
+
+ // With only ObjectStoreUrl, this should fail to compute a url
+ let obj = Object {
+ store: None,
+ url: Some(url),
+ meta: None,
+ range: None,
+ };
+ assert!(obj.to_url_string().is_none());
+ }
+}
diff --git a/rust/sedona-datasource/src/utility.rs
b/rust/sedona-datasource/src/utility.rs
new file mode 100644
index 00000000..ad36cf0c
--- /dev/null
+++ b/rust/sedona-datasource/src/utility.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_schema::{ArrowError, SchemaRef};
+use datafusion_common::Result;
+
+/// [RecordBatchReader] wrapper that applies a projection
+///
+/// This utility can be used to implement a reader that conforms to the
+/// DataFusion requirement that datasources apply the specified projection
+/// when producing output.
+pub struct ProjectedRecordBatchReader {
+ inner: Box<dyn RecordBatchReader + Send>,
+ projection: Vec<usize>,
+ schema: SchemaRef,
+}
+
+impl ProjectedRecordBatchReader {
+ /// Create a new wrapper from the indices into the input desired in the
output
+ pub fn from_projection(
+ inner: Box<dyn RecordBatchReader + Send>,
+ projection: Vec<usize>,
+ ) -> Result<Self> {
+ let schema = inner.schema().project(&projection)?;
+ Ok(Self {
+ inner,
+ projection,
+ schema: Arc::new(schema),
+ })
+ }
+
+ /// Create a new wrapper from the column names from the input desired in
the output
+ pub fn from_output_names(
+ inner: Box<dyn RecordBatchReader + Send>,
+ projection: &[&str],
+ ) -> Result<Self> {
+ let input_indices = projection
+ .iter()
+ .map(|col| inner.schema().index_of(col))
+ .collect::<Result<Vec<usize>, ArrowError>>()?;
+ Self::from_projection(inner, input_indices)
+ }
+}
+
+impl RecordBatchReader for ProjectedRecordBatchReader {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
+impl Iterator for ProjectedRecordBatchReader {
+ type Item = Result<RecordBatch, ArrowError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if let Some(next) = self.inner.next() {
+ match next {
+ Ok(batch) => Some(batch.project(&self.projection)),
+ Err(err) => Some(Err(err)),
+ }
+ } else {
+ None
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+
+ use arrow_array::{create_array, ArrayRef, RecordBatchIterator};
+ use datafusion::assert_batches_eq;
+
+ use super::*;
+
+ #[test]
+ fn projected_record_batch_reader() {
+ let batch = RecordBatch::try_from_iter([
+ (
+ "x",
+ create_array!(Utf8, ["one", "two", "three", "four"]) as
ArrayRef,
+ ),
+ (
+ "y",
+ create_array!(Utf8, ["five", "six", "seven", "eight"]) as
ArrayRef,
+ ),
+ ])
+ .unwrap();
+
+ let schema = batch.schema();
+
+ // From indices
+ let reader = RecordBatchIterator::new([Ok(batch.clone())],
schema.clone());
+ let projected =
+ ProjectedRecordBatchReader::from_projection(Box::new(reader),
vec![1, 0]).unwrap();
+ let projected_batches = projected.collect::<Result<Vec<_>,
ArrowError>>().unwrap();
+ assert_batches_eq!(
+ [
+ "+-------+-------+",
+ "| y | x |",
+ "+-------+-------+",
+ "| five | one |",
+ "| six | two |",
+ "| seven | three |",
+ "| eight | four |",
+ "+-------+-------+",
+ ],
+ &projected_batches
+ );
+
+ // From output names
+ let reader = RecordBatchIterator::new([Ok(batch.clone())],
schema.clone());
+ let projected =
+ ProjectedRecordBatchReader::from_output_names(Box::new(reader),
&["y", "x"]).unwrap();
+ let projected_batches = projected.collect::<Result<Vec<_>,
ArrowError>>().unwrap();
+ assert_batches_eq!(
+ [
+ "+-------+-------+",
+ "| y | x |",
+ "+-------+-------+",
+ "| five | one |",
+ "| six | two |",
+ "| seven | three |",
+ "| eight | four |",
+ "+-------+-------+",
+ ],
+ &projected_batches
+ );
+ }
+}
diff --git a/rust/sedona-expr/src/spatial_filter.rs
b/rust/sedona-expr/src/spatial_filter.rs
index 83e314eb..1160cf2c 100644
--- a/rust/sedona-expr/src/spatial_filter.rs
+++ b/rust/sedona-expr/src/spatial_filter.rs
@@ -25,7 +25,11 @@ use datafusion_physical_expr::{
};
use geo_traits::Dimensions;
use sedona_common::sedona_internal_err;
-use sedona_geometry::{bounding_box::BoundingBox, bounds::wkb_bounds_xy,
interval::IntervalTrait};
+use sedona_geometry::{
+ bounding_box::BoundingBox,
+ bounds::wkb_bounds_xy,
+ interval::{Interval, IntervalTrait},
+};
use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
use crate::{
@@ -41,7 +45,7 @@ use crate::{
/// to attempt pruning unnecessary files or parts of files specifically with
respect
/// to a spatial filter (i.e., non-spatial filters we leave to an underlying
/// implementation).
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum SpatialFilter {
/// ST_Intersects(\<column\>, \<literal\>) or ST_Intersects(\<literal\>,
\<column\>)
Intersects(Column, BoundingBox),
@@ -60,6 +64,44 @@ pub enum SpatialFilter {
}
impl SpatialFilter {
+ /// Compute the maximum extent of a filter for a specific column index
+ ///
+ /// Some spatial file formats have the ability to push down a bounding box
+ /// into an index. This function allows deriving that bounding box based
+ /// on what DataFusion provides, which is a physical expression.
+ ///
+ /// Note that this always succeeds; however, for a non-spatial expression
or
+ /// a spatial expression that is unsupported, the full bounding box is
+ /// returned.
+ pub fn filter_bbox(&self, column_index: usize) -> BoundingBox {
+ match self {
+ SpatialFilter::Intersects(column, bounding_box)
+ | SpatialFilter::Covers(column, bounding_box) => {
+ if column.index() == column_index {
+ return bounding_box.clone();
+ }
+ }
+ SpatialFilter::And(lhs, rhs) => {
+ let lhs_box = lhs.filter_bbox(column_index);
+ let rhs_box = rhs.filter_bbox(column_index);
+ if let Ok(bounds) = lhs_box.intersection(&rhs_box) {
+ return bounds;
+ }
+ }
+ SpatialFilter::Or(lhs, rhs) => {
+ let mut bounds = lhs.filter_bbox(column_index);
+ bounds.update_box(&rhs.filter_bbox(column_index));
+ return bounds;
+ }
+ SpatialFilter::LiteralFalse => {
+ return BoundingBox::xy(Interval::empty(), Interval::empty())
+ }
+ SpatialFilter::HasZ(_) | SpatialFilter::Unknown => {}
+ }
+
+ BoundingBox::xy(Interval::full(), Interval::full())
+ }
+
/// Returns true if there is any chance the expression might be true
///
/// In other words, returns false if and only if the expression is
guaranteed
@@ -1103,4 +1145,54 @@ mod test {
panic!("Parse incorrect!")
}
}
+
+ #[test]
+ fn bounding_box() {
+ let col_zero = Column::new("foofy", 0);
+ let bbox_02 = BoundingBox::xy((0, 2), (0, 2));
+ let bbox_13 = BoundingBox::xy((1, 3), (1, 3));
+
+ assert_eq!(
+ SpatialFilter::Intersects(col_zero.clone(),
bbox_02.clone()).filter_bbox(0),
+ bbox_02
+ );
+
+ assert_eq!(
+ SpatialFilter::Covers(col_zero.clone(),
bbox_02.clone()).filter_bbox(0),
+ bbox_02
+ );
+
+ assert_eq!(
+ SpatialFilter::LiteralFalse.filter_bbox(0),
+ BoundingBox::xy(Interval::empty(), Interval::empty())
+ );
+ assert_eq!(
+ SpatialFilter::HasZ(col_zero.clone()).filter_bbox(0),
+ BoundingBox::xy(Interval::full(), Interval::full())
+ );
+ assert_eq!(
+ SpatialFilter::Unknown.filter_bbox(0),
+ BoundingBox::xy(Interval::full(), Interval::full())
+ );
+
+ let intersects_02 = SpatialFilter::Intersects(col_zero.clone(),
bbox_02.clone());
+ let intersects_13 = SpatialFilter::Intersects(col_zero.clone(),
bbox_13.clone());
+ assert_eq!(
+ SpatialFilter::And(
+ Box::new(intersects_02.clone()),
+ Box::new(intersects_13.clone())
+ )
+ .filter_bbox(0),
+ BoundingBox::xy((1, 2), (1, 2))
+ );
+
+ assert_eq!(
+ SpatialFilter::Or(
+ Box::new(intersects_02.clone()),
+ Box::new(intersects_13.clone())
+ )
+ .filter_bbox(0),
+ BoundingBox::xy((0, 3), (0, 3))
+ );
+ }
}
diff --git a/rust/sedona-geometry/src/bounding_box.rs
b/rust/sedona-geometry/src/bounding_box.rs
index fb5b9777..7a018fb5 100644
--- a/rust/sedona-geometry/src/bounding_box.rs
+++ b/rust/sedona-geometry/src/bounding_box.rs
@@ -16,7 +16,10 @@
// under the License.
use serde::{Deserialize, Serialize};
-use crate::interval::{Interval, IntervalTrait, WraparoundInterval};
+use crate::{
+ error::SedonaGeometryError,
+ interval::{Interval, IntervalTrait, WraparoundInterval},
+};
/// Bounding Box implementation with wraparound support
///
@@ -162,6 +165,25 @@ impl BoundingBox {
_ => None,
};
}
+
+ /// Compute the intersection of this bounding box with another
+ ///
+ /// This method will propagate missingness of Z or M dimensions from the
two boxes
+ /// (e.g., Z will be `None` if `self.z().is_none()` OR
`other.z().is_none()`).
+ pub fn intersection(&self, other: &Self) -> Result<Self,
SedonaGeometryError> {
+ Ok(Self {
+ x: self.x.intersection(&other.x)?,
+ y: self.y.intersection(&other.y)?,
+ z: match (self.z, other.z) {
+ (Some(z), Some(other_z)) => Some(z.intersection(&other_z)?),
+ _ => None,
+ },
+ m: match (self.m, other.m) {
+ (Some(m), Some(other_m)) => Some(m.intersection(&other_m)?),
+ _ => None,
+ },
+ })
+ }
}
#[cfg(test)]
@@ -358,6 +380,53 @@ mod test {
assert!(bounding_box.m().is_none());
}
+ #[test]
+ fn bounding_box_intersection() {
+ assert_eq!(
+ BoundingBox::xy((1, 2), (3, 4))
+ .intersection(&BoundingBox::xy((1.5, 2.5), (3.5, 4.5)))
+ .unwrap(),
+ BoundingBox::xy((1.5, 2.0), (3.5, 4.0))
+ );
+
+ // If z and m are present in one input but not the other, we propagate
the unknownness
+ // to the intersection
+ assert_eq!(
+ BoundingBox::xyzm(
+ (1, 2),
+ (3, 4),
+ Some(Interval::empty()),
+ Some(Interval::empty())
+ )
+ .intersection(&BoundingBox::xy((1.5, 2.5), (3.5, 4.5)))
+ .unwrap(),
+ BoundingBox::xy((1.5, 2.0), (3.5, 4.0))
+ );
+
+ // If z and m are specified in both, we include the intersection in
the output
+ assert_eq!(
+ BoundingBox::xyzm(
+ (1, 2),
+ (3, 4),
+ Some(Interval::empty()),
+ Some(Interval::empty())
+ )
+ .intersection(&BoundingBox::xyzm(
+ (1.5, 2.5),
+ (3.5, 4.5),
+ Some(Interval::empty()),
+ Some(Interval::empty())
+ ))
+ .unwrap(),
+ BoundingBox::xyzm(
+ (1.5, 2.0),
+ (3.5, 4.0),
+ Some(Interval::empty()),
+ Some(Interval::empty())
+ )
+ );
+ }
+
fn check_serialize_deserialize_roundtrip(bounding_box: BoundingBox) {
let json_bytes = serde_json::to_vec(&bounding_box).unwrap();
let bounding_box_roundtrip: BoundingBox =
serde_json::from_slice(&json_bytes).unwrap();
diff --git a/rust/sedona-geometry/src/interval.rs
b/rust/sedona-geometry/src/interval.rs
index 1037bf7e..d02608d7 100644
--- a/rust/sedona-geometry/src/interval.rs
+++ b/rust/sedona-geometry/src/interval.rs
@@ -26,7 +26,7 @@ use crate::error::SedonaGeometryError;
/// incurs overhead (particularly in a loop). This trait is mostly used to
/// simplify testing and unify documentation for the two concrete
/// implementations.
-pub trait IntervalTrait: std::fmt::Debug + PartialEq {
+pub trait IntervalTrait: std::fmt::Debug + PartialEq + Sized {
/// Create an interval from lo and hi values
fn new(lo: f64, hi: f64) -> Self;
@@ -98,6 +98,9 @@ pub trait IntervalTrait: std::fmt::Debug + PartialEq {
/// True if this interval is empty (i.e. intersects no values)
fn is_empty(&self) -> bool;
+ /// True if this interval is full (i.e. intersects all values)
+ fn is_full(&self) -> bool;
+
/// Compute a new interval that is the union of both
///
/// When accumulating intervals in a loop, use [Interval::update_interval].
@@ -114,6 +117,9 @@ pub trait IntervalTrait: std::fmt::Debug + PartialEq {
/// For regular intervals, this expands both lo and hi by the distance.
/// For wraparound intervals, this may result in the full interval if
expansion is large enough.
fn expand_by(&self, distance: f64) -> Self;
+
+ /// Compute the interval contained by both self and other
+ fn intersection(&self, other: &Self) -> Result<Self, SedonaGeometryError>;
}
/// 1D Interval that never wraps around
@@ -236,6 +242,10 @@ impl IntervalTrait for Interval {
self.width() == -f64::INFINITY
}
+ fn is_full(&self) -> bool {
+ self == &Self::full()
+ }
+
fn merge_interval(&self, other: &Self) -> Self {
let mut out = *self;
out.update_interval(other);
@@ -255,6 +265,16 @@ impl IntervalTrait for Interval {
Self::new(self.lo - distance, self.hi + distance)
}
+
+ fn intersection(&self, other: &Self) -> Result<Self, SedonaGeometryError> {
+ let new_lo = self.lo.max(other.lo);
+ let new_hi = self.hi.min(other.hi);
+ if new_lo > new_hi {
+ Ok(Self::empty())
+ } else {
+ Ok(Self::new(new_lo, new_hi))
+ }
+ }
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
@@ -370,6 +390,10 @@ impl IntervalTrait for WraparoundInterval {
self.inner.is_empty()
}
+ fn is_full(&self) -> bool {
+ self == &Self::full()
+ }
+
fn merge_interval(&self, other: &Self) -> Self {
if self.is_empty() {
return *other;
@@ -486,6 +510,53 @@ impl IntervalTrait for WraparoundInterval {
// So the interval itself is (excluded_hi, excluded_lo)
Self::new(excluded_hi, excluded_lo)
}
+
+ fn intersection(&self, other: &Self) -> Result<Self, SedonaGeometryError> {
+ match (self.is_wraparound(), other.is_wraparound()) {
+ // Neither is wraparound
+ (false, false) =>
Ok(self.inner.intersection(&other.inner)?.into()),
+ // One is wraparound
+ (true, false) => other.intersection(self),
+ (false, true) => {
+ let inner = self.inner;
+ let (left, right) = other.split();
+ match (inner.intersects_interval(&left),
inner.intersects_interval(&right)) {
+ // Intersects both the left and right intervals
+ (true, true) => {
+ Err(SedonaGeometryError::Invalid(format!("Can't
represent the intersection of {self:?} and {other:?} as a single
WraparoundInterval")))
+ },
+ // Intersects only the left interval
+ (true, false) => Ok(inner.intersection(&left)?.into()),
+ // Intersects only the right interval
+ (false, true) => Ok(inner.intersection(&right)?.into()),
+ (false, false) => Ok(WraparoundInterval::empty()),
+ }
+ }
+ // Both are wraparound
+ (true, true) => {
+ // Both wraparound intervals represent complements of excluded
regions
+ // Intersection of complements = complement of union of
excluded regions
+ // self excludes (hi, lo), other excludes (other.hi, other.lo)
+ // We need to find the union of these excluded regions
+ let excluded_self = Interval::new(self.inner.hi,
self.inner.lo);
+ let excluded_other = Interval::new(other.inner.hi,
other.inner.lo);
+
+ // We can't use the excluded union if the excluded region of
self and other
+ // are disjoint
+ if excluded_self.intersects_interval(&excluded_other) {
+ let excluded_union =
excluded_self.merge_interval(&excluded_other);
+
+ // The intersection is the complement of the union of
excluded regions
+ Ok(WraparoundInterval::new(
+ excluded_union.hi(),
+ excluded_union.lo(),
+ ))
+ } else {
+ Err(SedonaGeometryError::Invalid(format!("Can't represent
the intersection of {self:?} and {other:?} as a single WraparoundInterval")))
+ }
+ }
+ }
+ }
}
#[cfg(test)]
@@ -495,6 +566,8 @@ mod test {
use super::*;
fn test_empty<T: IntervalTrait>(empty: T) {
+ assert!(empty.is_empty());
+
// Equals itself
#[allow(clippy::eq_op)]
{
@@ -536,6 +609,12 @@ mod test {
T::new(10.0, 20.0)
);
+ // Intersecting an empty interval results in an empty interval
+ assert_eq!(empty.intersection(&empty).unwrap(), empty);
+
+ // Intersecting an empty interval with a full interval results in an empty interval
+ assert_eq!(empty.intersection(&T::full()).unwrap(), empty);
+
// Expanding empty interval keeps it empty
assert_eq!(empty.expand_by(5.0), empty);
assert_eq!(empty.expand_by(0.0), empty);
@@ -567,6 +646,21 @@ mod test {
);
}
+ fn test_full<T: IntervalTrait>(full: T) {
+ assert!(full.is_full());
+ assert_eq!(full.intersection(&full).unwrap(), full);
+ }
+
+ #[test]
+ fn interval_full() {
+ test_full(Interval::full());
+ }
+
+ #[test]
+ fn wraparound_interval_full() {
+ test_full(WraparoundInterval::full());
+ }
+
fn test_finite<T: IntervalTrait>(finite: T) {
// Check accessors
assert_eq!(finite.lo(), 10.0);
@@ -671,6 +765,28 @@ mod test {
T::new(10.0, 30.0)
);
+ // Intersecting an interval with the empty interval
+ assert_eq!(finite.intersection(&T::empty()).unwrap(), T::empty());
+
+ // Intersecting an interval with the full interval
+ assert_eq!(finite.intersection(&T::full()).unwrap(), finite);
+
+ // Intersecting finite intervals with a non-empty result
+ assert_eq!(
+ finite
+ .intersection(&T::new(finite.mid(), finite.hi() + 1.0))
+ .unwrap(),
+ T::new(finite.mid(), finite.hi())
+ );
+
+ // Intersecting finite intervals with an empty result
+ assert_eq!(
+ finite
+ .intersection(&T::new(finite.hi() + 1.0, finite.hi() + 2.0))
+ .unwrap(),
+ T::empty()
+ );
+
// Expanding by positive distance
assert_eq!(finite.expand_by(2.0), T::new(8.0, 22.0));
assert_eq!(finite.expand_by(5.0), T::new(5.0, 25.0));
@@ -979,6 +1095,158 @@ mod test {
);
}
+ #[test]
+ fn wraparound_interval_actually_wraparound_intersection() {
+ // Everything *except* the interval (10, 20)
+ let wraparound = WraparoundInterval::new(20.0, 10.0);
+
+ // Intersecting an empty interval
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::empty())
+ .unwrap(),
+ WraparoundInterval::empty()
+ );
+
+ // Intersecting an interval with itself
+ assert_eq!(wraparound.intersection(&wraparound).unwrap(), wraparound);
+
+ // Intersecting a wraparound interval with a "larger" wraparound
interval
+ // 10 20
+ // <==========| |============>
+ // <==============| |================>
+ // 14 16
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(16.0, 14.0))
+ .unwrap(),
+ wraparound
+ );
+
+ // Intersecting a wraparound interval with a "smaller" wraparound
interval
+ // 10 20
+ // <==========| |============>
+ // <=====| |=======>
+ // 5 25
+ // <=====| |=======>
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(25.0, 5.0))
+ .unwrap(),
+ WraparoundInterval::new(25.0, 5.0)
+ );
+
+ // Intersecting with partially intersecting wraparounds
+ // 10 20
+ // <==========| |============>
+ // <=====| |=================>
+ // 5 15
+ // <=====| |============>
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(15.0, 5.0))
+ .unwrap(),
+ WraparoundInterval::new(20.0, 5.0)
+ );
+
+ // 10 20
+ // <==========| |============>
+ // <================| |======>
+ // 15 25
+ // <==========| |======>
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(25.0, 15.0))
+ .unwrap(),
+ WraparoundInterval::new(25.0, 10.0)
+ );
+
+ // Intersecting wraparounds whose result would require >1 interval to
represent
+ // 10 20
+ // <==========| |=========================>
+ // <=============================| |======>
+ // 25 30
+ // <==========| |=======| |======>
+ wraparound
+ .intersection(&WraparoundInterval::new(30.0, 25.0))
+ .unwrap_err();
+
+ // 10 20
+ // <===================| |================>
+ // <==| |=================================>
+ // 0 5
+ // <==| |=====| |================>
+ wraparound
+ .intersection(&WraparoundInterval::new(5.0, 0.0))
+ .unwrap_err();
+
+ // Intersecting wraparound with a regular interval completely
contained by the original
+ // 10 20
+ // <=================| |==================>
+ // |=========|
+ // 25 30
+ // |=========|
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(25.0, 30.0))
+ .unwrap(),
+ WraparoundInterval::new(25.0, 30.0)
+ );
+
+ // 10 20
+ // <=================| |==================>
+ // |=========|
+ // 0 5
+ // |=========|
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(0.0, 5.0))
+ .unwrap(),
+ WraparoundInterval::new(0.0, 5.0)
+ );
+
+ // Intersecting wraparound with a partially intersecting regular
interval that
+ // intersects the left side
+ // 10 20
+ // <=================| |==================>
+ // |=========|
+ // 5 15
+ // |====|
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(5.0, 15.0))
+ .unwrap(),
+ WraparoundInterval::new(5.0, 10.0)
+ );
+
+ // Intersecting wraparound with a partially intersecting regular
interval that
+ // intersects the right side
+ // 10 20
+ // <=================| |==================>
+ // |=========|
+ // 15 25
+ // |====|
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(15.0, 25.0))
+ .unwrap(),
+ WraparoundInterval::new(20.0, 25.0)
+ );
+
+ // Intersecting wraparound with a disjoint regular interval
+ // 10 20
+ // <=================| |==================>
+ // |==|
+ // 12 15
+ //
+ assert_eq!(
+ wraparound
+ .intersection(&WraparoundInterval::new(12.0, 15.0))
+ .unwrap(),
+ WraparoundInterval::empty()
+ );
+ }
+
#[test]
fn wraparound_interval_actually_wraparound_expand_by() {
// Everything *except* the interval (10, 20)
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index ece17adf..4119280d 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -63,6 +63,7 @@ geo-types = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
sedona-common = { workspace = true }
+sedona-datasource = { workspace = true }
sedona-expr = { workspace = true }
sedona-functions = { workspace = true }
sedona-geo = { workspace = true, optional = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 3903cfa9..7723c20f 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -14,7 +14,10 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use std::{collections::VecDeque, sync::Arc};
+use std::{
+ collections::{HashMap, VecDeque},
+ sync::Arc,
+};
use crate::exec::create_plan_from_sql;
use crate::object_storage::ensure_object_store_registered_with_options;
@@ -40,6 +43,8 @@ use datafusion_expr::sqlparser::dialect::{dialect_from_str,
Dialect};
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, SortExpr};
use parking_lot::Mutex;
use sedona_common::option::add_sedona_option_extension;
+use sedona_datasource::provider::external_listing_table;
+use sedona_datasource::spec::ExternalFormatSpec;
use sedona_expr::aggregate_udf::SedonaAccumulatorRef;
use sedona_expr::{function_set::FunctionSet, scalar_udf::ScalarKernelRef};
use sedona_geoparquet::options::TableGeoParquetOptions;
@@ -258,6 +263,43 @@ impl SedonaContext {
self.ctx.read_table(Arc::new(provider))
}
+
+ /// Creates a [`DataFrame`] for reading a [ExternalFormatSpec]
+ pub async fn read_external_format<P: DataFilePaths>(
+ &self,
+ spec: Arc<dyn ExternalFormatSpec>,
+ table_paths: P,
+ options: Option<&HashMap<String, String>>,
+ check_extension: bool,
+ ) -> Result<DataFrame> {
+ let urls = table_paths.to_urls()?;
+
+ // Pre-register object store with our custom options before creating
the external listing table
+ if !urls.is_empty() {
+ // Forward the caller-supplied key/value options (if any) for object
store registration
+ ensure_object_store_registered_with_options(
+ &mut self.ctx.state(),
+ urls[0].as_str(),
+ options,
+ )
+ .await?;
+ }
+
+ let provider = if let Some(options) = options {
+ // Strip the filesystem-based options
+ let options_without_filesystems = options
+ .iter()
+ .filter(|(k, _)| !k.starts_with("gcs.") &&
!k.starts_with("aws."))
+ .map(|(k, v)| (k.clone(), v.clone()))
+ .collect::<HashMap<String, String>>();
+ let spec = spec.with_options(&options_without_filesystems)?;
+ external_listing_table(spec, &self.ctx, urls,
check_extension).await?
+ } else {
+ external_listing_table(spec, &self.ctx, urls,
check_extension).await?
+ };
+
+ self.ctx.read_table(Arc::new(provider))
+ }
}
impl Default for SedonaContext {
@@ -468,11 +510,14 @@ impl ThreadSafeDialect {
#[cfg(test)]
mod tests {
- use arrow_schema::DataType;
+ use arrow_array::{create_array, ArrayRef, RecordBatchIterator,
RecordBatchReader};
+ use arrow_schema::{DataType, Field, Schema};
use datafusion::assert_batches_eq;
+ use sedona_datasource::spec::{Object, OpenReaderArgs};
use sedona_schema::{
crs::lnglat,
datatypes::{Edges, SedonaType},
+ schema::SedonaSchema,
};
use sedona_testing::data::test_geoparquet;
use tempfile::tempdir;
@@ -579,20 +624,122 @@ mod tests {
// GeoParquet files
let ctx = SedonaContext::new_local_interactive().await.unwrap();
let example = test_geoparquet("example", "geometry").unwrap();
- let df = ctx.ctx.table(example).await.unwrap();
- let sedona_types: Result<Vec<_>> = df
+ let df = ctx.ctx.table(example.clone()).await.unwrap();
+ let sedona_types = df
.schema()
- .as_arrow()
- .fields()
- .iter()
- .map(|f| SedonaType::from_storage_field(f))
- .collect();
- let sedona_types = sedona_types.unwrap();
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
assert_eq!(sedona_types.len(), 2);
assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
assert_eq!(
sedona_types[1],
SedonaType::WkbView(Edges::Planar, lnglat())
);
+
+ // Ensure read_parquet() works
+ let df = ctx
+ .read_parquet(example.clone(), GeoParquetReadOptions::default())
+ .await
+ .unwrap();
+ let sedona_types = df
+ .schema()
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
+ assert_eq!(sedona_types.len(), 2);
+ assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
+ assert_eq!(
+ sedona_types[1],
+ SedonaType::WkbView(Edges::Planar, lnglat())
+ );
+ }
+
+ #[derive(Debug)]
+ struct ExampleSpec {}
+
+ #[async_trait]
+ impl ExternalFormatSpec for ExampleSpec {
+ async fn infer_schema(&self, _location: &Object) -> Result<Schema> {
+ Ok(Schema::new(vec![Field::new("x", DataType::Utf8, true)]))
+ }
+
+ async fn open_reader(
+ &self,
+ _args: &OpenReaderArgs,
+ ) -> Result<Box<dyn RecordBatchReader + Send>> {
+ let batch = RecordBatch::try_from_iter([(
+ "x",
+ create_array!(Utf8, ["one", "two", "three", "four"]) as
ArrayRef,
+ )])
+ .unwrap();
+ let schema = batch.schema();
+ Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema)))
+ }
+
+ fn with_options(
+ &self,
+ options: &HashMap<String, String>,
+ ) -> Result<Arc<dyn ExternalFormatSpec>> {
+ // Reject any key/value option so that a test can tell whether the
+ // aws/gcs options were stripped before reaching the spec.
+ if !options.is_empty() {
+ return not_impl_err!("key/value options not implemented");
+ }
+
+ Ok(Arc::new(Self {}))
+ }
+ }
+
+ #[tokio::test]
+ async fn external_format() {
+ let ctx = SedonaContext::new_local_interactive().await.unwrap();
+ let spec = Arc::new(ExampleSpec {});
+ let file_that_exists = test_geoparquet("example", "geometry").unwrap();
+
+ // Ensure read_external_format() works
+ let df = ctx
+ .read_external_format(spec.clone(), file_that_exists.clone(),
None, false)
+ .await
+ .unwrap();
+ let batches = df.collect().await.unwrap();
+
+ assert_batches_eq!(
+ [
+ "+-------+",
+ "| x |",
+ "+-------+",
+ "| one |",
+ "| two |",
+ "| three |",
+ "| four |",
+ "+-------+",
+ ],
+ &batches
+ );
+
+ // Options without an aws/gcs prefix must reach the spec (which rejects them)
+ let kv_options = HashMap::from([("key".to_string(),
"value".to_string())]);
+ ctx.read_external_format(
+ spec.clone(),
+ file_that_exists.clone(),
+ Some(&kv_options),
+ false,
+ )
+ .await
+ .expect_err("should error for unsupported key/value options");
+
+ let kv_options = HashMap::from([
+ ("gcs.something".to_string(), "value".to_string()),
+ ("aws.something".to_string(), "value".to_string()),
+ ]);
+ ctx.read_external_format(
+ spec.clone(),
+ file_that_exists.clone(),
+ Some(&kv_options),
+ false,
+ )
+ .await
+ .expect("should succeed because aws and gcs options were stripped");
}
}