This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new e0e1d109 feat(sedona-expr,sedona-schema): Implement item-level CRS
(#410)
e0e1d109 is described below
commit e0e1d109480727faaf7be25923b57b4686144438
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Dec 19 15:34:50 2025 -0600
feat(sedona-expr,sedona-schema): Implement item-level CRS (#410)
Co-authored-by: Copilot <[email protected]>
---
Cargo.lock | 3 +
python/sedonadb/tests/functions/test_transforms.py | 48 +-
rust/sedona-expr/Cargo.toml | 2 +
rust/sedona-expr/src/item_crs.rs | 599 +++++++++++++++++++++
rust/sedona-expr/src/lib.rs | 1 +
rust/sedona-functions/Cargo.toml | 1 +
rust/sedona-functions/src/st_setsrid.rs | 332 +++++++++++-
rust/sedona-functions/src/st_srid.rs | 287 +++++++++-
rust/sedona-schema/src/datatypes.rs | 30 ++
rust/sedona-schema/src/matchers.rs | 23 +
rust/sedona-testing/src/create.rs | 71 ++-
rust/sedona-testing/src/testers.rs | 18 +-
12 files changed, 1359 insertions(+), 56 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 2c61044f..a79e1c52 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4871,6 +4871,8 @@ dependencies = [
name = "sedona-expr"
version = "0.3.0"
dependencies = [
+ "arrow-array",
+ "arrow-buffer",
"arrow-schema",
"datafusion-common",
"datafusion-expr",
@@ -4905,6 +4907,7 @@ name = "sedona-functions"
version = "0.3.0"
dependencies = [
"arrow-array",
+ "arrow-buffer",
"arrow-json",
"arrow-schema",
"criterion",
diff --git a/python/sedonadb/tests/functions/test_transforms.py
b/python/sedonadb/tests/functions/test_transforms.py
index ab899d1f..5ae0d20d 100644
--- a/python/sedonadb/tests/functions/test_transforms.py
+++ b/python/sedonadb/tests/functions/test_transforms.py
@@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+import geopandas
import pyproj
import pytest
from sedonadb.testing import PostGIS, SedonaDB, geom_or_null, val_or_null
@@ -82,12 +84,56 @@ def test_st_setcrs_sedonadb(eng, geom, crs, expected_srid):
assert df.crs.to_epsg() == expected_srid
+# We haven't wired up the engines to use/support item-level CRS yet. This
+# will be easier when EWKB and/or EWKT is supported (and/or this concept
+# is supported in geoarrow.pyarrow, which we use for testing)
+def test_item_crs_sedonadb():
+ eng = SedonaDB()
+ df = geopandas.GeoDataFrame(
+ {
+ "srid": [4326, 4326, 3857, 3857, 0, 0],
+ "geometry": geopandas.GeoSeries.from_wkt(
+ [
+ "POINT (0 1)",
+ "POINT (2 3)",
+ "POINT (4 5)",
+ "POINT (6 7)",
+ "POINT (8 9)",
+ None,
+ ]
+ ),
+ }
+ )
+
+ eng.con.create_data_frame(df).to_view("df")
+ eng.con.sql("SELECT ST_SetSRID(geometry, srid) as item_crs FROM
df").to_view(
+ "df_item_crs"
+ )
+
+ eng.assert_query_result(
+ "SELECT ST_SRID(item_crs) FROM df_item_crs",
+ [("4326",), ("4326",), ("3857",), ("3857",), ("0",), (None,)],
+ )
+
+ eng.assert_query_result(
+ "SELECT ST_Crs(item_crs) FROM df_item_crs",
+ [
+ ("OGC:CRS84",),
+ ("OGC:CRS84",),
+ ("EPSG:3857",),
+ ("EPSG:3857",),
+ ("0",),
+ (None,),
+ ],
+ )
+
+
@pytest.mark.parametrize("eng", [SedonaDB])
def test_st_crs_sedonadb(eng):
eng = eng.create_or_skip()
eng.assert_query_result(
"SELECT ST_CRS(ST_SetCrs(ST_GeomFromText('POINT (1 1)'),
'EPSG:26920'))",
- '"EPSG:26920"',
+ "EPSG:26920",
)
eng.assert_query_result(
"SELECT ST_CRS(ST_SetCrs(ST_GeomFromText('POINT (1 1)'), NULL))",
diff --git a/rust/sedona-expr/Cargo.toml b/rust/sedona-expr/Cargo.toml
index 8bfcbcf1..39c9cf56 100644
--- a/rust/sedona-expr/Cargo.toml
+++ b/rust/sedona-expr/Cargo.toml
@@ -35,6 +35,8 @@ rstest = { workspace = true }
sedona-testing = { workspace = true }
[dependencies]
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
diff --git a/rust/sedona-expr/src/item_crs.rs b/rust/sedona-expr/src/item_crs.rs
new file mode 100644
index 00000000..67869d9a
--- /dev/null
+++ b/rust/sedona-expr/src/item_crs.rs
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Debug, iter::zip, sync::Arc};
+
+use arrow_array::{ArrayRef, StructArray};
+use arrow_buffer::NullBuffer;
+use arrow_schema::{DataType, Field};
+use datafusion_common::{
+ cast::{as_string_view_array, as_struct_array},
+ DataFusionError, Result, ScalarValue,
+};
+use datafusion_expr::ColumnarValue;
+use sedona_common::sedona_internal_err;
+use sedona_schema::{crs::deserialize_crs, datatypes::SedonaType,
matchers::ArgMatcher};
+
+use crate::scalar_udf::{ScalarKernelRef, SedonaScalarKernel};
+
+/// Adapter that lets an existing [SedonaScalarKernel] accept Item CRS arguments
+///
+/// Many kernels that operate on geometry or geography can also handle Item CRS
+/// input by following a few rules:
+///
+/// - When the output is non-spatial and does not depend on the input CRS, only
+///   the `item` portion of each item_crs argument needs to be processed
+///   (e.g., ST_Area()).
+/// - When two or more spatial arguments are provided, their CRSes must be
+///   compatible.
+/// - When the output is a geometry or geography, it is returned as an item_crs
+///   value whose CRSes are propagated from the inputs.
+///
+/// This wrapper enforces those rules automatically. It is not suitable for:
+///
+/// - Functions whose return value depends on the CRS
+/// - Functions whose return value depends on the value of a scalar
+///   argument
+/// - Functions whose return CRS is not strictly propagated from the
+///   CRSes of the arguments.
+#[derive(Debug)]
+pub struct ItemCrsKernel {
+    /// The kernel that operates on the `item` portion of each argument
+    inner: ScalarKernelRef,
+}
+
+impl ItemCrsKernel {
+    /// Wrap `inner` and return the wrapper as a shareable [ScalarKernelRef]
+    pub fn new_ref(inner: ScalarKernelRef) -> ScalarKernelRef {
+        let wrapped = ItemCrsKernel { inner };
+        Arc::new(wrapped)
+    }
+}
+
+impl SedonaScalarKernel for ItemCrsKernel {
+    /// Delegates to the inner kernel using the item types of the arguments
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        let inner = self.inner.as_ref();
+        return_type_handle_item_crs(inner, args)
+    }
+
+    /// Execution always goes through `invoke_batch_from_args()` for this kernel
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        _args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented")
+    }
+
+    /// Checks CRS compatibility, runs the inner kernel on the items, and wraps
+    /// spatial output in an item_crs value
+    fn invoke_batch_from_args(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let inner = self.inner.as_ref();
+        invoke_handle_item_crs(inner, arg_types, args, return_type, num_rows)
+    }
+}
+
+/// Compute the output type of a wrapped kernel for item_crs arguments
+///
+/// Returns `Ok(None)` when none of `arg_types` is an item_crs type, because
+/// such calls are left to the wrapped kernel itself. It also returns `Ok(None)`
+/// when the wrapped kernel does not match the item types. Every argument is
+/// reduced to its CRS-free item type before being passed to the wrapped kernel.
+/// If the wrapped kernel produces a geometry or geography type, that type is
+/// wrapped in an item_crs type (which drops any CRS on it).
+///
+/// Scalar argument values are deliberately withheld from the wrapped kernel.
+/// Functions that use them to compute an output type typically assign a
+/// type-level CRS, and they must implement item_crs support themselves.
+fn return_type_handle_item_crs(
+    kernel: &dyn SedonaScalarKernel,
+    arg_types: &[SedonaType],
+) -> Result<Option<SedonaType>> {
+    let has_item_crs_arg = {
+        let item_crs_matcher = ArgMatcher::is_item_crs();
+        arg_types
+            .iter()
+            .any(|candidate| item_crs_matcher.match_type(candidate))
+    };
+    if !has_item_crs_arg {
+        return Ok(None);
+    }
+
+    // Item types with any type-level CRS removed (the result must be CRS free)
+    let mut item_arg_types = Vec::with_capacity(arg_types.len());
+    for arg_type in arg_types {
+        let (item_type, _) = parse_item_crs_arg_type_strip_crs(arg_type)?;
+        item_arg_types.push(item_type);
+    }
+
+    // Pretend no argument is a scalar (see above)
+    let no_scalar_args: Vec<Option<&ScalarValue>> = vec![None; arg_types.len()];
+
+    let maybe_item_type =
+        kernel.return_type_from_args_and_scalars(&item_arg_types, &no_scalar_args)?;
+    let Some(item_type) = maybe_item_type else {
+        return Ok(None);
+    };
+
+    if ArgMatcher::is_geometry_or_geography().match_type(&item_type) {
+        Ok(Some(SedonaType::new_item_crs(&item_type)?))
+    } else {
+        Ok(Some(item_type))
+    }
+}
+
+/// Execute an underlying kernel on arguments where at least one is item_crs
+///
+/// The underlying kernel only sees the `item` portion of each argument. Before
+/// executing, this function checks that the CRSes of all spatial arguments are
+/// equal, whether they are item-level or type-level, and errors otherwise. If
+/// the underlying kernel returns a geometry or geography, the result is wrapped
+/// in an item_crs value that carries the CRS of the first spatial argument.
+///
+/// The underlying kernel is invoked with the same CRS-free argument types (and
+/// scalar-free return type computation) used by [return_type_handle_item_crs].
+/// It receives its own item-level return type, not the outer item_crs return
+/// type (`_return_type`).
+fn invoke_handle_item_crs(
+    kernel: &dyn SedonaScalarKernel,
+    arg_types: &[SedonaType],
+    args: &[ColumnarValue],
+    _return_type: &SedonaType,
+    num_rows: usize,
+) -> Result<ColumnarValue> {
+    // Separate the argument types into item and Option<crs>. Don't strip the
+    // type-level CRSes here: they are needed to compare with the item-level
+    // CRSes to ensure they are all equal.
+    let arg_types_unwrapped = arg_types
+        .iter()
+        .map(parse_item_crs_arg_type)
+        .collect::<Result<Vec<_>>>()?;
+
+    let args_unwrapped = zip(&arg_types_unwrapped, args)
+        .map(|((item_type, crs_type), arg)| parse_item_crs_arg(item_type, crs_type, arg))
+        .collect::<Result<Vec<_>>>()?;
+
+    let crs_args = args_unwrapped
+        .iter()
+        .filter_map(|(_, crs_arg)| crs_arg.as_ref())
+        .collect::<Vec<_>>();
+    let crs_result = ensure_crs_args_equal(&crs_args)?;
+
+    let item_args = args_unwrapped
+        .iter()
+        .map(|(item_arg, _)| item_arg.clone())
+        .collect::<Vec<_>>();
+
+    // CRS equality was verified above, so the inner kernel gets CRS-free item
+    // types: the same types used to compute its return type during planning.
+    let item_types = arg_types
+        .iter()
+        .map(|arg_type| parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type))
+        .collect::<Result<Vec<_>>>()?;
+    let no_scalar_args: Vec<Option<&ScalarValue>> = vec![None; arg_types.len()];
+    let Some(out_item_type) =
+        kernel.return_type_from_args_and_scalars(&item_types, &no_scalar_args)?
+    else {
+        return sedona_internal_err!("Expected inner kernel to match types {item_types:?}");
+    };
+
+    // Give the inner kernel its own return type, not the outer item_crs struct type
+    let item_result =
+        kernel.invoke_batch_from_args(&item_types, &item_args, &out_item_type, num_rows)?;
+
+    if ArgMatcher::is_geometry_or_geography().match_type(&out_item_type) {
+        make_item_crs(&out_item_type, item_result, crs_result, None)
+    } else {
+        Ok(item_result)
+    }
+}
+
+/// Create a new item_crs struct [ColumnarValue]
+///
+/// The `item` field has the storage type of `item_type` and the `crs` field is
+/// Utf8View. If both `item_result` and `crs_result` are scalars, a struct scalar
+/// is returned; otherwise a struct array is returned. Scalars are expanded to
+/// the array length by [ColumnarValue::values_to_arrays].
+///
+/// Optionally provide extra nulls (e.g., to satisfy a scalar function contract
+/// where null inputs -> null outputs). The output is null wherever the item is
+/// null or `extra_nulls` is null.
+///
+/// # Errors
+///
+/// Returns an error (rather than panicking) if `extra_nulls` does not have the
+/// same length as the item array, or if the struct cannot be assembled.
+pub fn make_item_crs(
+    item_type: &SedonaType,
+    item_result: ColumnarValue,
+    crs_result: &ColumnarValue,
+    extra_nulls: Option<&NullBuffer>,
+) -> Result<ColumnarValue> {
+    let out_fields = vec![
+        item_type.to_storage_field("item", true)?,
+        Field::new("crs", DataType::Utf8View, true),
+    ];
+
+    let scalar_result = matches!(
+        (&item_result, crs_result),
+        (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_))
+    );
+
+    let item_crs_arrays = ColumnarValue::values_to_arrays(&[item_result, crs_result.clone()])?;
+    let item_array = &item_crs_arrays[0];
+    let crs_array = &item_crs_arrays[1];
+
+    // NullBuffer::union() panics on mismatched lengths, so check up front
+    if let Some(extra) = extra_nulls {
+        if extra.len() != item_array.len() {
+            return sedona_internal_err!(
+                "Expected extra nulls of length {} but got length {}",
+                item_array.len(),
+                extra.len()
+            );
+        }
+    }
+    let nulls = NullBuffer::union(item_array.nulls(), extra_nulls);
+
+    // try_new() reports inconsistent children/null lengths as an error instead of panicking
+    let item_crs_array = StructArray::try_new(
+        out_fields.into(),
+        vec![Arc::clone(item_array), Arc::clone(crs_array)],
+        nulls,
+    )?;
+
+    if scalar_result {
+        Ok(ScalarValue::Struct(Arc::new(item_crs_array)).into())
+    } else {
+        Ok(ColumnarValue::Array(Arc::new(item_crs_array)))
+    }
+}
+
+/// Split an argument type into its item type and, for item_crs types, the type
+/// of the `crs` field
+///
+/// An item_crs type is recognized as a struct with exactly two fields named
+/// `item` and `crs`. Every other type (including geometry types that carry a
+/// type-level CRS) is returned unchanged with no crs type.
+fn parse_item_crs_arg_type(sedona_type: &SedonaType) -> Result<(SedonaType, Option<SedonaType>)> {
+    match sedona_type {
+        SedonaType::Arrow(DataType::Struct(fields))
+            if fields.len() == 2 && fields[0].name() == "item" && fields[1].name() == "crs" =>
+        {
+            let item = SedonaType::from_storage_field(&fields[0])?;
+            let crs = SedonaType::from_storage_field(&fields[1])?;
+            Ok((item, Some(crs)))
+        }
+        _ => Ok((sedona_type.clone(), None)),
+    }
+}
+
+/// Like [parse_item_crs_arg_type], but removes any type-level CRS from WKB and
+/// WKB view types
+///
+/// The underlying kernel expects all of its input CRSes to match, so the CRS
+/// must be removed before its types are computed. item_crs types and all other
+/// types are handled exactly as in [parse_item_crs_arg_type].
+fn parse_item_crs_arg_type_strip_crs(
+    sedona_type: &SedonaType,
+) -> Result<(SedonaType, Option<SedonaType>)> {
+    let stripped = match sedona_type {
+        SedonaType::Wkb(edges, _) => SedonaType::Wkb(*edges, None),
+        SedonaType::WkbView(edges, _) => SedonaType::WkbView(*edges, None),
+        other => return parse_item_crs_arg_type(other),
+    };
+    Ok((stripped, None))
+}
+
+/// Split an argument value into its item value and its CRS value (if any)
+///
+/// This operates on the result of [parse_item_crs_arg_type]:
+///
+/// - item_crs arguments (`crs_type` is `Some`) are split into their `item` and
+///   `crs` children, for both arrays and struct scalars.
+/// - WKB / WKB view arguments are returned as-is, paired with a Utf8View scalar
+///   holding their type-level CRS. This is the authority:code when available,
+///   otherwise the CRS JSON, or null when there is no CRS.
+/// - All other arguments are returned as-is with no CRS.
+fn parse_item_crs_arg(
+    item_type: &SedonaType,
+    crs_type: &Option<SedonaType>,
+    arg: &ColumnarValue,
+) -> Result<(ColumnarValue, Option<ColumnarValue>)> {
+    if crs_type.is_some() {
+        return match arg {
+            ColumnarValue::Array(array) => {
+                let struct_array = as_struct_array(array)?;
+                let item = ColumnarValue::Array(Arc::clone(struct_array.column(0)));
+                let crs = ColumnarValue::Array(Arc::clone(struct_array.column(1)));
+                Ok((item, Some(crs)))
+            }
+            ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => {
+                let item = ScalarValue::try_from_array(struct_array.column(0), 0)?;
+                let crs = ScalarValue::try_from_array(struct_array.column(1), 0)?;
+                Ok((ColumnarValue::Scalar(item), Some(ColumnarValue::Scalar(crs))))
+            }
+            ColumnarValue::Scalar(scalar_value) => sedona_internal_err!(
+                "Expected struct scalar for item_crs but got {}",
+                scalar_value
+            ),
+        };
+    }
+
+    let crs = match item_type {
+        SedonaType::Wkb(_, crs) | SedonaType::WkbView(_, crs) => crs,
+        _ => return Ok((arg.clone(), None)),
+    };
+
+    let crs_string = match crs {
+        Some(crs) => Some(crs.to_authority_code()?.unwrap_or_else(|| crs.to_json())),
+        None => None,
+    };
+    let crs_scalar = ScalarValue::Utf8View(crs_string);
+    Ok((arg.clone(), Some(ColumnarValue::Scalar(crs_scalar))))
+}
+
+/// Ensure that all values in `crs_args` represent equivalent CRSes
+///
+/// Each value is cast to Utf8View, and adjacent values are compared row by row.
+/// On success, this returns the first value itself.
+///
+/// # Errors
+///
+/// Errors if `crs_args` is empty or if any two adjacent values contain a row
+/// whose CRSes are not equal.
+fn ensure_crs_args_equal<'a>(crs_args: &[&'a ColumnarValue]) -> Result<&'a ColumnarValue> {
+    let Some(first) = crs_args.first() else {
+        return sedona_internal_err!("Zero CRS arguments as input to item_crs");
+    };
+    if crs_args.len() == 1 {
+        return Ok(*first);
+    }
+
+    let mut as_strings = Vec::with_capacity(crs_args.len());
+    for crs_arg in crs_args {
+        as_strings.push(crs_arg.cast_to(&DataType::Utf8View, None)?);
+    }
+
+    let crs_arrays = ColumnarValue::values_to_arrays(&as_strings)?;
+    for pair in crs_arrays.windows(2) {
+        ensure_crs_string_arrays_equal2(&pair[0], &pair[1])?;
+    }
+
+    Ok(*first)
+}
+
+/// Check two Utf8View arrays of CRS strings for row-wise CRS equality
+///
+/// Two rows match when their strings are identical (including both null), or
+/// when both are non-null and [deserialize_crs] produces equal CRSes (e.g.,
+/// `EPSG:4326` and `OGC:CRS84`). String pairs already proven equivalent are
+/// remembered, so each distinct pair is deserialized at most once per call.
+///
+/// # Errors
+///
+/// Returns an execution error naming the first mismatching pair, or any error
+/// from [deserialize_crs].
+fn ensure_crs_string_arrays_equal2(lhs: &ArrayRef, rhs: &ArrayRef) -> Result<()> {
+    let mut known_equal = std::collections::HashSet::<(&str, &str)>::new();
+
+    for (lhs_item, rhs_item) in zip(as_string_view_array(lhs)?, as_string_view_array(rhs)?) {
+        // Byte-for-byte equality is the fastest check and the most likely outcome
+        if lhs_item == rhs_item {
+            continue;
+        }
+
+        // Different strings may still describe the same CRS
+        if let (Some(lhs_str), Some(rhs_str)) = (lhs_item, rhs_item) {
+            if known_equal.contains(&(lhs_str, rhs_str)) {
+                continue;
+            }
+
+            if deserialize_crs(lhs_str)? == deserialize_crs(rhs_str)? {
+                known_equal.insert((lhs_str, rhs_str));
+                continue;
+            }
+        }
+
+        // The strings differ and do not describe the same CRS
+        return Err(DataFusionError::Execution(format!(
+            "CRS values not equal: {lhs_item:?} vs {rhs_item:?}",
+        )));
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod test {
+ use datafusion_expr::ScalarUDF;
+ use rstest::rstest;
+ use sedona_schema::{
+ crs::lnglat,
+ datatypes::{Edges, SedonaType, WKB_GEOMETRY},
+ };
+ use sedona_testing::{
+ create::create_array_item_crs, create::create_scalar_item_crs,
testers::ScalarUdfTester,
+ };
+
+ use crate::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
+
+ use super::*;
+
+ // Build a UDF named "fun" taking (any, geometry) and returning out_type. Its inner
+ // kernel returns the first argument unchanged and is wrapped in an ItemCrsKernel.
+ fn test_udf(out_type: SedonaType) -> ScalarUDF {
+ let geom_to_geom_kernel = SimpleSedonaScalarKernel::new_ref(
+ ArgMatcher::new(
+ vec![ArgMatcher::is_any(), ArgMatcher::is_geometry()],
+ out_type,
+ ),
+ Arc::new(|_arg_types, args| Ok(args[0].clone())),
+ );
+
+ let crsified_kernel = ItemCrsKernel::new_ref(geom_to_geom_kernel);
+ SedonaScalarUDF::from_kernel("fun", crsified_kernel.clone()).into()
+ }
+
+ // The item_crs type whose item is a CRS-free WKB geometry
+ fn basic_item_crs_type() -> SedonaType {
+ SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap()
+ }
+
+ #[test]
+ fn item_crs_kernel_no_match() {
+ // A call with geometry + geometry should fail (this case would be
handled by the
+ // original kernel, not the item_crs kernel)
+ let tester = ScalarUdfTester::new(test_udf(WKB_GEOMETRY),
vec![WKB_GEOMETRY, WKB_GEOMETRY]);
+ let err = tester.return_type().unwrap_err();
+ assert_eq!(
+ err.message(),
+ "fun([Wkb(Planar, None), Wkb(Planar, None)]): No kernel matching
arguments"
+ );
+ }
+
+ #[rstest]
+ fn item_crs_kernel_basic(
+ #[values(
+ (WKB_GEOMETRY, basic_item_crs_type()),
+ (basic_item_crs_type(), WKB_GEOMETRY),
+ (basic_item_crs_type(), basic_item_crs_type())
+ )]
+ arg_types: (SedonaType, SedonaType),
+ ) {
+ // A call with geometry + item_crs or both item_crs should return
item_crs
+ let tester = ScalarUdfTester::new(test_udf(WKB_GEOMETRY),
vec![arg_types.0, arg_types.1]);
+ tester.assert_return_type(basic_item_crs_type());
+ let result = tester
+ .invoke_scalar_scalar("POINT (0 1)", "POINT (1 2)")
+ .unwrap();
+ assert_eq!(
+ result,
+ create_scalar_item_crs(Some("POINT (0 1)"), None, &WKB_GEOMETRY)
+ );
+ }
+
+ // Item-level CRSes of two item_crs arguments must compare equal; equivalent
+ // spellings (EPSG:4326 vs OGC:CRS84) are accepted
+ #[test]
+ fn item_crs_kernel_crs_values() {
+ let tester = ScalarUdfTester::new(
+ test_udf(WKB_GEOMETRY),
+ vec![basic_item_crs_type(), basic_item_crs_type()],
+ );
+ tester.assert_return_type(basic_item_crs_type());
+
+ let scalar_item_crs_4326 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"),
&WKB_GEOMETRY);
+ let scalar_item_crs_crs84 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"),
&WKB_GEOMETRY);
+ let scalar_item_crs_3857 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"),
&WKB_GEOMETRY);
+
+ // Should be able to execute when both arguments have an equal
+ // (but not necessarily identical) CRS
+ let result = tester
+ .invoke_scalar_scalar(scalar_item_crs_4326.clone(),
scalar_item_crs_crs84.clone())
+ .unwrap();
+ assert_eq!(result, scalar_item_crs_4326);
+
+ // We should get an error when the CRSes are not compatible
+ let err = tester
+ .invoke_scalar_scalar(scalar_item_crs_4326.clone(),
scalar_item_crs_3857.clone())
+ .unwrap_err();
+ assert_eq!(
+ err.message(),
+ "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
+ );
+ }
+
+ // An item_crs argument mixed with a geometry argument that has a type-level
+ // CRS (lnglat); the result carries the item_crs argument's CRS string
+ #[test]
+ fn item_crs_kernel_crs_types() {
+ let scalar_item_crs_4326 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"),
&WKB_GEOMETRY);
+ let scalar_item_crs_crs84 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"),
&WKB_GEOMETRY);
+ let scalar_item_crs_3857 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"),
&WKB_GEOMETRY);
+
+ let sedona_type_lnglat = SedonaType::Wkb(Edges::Planar, lnglat());
+ let tester = ScalarUdfTester::new(
+ test_udf(WKB_GEOMETRY),
+ vec![basic_item_crs_type(), sedona_type_lnglat.clone()],
+ );
+ tester.assert_return_type(basic_item_crs_type());
+
+ // We should be able to execute item_crs + geometry when the crs
compares equal
+ let result = tester
+ .invoke_scalar_scalar(scalar_item_crs_4326.clone(), "POINT (3 4)")
+ .unwrap();
+ assert_eq!(result, scalar_item_crs_4326);
+
+ let result = tester
+ .invoke_scalar_scalar(scalar_item_crs_crs84.clone(), "POINT (3 4)")
+ .unwrap();
+ assert_eq!(result, scalar_item_crs_crs84);
+
+ // We should get an error when the CRSes are not compatible
+ let err = tester
+ .invoke_scalar_scalar(scalar_item_crs_3857.clone(), "POINT (3 4)")
+ .unwrap_err();
+ assert_eq!(
+ err.message(),
+ "CRS values not equal: Some(\"EPSG:3857\") vs Some(\"OGC:CRS84\")"
+ );
+ }
+
+ // Array item_crs input combined with a scalar item_crs input
+ #[test]
+ fn item_crs_kernel_arrays() {
+ let tester = ScalarUdfTester::new(
+ test_udf(WKB_GEOMETRY),
+ vec![basic_item_crs_type(), basic_item_crs_type()],
+ );
+
+ let array_item_crs_lnglat = create_array_item_crs(
+ &[
+ Some("POINT (0 1)"),
+ Some("POINT (2 3)"),
+ Some("POINT (3 4)"),
+ ],
+ [Some("EPSG:4326"), Some("EPSG:4326"), Some("EPSG:4326")],
+ &WKB_GEOMETRY,
+ );
+ let scalar_item_crs_4326 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"),
&WKB_GEOMETRY);
+ let scalar_item_crs_3857 =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"),
&WKB_GEOMETRY);
+
+ // This should succeed when all CRS combinations are compatible
+ let result = tester
+ .invoke_array_scalar(array_item_crs_lnglat.clone(),
scalar_item_crs_4326.clone())
+ .unwrap();
+ assert_eq!(&result, &array_item_crs_lnglat);
+
+ // This should fail otherwise
+ let err = tester
+ .invoke_array_scalar(array_item_crs_lnglat.clone(),
scalar_item_crs_3857.clone())
+ .unwrap_err();
+ assert_eq!(
+ err.message(),
+ "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
+ );
+ }
+
+ // A non-spatial result is returned as-is (not wrapped in an item_crs struct)
+ #[test]
+ fn item_crs_kernel_non_spatial_args_and_result() {
+ let scalar_item_crs =
+ create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"),
&WKB_GEOMETRY);
+
+ let tester = ScalarUdfTester::new(
+ test_udf(SedonaType::Arrow(DataType::Int32)),
+ vec![SedonaType::Arrow(DataType::Int32), basic_item_crs_type()],
+ );
+ tester.assert_return_type(DataType::Int32);
+
+ let result = tester.invoke_scalar_scalar(1234,
scalar_item_crs).unwrap();
+ assert_eq!(result, ScalarValue::Int32(Some(1234)))
+ }
+
+ // On success, ensure_crs_args_equal() returns its first argument itself
+ // (checked below by pointer identity)
+ #[test]
+ fn crs_args_equal() {
+ // Zero args
+ let err = ensure_crs_args_equal(&[]).unwrap_err();
+ assert!(err.message().contains("Zero CRS arguments"));
+
+ let crs_lnglat =
ColumnarValue::Scalar(ScalarValue::Utf8(Some("EPSG:4326".to_string())));
+ let crs_also_lnglat =
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("OGC:CRS84".to_string())));
+ let crs_other =
ColumnarValue::Scalar(ScalarValue::Utf8(Some("EPSG:3857".to_string())));
+
+ // One arg
+ let result_one_arg = ensure_crs_args_equal(&[&crs_lnglat]).unwrap();
+ assert!(std::ptr::eq(result_one_arg, &crs_lnglat));
+
+ // Two args (equal)
+ let result_two_args = ensure_crs_args_equal(&[&crs_lnglat,
&crs_also_lnglat]).unwrap();
+ assert!(std::ptr::eq(result_two_args, &crs_lnglat));
+
+ // Two args (not equal)
+ let err = ensure_crs_args_equal(&[&crs_lnglat,
&crs_other]).unwrap_err();
+ assert_eq!(
+ err.message(),
+ "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
+ );
+ }
+}
diff --git a/rust/sedona-expr/src/lib.rs b/rust/sedona-expr/src/lib.rs
index c200b8e5..e6f9cf78 100644
--- a/rust/sedona-expr/src/lib.rs
+++ b/rust/sedona-expr/src/lib.rs
@@ -16,6 +16,7 @@
// under the License.
pub mod aggregate_udf;
pub mod function_set;
+pub mod item_crs;
pub mod scalar_udf;
pub mod spatial_filter;
pub mod statistics;
diff --git a/rust/sedona-functions/Cargo.toml b/rust/sedona-functions/Cargo.toml
index 57afd240..9fb7f35f 100644
--- a/rust/sedona-functions/Cargo.toml
+++ b/rust/sedona-functions/Cargo.toml
@@ -41,6 +41,7 @@ tokio = { workspace = true, features = ["rt", "macros"] }
[dependencies]
arrow-schema = { workspace = true }
arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
geo-traits = { workspace = true }
diff --git a/rust/sedona-functions/src/st_setsrid.rs
b/rust/sedona-functions/src/st_setsrid.rs
index 4cd0759e..5e0d4a8b 100644
--- a/rust/sedona-functions/src/st_setsrid.rs
+++ b/rust/sedona-functions/src/st_setsrid.rs
@@ -14,16 +14,30 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use std::{sync::Arc, vec};
+use std::{
+ collections::{HashMap, HashSet},
+ sync::{Arc, OnceLock},
+};
-use arrow_array::builder::BinaryBuilder;
+use arrow_array::{
+ builder::{BinaryBuilder, NullBufferBuilder},
+ ArrayRef, StringViewArray,
+};
+use arrow_buffer::NullBuffer;
use arrow_schema::DataType;
-use datafusion_common::{error::Result, DataFusionError, ScalarValue};
+use datafusion_common::{
+ cast::{as_int64_array, as_string_view_array},
+ error::Result,
+ DataFusionError, ScalarValue,
+};
use datafusion_expr::{
scalar_doc_sections::DOC_SECTION_OTHER, ColumnarValue, Documentation,
Volatility,
};
use sedona_common::sedona_internal_err;
-use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel,
SedonaScalarUDF};
+use sedona_expr::{
+ item_crs::make_item_crs,
+ scalar_udf::{ScalarKernelRef, SedonaScalarKernel, SedonaScalarUDF},
+};
use sedona_geometry::transform::CrsEngine;
use sedona_schema::{crs::deserialize_crs, datatypes::SedonaType,
matchers::ArgMatcher};
@@ -124,12 +138,25 @@ impl SedonaScalarKernel for STSetSRID {
determine_return_type(args, scalar_args, self.engine.as_ref())
}
- fn invoke_batch(
+ fn invoke_batch_from_args(
&self,
- _arg_types: &[SedonaType],
+ arg_types: &[SedonaType],
args: &[ColumnarValue],
+ return_type: &SedonaType,
+ _num_rows: usize,
) -> Result<ColumnarValue> {
- Ok(args[0].clone())
+ let item_crs_matcher = ArgMatcher::is_item_crs();
+ if item_crs_matcher.match_type(return_type) {
+ let normalized_crs_value = normalize_crs_array(&args[1],
self.engine.as_ref())?;
+ make_item_crs(
+ &arg_types[0],
+ args[0].clone(),
+ &ColumnarValue::Array(normalized_crs_value),
+ crs_input_nulls(&args[1]),
+ )
+ } else {
+ Ok(args[0].clone())
+ }
}
fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
@@ -137,6 +164,14 @@ impl SedonaScalarKernel for STSetSRID {
"Should not be called because return_type_from_args_and_scalars()
is implemented"
)
}
+
+ fn invoke_batch(
+ &self,
+ _arg_types: &[SedonaType],
+ _args: &[ColumnarValue],
+ ) -> Result<ColumnarValue> {
+ sedona_internal_err!("Should not be called because
invoke_batch_from_args() is implemented")
+ }
}
#[derive(Debug)]
@@ -159,12 +194,25 @@ impl SedonaScalarKernel for STSetCRS {
determine_return_type(args, scalar_args, self.engine.as_ref())
}
- fn invoke_batch(
+ fn invoke_batch_from_args(
&self,
- _arg_types: &[SedonaType],
+ arg_types: &[SedonaType],
args: &[ColumnarValue],
+ return_type: &SedonaType,
+ _num_rows: usize,
) -> Result<ColumnarValue> {
- Ok(args[0].clone())
+ let item_crs_matcher = ArgMatcher::is_item_crs();
+ if item_crs_matcher.match_type(return_type) {
+ let normalized_crs_value = normalize_crs_array(&args[1],
self.engine.as_ref())?;
+ make_item_crs(
+ &arg_types[0],
+ args[0].clone(),
+ &ColumnarValue::Array(normalized_crs_value),
+ crs_input_nulls(&args[1]),
+ )
+ } else {
+ Ok(args[0].clone())
+ }
}
fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
@@ -172,24 +220,14 @@ impl SedonaScalarKernel for STSetCRS {
"Should not be called because return_type_from_args_and_scalars()
is implemented"
)
}
-}
-/// Validate a CRS string
-///
-/// If an engine is provided, the engine will be used to validate the CRS. If
absent,
-/// the CRS will only be validated using the basic checks in [deserialize_crs].
-pub fn validate_crs(
- crs: &str,
- maybe_engine: Option<&Arc<dyn CrsEngine + Send + Sync>>,
-) -> Result<()> {
- if let Some(engine) = maybe_engine {
- engine
- .as_ref()
- .get_transform_crs_to_crs(crs, crs, None, "")
- .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ fn invoke_batch(
+ &self,
+ _arg_types: &[SedonaType],
+ _args: &[ColumnarValue],
+ ) -> Result<ColumnarValue> {
+ sedona_internal_err!("Should not be called because
invoke_batch_from_args() is implemented")
}
-
- Ok(())
}
fn determine_return_type(
@@ -223,6 +261,8 @@ fn determine_return_type(
_ => {}
}
}
+ } else {
+ return Ok(Some(SedonaType::new_item_crs(&args[0])?));
}
sedona_internal_err!("Unexpected argument types: {}, {}", args[0], args[1])
@@ -341,10 +381,144 @@ impl SedonaScalarKernel for SRIDifiedKernel {
}
}
+/// Lazily built, shared length-1 null buffer whose only slot is null
+static SINGLE_NULL_BUFFER: OnceLock<NullBuffer> = OnceLock::new();
+
+/// Compute the nulls contributed by a CRS/SRID argument
+///
+/// For an array, this is the array's own null buffer (if any). For a null
+/// scalar, this is a length-1 buffer holding a single null. A non-null scalar
+/// contributes no nulls.
+fn crs_input_nulls(crs_value: &ColumnarValue) -> Option<&NullBuffer> {
+    let scalar_value = match crs_value {
+        ColumnarValue::Array(array) => return array.nulls(),
+        ColumnarValue::Scalar(scalar_value) => scalar_value,
+    };
+
+    if !scalar_value.is_null() {
+        return None;
+    }
+
+    let single_null = SINGLE_NULL_BUFFER.get_or_init(|| {
+        let mut builder = NullBufferBuilder::new(1);
+        builder.append(false);
+        builder
+            .finish()
+            .expect("Failed to build single null buffer")
+    });
+    Some(single_null)
+}
+
+/// Given an SRID or CRS array, compute the final crs array to put in the
item_crs struct
+///
+/// For SRID arrays, this is `EPSG:<srid>` except for the SRID of 0 (which maps
+/// to a null value in the CRS array) and 4326 (which maps to a value of
OGC:CRS84
+/// in the CRS array).
+///
+/// For CRS arrays of strings, this function attempts to abbreviate any
inputs. For example,
+/// PROJJSON input will attempt to be abbreviated to authority:code if
possible (or left
+/// as is otherwise). The special value "0" maps to a null value in the CRS
array.
+fn normalize_crs_array(
+ crs_value: &ColumnarValue,
+ maybe_engine: Option<&Arc<dyn CrsEngine + Send + Sync>>,
+) -> Result<ArrayRef> {
+ match crs_value.data_type() {
+ DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64 => {
+ // Local cache to avoid re-validating inputs
+ let mut known_valid = HashSet::new();
+
+ let int_value = crs_value.cast_to(&DataType::Int64, None)?;
+ let int_array_ref = ColumnarValue::values_to_arrays(&[int_value])?;
+ let int_array = as_int64_array(&int_array_ref[0])?;
+ let utf8_view_array = int_array
+ .iter()
+ .map(|maybe_srid| -> Result<Option<String>> {
+ if let Some(srid) = maybe_srid {
+ if srid == 0 {
+ return Ok(None);
+ } else if srid == 4326 {
+ return Ok(Some("OGC:CRS84".to_string()));
+ }
+
+ let auth_code = format!("EPSG:{srid}");
+ if !known_valid.contains(&srid) {
+ validate_crs(&auth_code, maybe_engine)?;
+ known_valid.insert(srid);
+ }
+
+ Ok(Some(auth_code))
+ } else {
+ Ok(None)
+ }
+ })
+ .collect::<Result<StringViewArray>>()?;
+
+ Ok(Arc::new(utf8_view_array))
+ }
+ _ => {
+ let mut known_abbreviated = HashMap::<String, String>::new();
+
+ let string_value = crs_value.cast_to(&DataType::Utf8View, None)?;
+ let string_array_ref =
ColumnarValue::values_to_arrays(&[string_value])?;
+ let string_view_array =
as_string_view_array(&string_array_ref[0])?;
+ let utf8_view_array = string_view_array
+ .iter()
+ .map(|maybe_crs| -> Result<Option<String>> {
+ if let Some(crs_str) = maybe_crs {
+ if crs_str == "0" {
+ return Ok(None);
+ }
+
+ if let Some(abbreviated_crs) =
known_abbreviated.get(crs_str) {
+ Ok(Some(abbreviated_crs.clone()))
+ } else if let Some(crs) = deserialize_crs(crs_str)? {
+ let abbreviated_crs =
+ if let Some(auth_code) =
crs.to_authority_code()? {
+ auth_code
+ } else {
+ crs_str.to_string()
+ };
+
+ known_abbreviated.insert(crs.to_string(),
abbreviated_crs.clone());
+ Ok(Some(abbreviated_crs))
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ }
+ })
+ .collect::<Result<StringViewArray>>()?;
+
+ Ok(Arc::new(utf8_view_array))
+ }
+ }
+}
+
+/// Validate a CRS string
+///
+/// If an engine is provided, the CRS is validated by asking the engine for a
+/// transform from `crs` to itself. Any engine error is returned as
+/// [DataFusionError::External]. If no engine is provided, no validation is
+/// performed and this function always succeeds; in particular,
+/// [deserialize_crs] is not called here.
+pub fn validate_crs(
+    crs: &str,
+    maybe_engine: Option<&Arc<dyn CrsEngine + Send + Sync>>,
+) -> Result<()> {
+    let Some(engine) = maybe_engine else {
+        return Ok(());
+    };
+
+    engine
+        .as_ref()
+        .get_transform_crs_to_crs(crs, crs, None, "")
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    Ok(())
+}
+
#[cfg(test)]
mod test {
use std::rc::Rc;
+ use arrow_array::{create_array, ArrayRef};
use arrow_schema::Field;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF};
@@ -353,7 +527,11 @@ mod test {
crs::lnglat,
datatypes::{Edges, WKB_GEOMETRY},
};
- use sedona_testing::{compare::assert_value_equal,
create::create_scalar_value};
+ use sedona_testing::{
+ compare::assert_value_equal,
+ create::{create_array, create_array_item_crs, create_scalar_value},
+ testers::ScalarUdfTester,
+ };
use super::*;
@@ -461,6 +639,106 @@ mod test {
assert_eq!(err.message(), "Unknown geometry error")
}
+    #[test]
+    fn udf_item_srid() {
+        let points = [
+            Some("POINT (0 1)"),
+            Some("POINT (2 3)"),
+            Some("POINT (4 5)"),
+            Some("POINT (6 7)"),
+            Some("POINT (8 9)"),
+        ];
+        let srids =
+            create_array!(Int32, [Some(4326), Some(3857), Some(3857), Some(0), None]) as ArrayRef;
+
+        let tester = ScalarUdfTester::new(
+            st_set_srid_udf().into(),
+            vec![WKB_GEOMETRY, SedonaType::Arrow(DataType::Int32)],
+        );
+        tester.assert_return_type(SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap());
+
+        // SRID 4326 maps to OGC:CRS84, SRID 0 means "no CRS", and a NULL SRID
+        // nulls out the whole item
+        let actual = tester
+            .invoke_array_array(create_array(&points, &WKB_GEOMETRY), srids)
+            .unwrap();
+        let expected = create_array_item_crs(
+            &[points[0], points[1], points[2], points[3], None],
+            [Some("OGC:CRS84"), Some("EPSG:3857"), Some("EPSG:3857"), None, None],
+            &WKB_GEOMETRY,
+        );
+        assert_eq!(&actual, &expected);
+    }
+
+    #[test]
+    fn udf_item_crs() {
+        let points = [
+            Some("POINT (0 1)"),
+            Some("POINT (2 3)"),
+            Some("POINT (4 5)"),
+            Some("POINT (6 7)"),
+            Some("POINT (8 9)"),
+        ];
+        let crses = create_array!(
+            Utf8,
+            [
+                Some("EPSG:4326"),
+                Some("EPSG:3857"),
+                Some("EPSG:3857"),
+                Some("0"),
+                None
+            ]
+        ) as ArrayRef;
+
+        let tester = ScalarUdfTester::new(
+            st_set_crs_udf().into(),
+            vec![WKB_GEOMETRY, SedonaType::Arrow(DataType::Utf8)],
+        );
+        tester.assert_return_type(SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap());
+
+        // EPSG:4326 is reported as OGC:CRS84, "0" means "no CRS", and a NULL CRS
+        // nulls out the whole item
+        let actual = tester
+            .invoke_array_array(create_array(&points, &WKB_GEOMETRY), crses)
+            .unwrap();
+        let expected = create_array_item_crs(
+            &[points[0], points[1], points[2], points[3], None],
+            [Some("OGC:CRS84"), Some("EPSG:3857"), Some("EPSG:3857"), None, None],
+            &WKB_GEOMETRY,
+        );
+        assert_eq!(&actual, &expected);
+    }
+
fn call_udf(
udf: &ScalarUDF,
arg: ColumnarValue,
diff --git a/rust/sedona-functions/src/st_srid.rs
b/rust/sedona-functions/src/st_srid.rs
index 4175f3a2..80a22463 100644
--- a/rust/sedona-functions/src/st_srid.rs
+++ b/rust/sedona-functions/src/st_srid.rs
@@ -14,19 +14,26 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
use crate::executor::WkbExecutor;
-use arrow_array::builder::StringBuilder;
-use arrow_array::builder::UInt32Builder;
-use arrow_array::Array;
+use arrow_array::{
+ builder::{StringViewBuilder, UInt32Builder},
+ Array,
+};
use arrow_schema::DataType;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{
+ cast::{as_string_view_array, as_struct_array},
+ exec_err, DataFusionError, Result, ScalarValue,
+};
use datafusion_expr::{
scalar_doc_sections::DOC_SECTION_OTHER, ColumnarValue, Documentation,
Volatility,
};
+use sedona_common::sedona_internal_err;
use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF};
+use sedona_schema::crs::deserialize_crs;
use sedona_schema::datatypes::SedonaType;
use sedona_schema::matchers::ArgMatcher;
-use std::{sync::Arc, vec};
+use std::{collections::HashMap, iter::zip, sync::Arc};
/// ST_Srid() scalar UDF implementation
///
@@ -34,7 +41,7 @@ use std::{sync::Arc, vec};
pub fn st_srid_udf() -> SedonaScalarUDF {
SedonaScalarUDF::new(
"st_srid",
- vec![Arc::new(StSrid {})],
+ vec![Arc::new(StSridItemCrs {}), Arc::new(StSrid {})],
Volatility::Immutable,
Some(st_srid_doc()),
)
@@ -46,7 +53,7 @@ pub fn st_srid_udf() -> SedonaScalarUDF {
pub fn st_crs_udf() -> SedonaScalarUDF {
SedonaScalarUDF::new(
"st_crs",
- vec![Arc::new(StCrs {})],
+ vec![Arc::new(StCrsItemCrs {}), Arc::new(StCrs {})],
Volatility::Immutable,
Some(st_crs_doc()),
)
@@ -119,6 +126,84 @@ impl SedonaScalarKernel for StSrid {
}
}
+/// ST_SRID kernel for item-level CRS input (`struct(item, crs)`)
+#[derive(Debug)]
+struct StSridItemCrs {}
+
+impl SedonaScalarKernel for StSridItemCrs {
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        ArgMatcher::new(
+            vec![ArgMatcher::is_item_crs()],
+            SedonaType::Arrow(DataType::UInt32),
+        )
+        .match_args(args)
+    }
+
+    /// Emit one SRID per element: null for null items, otherwise the SRID of the
+    /// element's CRS string as computed by `append_srid`.
+    fn invoke_batch(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        let executor = WkbExecutor::new(arg_types, args);
+        let mut builder = UInt32Builder::with_capacity(executor.num_iterations());
+
+        // Unwrap the struct storage; an untyped NULL scalar short-circuits to a NULL SRID
+        let item_crs = match &args[0] {
+            ColumnarValue::Array(array) => as_struct_array(array)?,
+            ColumnarValue::Scalar(ScalarValue::Struct(scalar)) => scalar.as_ref(),
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                return Ok(ColumnarValue::Scalar(ScalarValue::UInt32(None)));
+            }
+            _ => return sedona_internal_err!("Unexpected input to ST_SRID"),
+        };
+
+        let items = item_crs.column(0);
+        let crs_strings = as_string_view_array(item_crs.column(1))?;
+        // Per-batch memo of CRS string -> SRID so each distinct string is parsed once
+        let mut srid_cache = HashMap::<String, u32>::new();
+
+        for (row, maybe_crs) in zip(0.., crs_strings) {
+            if items.is_valid(row) {
+                append_srid(maybe_crs, &mut srid_cache, &mut builder)?;
+            } else {
+                builder.append_null();
+            }
+        }
+
+        executor.finish(Arc::new(builder.finish()))
+    }
+}
+
+/// Append the SRID for one item-level CRS string to `builder`
+///
+/// A null CRS string, or one that [deserialize_crs] maps to no CRS, yields SRID 0.
+/// Successfully resolved SRIDs are memoized in `batch_srids` keyed by the input string.
+///
+/// # Errors
+///
+/// Returns an execution error when the CRS parses but has no SRID, and propagates any
+/// error from [deserialize_crs] or `srid()`.
+fn append_srid(
+    maybe_crs: Option<&str>,
+    batch_srids: &mut HashMap<String, u32>,
+    builder: &mut UInt32Builder,
+) -> Result<()> {
+    let Some(crs_str) = maybe_crs else {
+        builder.append_value(0);
+        return Ok(());
+    };
+
+    if let Some(&cached) = batch_srids.get(crs_str) {
+        builder.append_value(cached);
+        return Ok(());
+    }
+
+    match deserialize_crs(crs_str)? {
+        None => builder.append_value(0),
+        Some(crs) => match crs.srid()? {
+            Some(srid) => {
+                batch_srids.insert(crs_str.to_string(), srid);
+                builder.append_value(srid);
+            }
+            None => {
+                return exec_err!("Can't extract SRID from item-level CRS '{crs_str}'");
+            }
+        },
+    }
+
+    Ok(())
+}
+
#[derive(Debug)]
struct StCrs {}
@@ -138,12 +223,10 @@ impl SedonaScalarKernel for StCrs {
args: &[ColumnarValue],
) -> Result<ColumnarValue> {
let executor = WkbExecutor::new(arg_types, args);
- let preallocate_bytes = "EPSG:4326".len() * executor.num_iterations();
- let mut builder =
- StringBuilder::with_capacity(executor.num_iterations(),
preallocate_bytes);
+ let mut builder =
StringViewBuilder::with_capacity(executor.num_iterations());
let crs_opt: Option<String> = match &arg_types[0] {
SedonaType::Wkb(_, Some(crs)) | SedonaType::WkbView(_, Some(crs))
=> {
- Some(crs.to_json())
+ Some(crs.to_authority_code()?.unwrap_or_else(|| crs.to_json()))
}
_ => None,
};
@@ -167,15 +250,75 @@ impl SedonaScalarKernel for StCrs {
}
}
+/// ST_Crs kernel for item-level CRS input (`struct(item, crs)`)
+#[derive(Debug)]
+struct StCrsItemCrs {}
+
+impl SedonaScalarKernel for StCrsItemCrs {
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        let matcher = ArgMatcher::new(
+            vec![ArgMatcher::is_item_crs()],
+            SedonaType::Arrow(DataType::Utf8View),
+        );
+
+        matcher.match_args(args)
+    }
+
+    /// Return each element's item-level CRS string
+    ///
+    /// Null items produce a null output; a valid item with a null CRS produces `"0"`.
+    fn invoke_batch(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        let executor = WkbExecutor::new(arg_types, args);
+
+        let item_crs_struct_array = match &args[0] {
+            ColumnarValue::Array(array) => as_struct_array(array)?,
+            ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => struct_array.as_ref(),
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                return Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(None)));
+            }
+            _ => return sedona_internal_err!("Unexpected input to ST_Crs"),
+        };
+
+        let item_array = item_crs_struct_array.column(0);
+        let crs_array = item_crs_struct_array.column(1);
+
+        // Fast path: return the CRS column untouched. This is only valid when it has no
+        // nulls AND the item column (the null source used by the loop below) has none.
+        // Checking the struct's null count here instead would return non-null CRS values
+        // for null items whenever the struct validity does not mirror the item validity.
+        if crs_array.null_count() == 0 && item_array.null_count() == 0 {
+            return executor.finish(crs_array.clone());
+        }
+
+        // Otherwise we need to build the output. We could potentially do some unioning
+        // of null buffers in the case where we have zero NULL crses but some null items.
+        let item_nulls = item_array.nulls();
+        let crs_string_array = as_string_view_array(crs_array)?;
+        let mut builder = StringViewBuilder::with_capacity(executor.num_iterations());
+
+        if let Some(item_nulls) = item_nulls {
+            for (is_valid, maybe_crs) in zip(item_nulls, crs_string_array) {
+                if is_valid {
+                    builder.append_value(maybe_crs.unwrap_or("0"))
+                } else {
+                    builder.append_null();
+                }
+            }
+        } else {
+            for maybe_crs in crs_string_array {
+                builder.append_value(maybe_crs.unwrap_or("0"))
+            }
+        }
+
+        executor.finish(Arc::new(builder.finish()))
+    }
+}
+
#[cfg(test)]
mod test {
use super::*;
- use arrow_array::create_array;
+ use arrow_array::{create_array, ArrayRef};
use datafusion_common::ScalarValue;
use datafusion_expr::ScalarUDF;
use sedona_schema::crs::deserialize_crs;
- use sedona_schema::datatypes::Edges;
- use sedona_testing::create::create_array;
+ use sedona_schema::datatypes::{Edges, WKB_GEOMETRY};
+ use sedona_testing::create::{create_array, create_array_item_crs,
create_scalar_item_crs};
use sedona_testing::testers::ScalarUdfTester;
#[test]
@@ -239,6 +382,56 @@ mod test {
assert!(result.unwrap_err().to_string().contains("CRS has no SRID"));
}
+    #[test]
+    fn udf_item_srid() {
+        let tester = ScalarUdfTester::new(
+            st_srid_udf().into(),
+            vec![SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap()],
+        );
+        tester.assert_return_type(DataType::UInt32);
+
+        // Scalars: a missing CRS reports SRID 0; an EPSG code reports its number
+        for (crs, srid) in [(None, 0), (Some("EPSG:3857"), 3857)] {
+            let scalar = create_scalar_item_crs(Some("POINT (0 1)"), crs, &WKB_GEOMETRY);
+            let result = tester.invoke_scalar(scalar).unwrap();
+            tester.assert_scalar_result_equals(result, srid);
+        }
+
+        // Arrays: null items stay null, null CRSes on valid items become 0
+        let item_crs_array = create_array_item_crs(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("POINT (4 5)"),
+                Some("POINT (6 7)"),
+                None,
+            ],
+            [Some("OGC:CRS84"), Some("EPSG:3857"), Some("EPSG:3857"), None, None],
+            &WKB_GEOMETRY,
+        );
+        let expected_srid =
+            create_array!(UInt32, [Some(4326), Some(3857), Some(3857), Some(0), None]) as ArrayRef;
+
+        assert_eq!(&tester.invoke_array(item_crs_array).unwrap(), &expected_srid);
+    }
+
#[test]
fn udf_crs() {
let udf: ScalarUDF = st_crs_udf().into();
@@ -260,21 +453,17 @@ mod test {
let crs = deserialize_crs("EPSG:4837").unwrap();
let sedona_type = SedonaType::Wkb(Edges::Planar, crs.clone());
let tester = ScalarUdfTester::new(udf.clone(),
vec![sedona_type.clone()]);
- let expected_crs = "\"EPSG:4837\"".to_string();
let result = tester
.invoke_scalar("POLYGON ((0 0, 1 0, 0 1, 0 0))")
.unwrap();
- tester.assert_scalar_result_equals(result,
ScalarValue::Utf8(Some(expected_crs.clone())));
+ tester.assert_scalar_result_equals(result, "EPSG:4837");
// Call with an array
let wkb_array = create_array(
&[Some("POINT (1 2)"), None, Some("MULTIPOINT (3 4)")],
&sedona_type,
);
- let expected = create_array!(
- Utf8,
- [Some(expected_crs.clone()), None, Some(expected_crs.clone())]
- );
+ let expected = create_array!(Utf8View, [Some("EPSG:4837"), None,
Some("EPSG:4837")]);
assert_eq!(
&tester.invoke_array(wkb_array).unwrap().as_ref(),
&expected.as_ref()
@@ -284,4 +473,62 @@ mod test {
let result = tester.invoke_scalar(ScalarValue::Null).unwrap();
tester.assert_scalar_result_equals(result, ScalarValue::Null);
}
+
+    #[test]
+    fn udf_item_crs() {
+        let tester = ScalarUdfTester::new(
+            st_crs_udf().into(),
+            vec![SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap()],
+        );
+        tester.assert_return_type(DataType::Utf8View);
+
+        // Scalars: a missing CRS reports "0"; a present CRS is returned as stored
+        for (crs, expected) in [(None, "0"), (Some("EPSG:3857"), "EPSG:3857")] {
+            let scalar = create_scalar_item_crs(Some("POINT (0 1)"), crs, &WKB_GEOMETRY);
+            let result = tester.invoke_scalar(scalar).unwrap();
+            tester.assert_scalar_result_equals(result, expected);
+        }
+
+        // Arrays: null items stay null, null CRSes on valid items become "0"
+        let item_crs_array = create_array_item_crs(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("POINT (4 5)"),
+                Some("POINT (6 7)"),
+                None,
+            ],
+            [Some("OGC:CRS84"), Some("EPSG:3857"), Some("EPSG:3857"), None, None],
+            &WKB_GEOMETRY,
+        );
+        let expected_crs = create_array!(
+            Utf8View,
+            [
+                Some("OGC:CRS84"),
+                Some("EPSG:3857"),
+                Some("EPSG:3857"),
+                Some("0"),
+                None
+            ]
+        ) as ArrayRef;
+
+        assert_eq!(&tester.invoke_array(item_crs_array).unwrap(), &expected_crs);
+    }
}
diff --git a/rust/sedona-schema/src/datatypes.rs
b/rust/sedona-schema/src/datatypes.rs
index 85ba6ae5..4c1d5d93 100644
--- a/rust/sedona-schema/src/datatypes.rs
+++ b/rust/sedona-schema/src/datatypes.rs
@@ -86,6 +86,36 @@ static RASTER_DATATYPE: LazyLock<DataType> =
// Implementation details
impl SedonaType {
+ /// Create a new item-level CRS type
+ ///
+ /// An item level CRS type in SedonaDB is a struct(item: <arbitrary type>,
crs: <string view>).
+ /// This design was used to minimize the friction of automatically
wrapping existing functions
+ /// that accept <arbitrary type>. The crs representation is typically an
authority:code string;
+ /// however, any string that works with [deserialize_crs] is valid. A
"missing" CRS (i.e.,
+ /// `Crs::None` at the type level) is represented by a null value in the
crs array.
+ ///
+ /// Note that this function strips CRSes from item if they are present.
This is to prevent the
+ /// item-level CRS type from carrying a CRS itself.
+ pub fn new_item_crs(item: &SedonaType) -> Result<SedonaType> {
+ let item_sedona_type = match item {
+ SedonaType::Wkb(edges, _) => SedonaType::Wkb(*edges, None),
+ SedonaType::WkbView(edges, _) => SedonaType::WkbView(*edges, None),
+ _ => {
+ return sedona_internal_err!("Can't create item_crs from
non-geo type");
+ }
+ };
+
+ let arrow_type = DataType::Struct(
+ vec![
+ item_sedona_type.to_storage_field("item", true)?,
+ Field::new("crs", DataType::Utf8View, true),
+ ]
+ .into(),
+ );
+
+ Ok(SedonaType::Arrow(arrow_type))
+ }
+
/// Given a field as it would appear in an external Schema return the
appropriate SedonaType
pub fn from_storage_field(field: &Field) -> Result<SedonaType> {
match ExtensionType::from_field(field) {
diff --git a/rust/sedona-schema/src/matchers.rs
b/rust/sedona-schema/src/matchers.rs
index ca7536cb..d1a466f5 100644
--- a/rust/sedona-schema/src/matchers.rs
+++ b/rust/sedona-schema/src/matchers.rs
@@ -173,6 +173,11 @@ impl ArgMatcher {
Arc::new(IsGeography {})
}
+ /// Matches any argument that is an item-level Crs type
+ ///
+ /// The check is structural: the argument must be an Arrow struct type whose fields are
+ /// named `item` and `crs`, in that order (see `IsItemCrs`).
+ pub fn is_item_crs() -> Arc<dyn TypeMatcher + Send + Sync> {
+ Arc::new(IsItemCrs {})
+ }
+
/// Matches any raster argument
pub fn is_raster() -> Arc<dyn TypeMatcher + Send + Sync> {
Self::is_exact(RASTER)
@@ -350,6 +355,24 @@ impl TypeMatcher for IsGeography {
}
}
+/// Type matcher for item-level CRS types (`struct(item, crs)`)
+#[derive(Debug)]
+struct IsItemCrs {}
+
+impl TypeMatcher for IsItemCrs {
+    /// Returns true for Arrow struct types with exactly two fields named `item` and `crs`
+    ///
+    /// Only the field names and their order are checked; child data types are not inspected.
+    fn match_type(&self, arg: &SedonaType) -> bool {
+        // Compare names in place instead of collecting them into a temporary Vec on every call
+        matches!(
+            arg,
+            SedonaType::Arrow(DataType::Struct(fields))
+                if fields.len() == 2 && fields[0].name() == "item" && fields[1].name() == "crs"
+        )
+    }
+}
+
#[derive(Debug)]
struct IsNumeric {}
diff --git a/rust/sedona-testing/src/create.rs
b/rust/sedona-testing/src/create.rs
index 9d785d69..07724a63 100644
--- a/rust/sedona-testing/src/create.rs
+++ b/rust/sedona-testing/src/create.rs
@@ -16,7 +16,8 @@
// under the License.
use std::{str::FromStr, sync::Arc};
-use arrow_array::{ArrayRef, BinaryArray, BinaryViewArray};
+use arrow_array::{ArrayRef, BinaryArray, BinaryViewArray, StringViewArray,
StructArray};
+use arrow_schema::{DataType, Field};
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use sedona_schema::datatypes::SedonaType;
@@ -53,26 +54,90 @@ pub fn create_array(wkt_values: &[Option<&str>], data_type:
&SedonaType) -> Arra
/// Create the storage [`ArrayRef`] from a sequence of WKT literals
///
-/// Panics on invalid WKT or unsupported data type.
+/// Panics on invalid WKT or unsupported data type. Item-level CRS struct types are also
+/// supported, in which case every element is given a null CRS.
pub fn create_array_storage(wkt_values: &[Option<&str>], data_type: &SedonaType) -> ArrayRef {
    match data_type {
        SedonaType::Wkb(_, _) => Arc::new(make_wkb_array::<BinaryArray>(wkt_values)),
        SedonaType::WkbView(_, _) => Arc::new(make_wkb_array::<BinaryViewArray>(wkt_values)),
+        SedonaType::Arrow(DataType::Struct(fields))
+            if fields.len() == 2 && fields[0].name() == "item" && fields[1].name() == "crs" =>
+        {
+            let item_type = SedonaType::from_storage_field(&fields[0]).unwrap();
+            // One null CRS per WKT value
+            let no_crs = wkt_values.iter().map(|_| None);
+            create_array_item_crs(wkt_values, no_crs, &item_type)
+        }
        _ => panic!("create_array_storage not implemented for {data_type:?}"),
    }
}
-/// Create the storage [`ScalarValue`] from a WKT literal
+/// Create an item-level CRS storage [`ArrayRef`] from WKT literals and CRS strings
///
/// Panics on invalid WKT or unsupported data type.
+///
+/// The result is a `struct(item, crs)` array whose validity mirrors the item array, with
+/// one (possibly null) CRS string per element taken from `crs`. Also panics if `crs` does
+/// not yield exactly `wkt_values.len()` values.
+pub fn create_array_item_crs<'a>(
+    wkt_values: &[Option<&str>],
+    crs: impl IntoIterator<Item = Option<&'a str>>,
+    item_type: &SedonaType,
+) -> ArrayRef {
+    let out_fields = vec![
+        item_type.to_storage_field("item", true).unwrap(),
+        Field::new("crs", DataType::Utf8View, true),
+    ];
+
+    let item_array = create_array_storage(wkt_values, item_type);
+    let crs_array: ArrayRef = Arc::new(crs.into_iter().collect::<StringViewArray>());
+    // Fail with a descriptive message rather than a generic StructArray construction panic
+    assert_eq!(
+        crs_array.len(),
+        item_array.len(),
+        "create_array_item_crs: expected one CRS value per WKT value"
+    );
+
+    let nulls = item_array.nulls().cloned();
+    Arc::new(StructArray::new(
+        out_fields.into(),
+        vec![item_array, crs_array],
+        nulls,
+    ))
+}
+
+/// Create the storage [`ScalarValue`] from a WKT literal
+///
+/// Panics on invalid WKT or unsupported data type. Item-level CRS struct types are supported
+/// and produce a value with a null CRS; use [create_scalar_item_crs] to build an item-level
+/// CRS scalar with a specific CRS.
pub fn create_scalar_storage(wkt_value: Option<&str>, data_type: &SedonaType) -> ScalarValue {
    match data_type {
        SedonaType::Wkb(_, _) => ScalarValue::Binary(wkt_value.map(make_wkb)),
        SedonaType::WkbView(_, _) => ScalarValue::BinaryView(wkt_value.map(make_wkb)),
+        SedonaType::Arrow(DataType::Struct(fields))
+            if fields.len() == 2 && fields[0].name() == "item" && fields[1].name() == "crs" =>
+        {
+            let item_field = &fields[0];
+            let item_type = SedonaType::from_storage_field(item_field).unwrap();
+            create_scalar_item_crs(wkt_value, None, &item_type)
+        }
        _ => panic!("create_scalar_storage not implemented for {data_type:?}"),
    }
}
+/// Create an item-level CRS [`ScalarValue`] from a WKT literal and an optional CRS string
+///
+/// The result is a one-row `struct(item, crs)` value whose validity follows the item.
+///
+/// Panics on invalid WKT or unsupported data type.
+pub fn create_scalar_item_crs(
+    wkt_value: Option<&str>,
+    crs: Option<&str>,
+    item_type: &SedonaType,
+) -> ScalarValue {
+    let fields = vec![
+        item_type.to_storage_field("item", true).unwrap(),
+        Field::new("crs", DataType::Utf8View, true),
+    ];
+
+    let item = create_scalar_storage(wkt_value, item_type)
+        .to_array()
+        .unwrap();
+    let crs = ScalarValue::Utf8View(crs.map(String::from))
+        .to_array()
+        .unwrap();
+    let validity = item.nulls().cloned();
+
+    let array = StructArray::try_new(fields.into(), vec![item, crs], validity).unwrap();
+    ScalarValue::Struct(Arc::new(array))
+}
+
fn make_wkb_array<T>(wkt_values: &[Option<&str>]) -> T
where
T: FromIterator<Option<Vec<u8>>>,
diff --git a/rust/sedona-testing/src/testers.rs
b/rust/sedona-testing/src/testers.rs
index fdded3b3..89cdd27f 100644
--- a/rust/sedona-testing/src/testers.rs
+++ b/rust/sedona-testing/src/testers.rs
@@ -17,7 +17,7 @@
use std::{iter::zip, sync::Arc};
use arrow_array::{ArrayRef, RecordBatch};
-use arrow_schema::{FieldRef, Schema};
+use arrow_schema::{DataType, FieldRef, Schema};
use datafusion_common::{config::ConfigOptions, Result, ScalarValue};
use datafusion_expr::{
function::{AccumulatorArgs, StateFieldsArgs},
@@ -532,10 +532,18 @@ impl ScalarUdfTester {
fn scalar_lit(arg: impl Literal, sedona_type: &SedonaType) ->
Result<ScalarValue> {
if let Expr::Literal(scalar, _) = arg.lit() {
- if matches!(
- sedona_type,
- SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _)
- ) {
+ let is_geometry_or_geography = match sedona_type {
+ SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _) => true,
+ SedonaType::Arrow(DataType::Struct(fields))
+ if fields.iter().map(|f| f.name()).collect::<Vec<_>>()
+ == vec!["item", "crs"] =>
+ {
+ true
+ }
+ _ => false,
+ };
+
+ if is_geometry_or_geography {
if let ScalarValue::Utf8(expected_wkt) = scalar {
Ok(create_scalar(expected_wkt.as_deref(), sedona_type))
} else if &scalar.data_type() == sedona_type.storage_type() {