This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new e0e1d109 feat(sedona-expr,sedona-schema): Implement item-level CRS (#410)
e0e1d109 is described below

commit e0e1d109480727faaf7be25923b57b4686144438
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Dec 19 15:34:50 2025 -0600

    feat(sedona-expr,sedona-schema): Implement item-level CRS (#410)
    
    Co-authored-by: Copilot <[email protected]>
---
 Cargo.lock                                         |   3 +
 python/sedonadb/tests/functions/test_transforms.py |  48 +-
 rust/sedona-expr/Cargo.toml                        |   2 +
 rust/sedona-expr/src/item_crs.rs                   | 599 +++++++++++++++++++++
 rust/sedona-expr/src/lib.rs                        |   1 +
 rust/sedona-functions/Cargo.toml                   |   1 +
 rust/sedona-functions/src/st_setsrid.rs            | 332 +++++++++++-
 rust/sedona-functions/src/st_srid.rs               | 287 +++++++++-
 rust/sedona-schema/src/datatypes.rs                |  30 ++
 rust/sedona-schema/src/matchers.rs                 |  23 +
 rust/sedona-testing/src/create.rs                  |  71 ++-
 rust/sedona-testing/src/testers.rs                 |  18 +-
 12 files changed, 1359 insertions(+), 56 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2c61044f..a79e1c52 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4871,6 +4871,8 @@ dependencies = [
 name = "sedona-expr"
 version = "0.3.0"
 dependencies = [
+ "arrow-array",
+ "arrow-buffer",
  "arrow-schema",
  "datafusion-common",
  "datafusion-expr",
@@ -4905,6 +4907,7 @@ name = "sedona-functions"
 version = "0.3.0"
 dependencies = [
  "arrow-array",
+ "arrow-buffer",
  "arrow-json",
  "arrow-schema",
  "criterion",
diff --git a/python/sedonadb/tests/functions/test_transforms.py b/python/sedonadb/tests/functions/test_transforms.py
index ab899d1f..5ae0d20d 100644
--- a/python/sedonadb/tests/functions/test_transforms.py
+++ b/python/sedonadb/tests/functions/test_transforms.py
@@ -14,6 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import geopandas
 import pyproj
 import pytest
 from sedonadb.testing import PostGIS, SedonaDB, geom_or_null, val_or_null
@@ -82,12 +84,56 @@ def test_st_setcrs_sedonadb(eng, geom, crs, expected_srid):
     assert df.crs.to_epsg() == expected_srid
 
 
+# We haven't wired up the engines to use/support item-level CRS yet. This
+# will be easier when EWKB and/or EWKT is supported (and/or this concept
+# is supported in geoarrow.pyarrow, which we use for testing)
+def test_item_crs_sedonadb():
+    eng = SedonaDB()
+    df = geopandas.GeoDataFrame(
+        {
+            "srid": [4326, 4326, 3857, 3857, 0, 0],
+            "geometry": geopandas.GeoSeries.from_wkt(
+                [
+                    "POINT (0 1)",
+                    "POINT (2 3)",
+                    "POINT (4 5)",
+                    "POINT (6 7)",
+                    "POINT (8 9)",
+                    None,
+                ]
+            ),
+        }
+    )
+
+    eng.con.create_data_frame(df).to_view("df")
+    eng.con.sql("SELECT ST_SetSRID(geometry, srid) as item_crs FROM df").to_view(
+        "df_item_crs"
+    )
+
+    eng.assert_query_result(
+        "SELECT ST_SRID(item_crs) FROM df_item_crs",
+        [("4326",), ("4326",), ("3857",), ("3857",), ("0",), (None,)],
+    )
+
+    eng.assert_query_result(
+        "SELECT ST_Crs(item_crs) FROM df_item_crs",
+        [
+            ("OGC:CRS84",),
+            ("OGC:CRS84",),
+            ("EPSG:3857",),
+            ("EPSG:3857",),
+            ("0",),
+            (None,),
+        ],
+    )
+
+
 @pytest.mark.parametrize("eng", [SedonaDB])
 def test_st_crs_sedonadb(eng):
     eng = eng.create_or_skip()
     eng.assert_query_result(
         "SELECT ST_CRS(ST_SetCrs(ST_GeomFromText('POINT (1 1)'), 'EPSG:26920'))",
-        '"EPSG:26920"',
+        "EPSG:26920",
     )
     eng.assert_query_result(
         "SELECT ST_CRS(ST_SetCrs(ST_GeomFromText('POINT (1 1)'), NULL))",
diff --git a/rust/sedona-expr/Cargo.toml b/rust/sedona-expr/Cargo.toml
index 8bfcbcf1..39c9cf56 100644
--- a/rust/sedona-expr/Cargo.toml
+++ b/rust/sedona-expr/Cargo.toml
@@ -35,6 +35,8 @@ rstest = { workspace = true }
 sedona-testing = { workspace = true }
 
 [dependencies]
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
 arrow-schema = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
diff --git a/rust/sedona-expr/src/item_crs.rs b/rust/sedona-expr/src/item_crs.rs
new file mode 100644
index 00000000..67869d9a
--- /dev/null
+++ b/rust/sedona-expr/src/item_crs.rs
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Debug, iter::zip, sync::Arc};
+
+use arrow_array::{ArrayRef, StructArray};
+use arrow_buffer::NullBuffer;
+use arrow_schema::{DataType, Field};
+use datafusion_common::{
+    cast::{as_string_view_array, as_struct_array},
+    DataFusionError, Result, ScalarValue,
+};
+use datafusion_expr::ColumnarValue;
+use sedona_common::sedona_internal_err;
+use sedona_schema::{crs::deserialize_crs, datatypes::SedonaType, matchers::ArgMatcher};
+
+use crate::scalar_udf::{ScalarKernelRef, SedonaScalarKernel};
+
+/// Wrap a [SedonaScalarKernel] to provide Item CRS type support
+///
+/// Most kernels that operate on geometry or geography in some way
+/// can also support Item CRS inputs:
+///
+/// - Functions that return a non-spatial type whose value does not
+///   depend on the input CRS only need to operate on the `item` portion
+///   of any item_crs input (e.g., ST_Area()).
+/// - Functions that accept two or more spatial arguments must have
+///   compatible CRSes.
+/// - Functions that return a geometry or geography must also return
+///   an item_crs type where the output CRSes are propagated from the
+///   input.
+///
+/// This kernel provides an automatic wrapper enforcing these rules.
+/// It is appropriate for most functions except:
+///
+/// - Functions whose return value depends on the CRS
+/// - Functions whose return value depends on the value of a scalar
+///   argument
+/// - Functions whose return CRS is not strictly propagated from the
+///   CRSes of the arguments.
+#[derive(Debug)]
+pub struct ItemCrsKernel {
+    inner: ScalarKernelRef,
+}
+
+impl ItemCrsKernel {
+    pub fn new_ref(inner: ScalarKernelRef) -> ScalarKernelRef {
+        Arc::new(Self { inner })
+    }
+}
+
+impl SedonaScalarKernel for ItemCrsKernel {
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        return_type_handle_item_crs(self.inner.as_ref(), args)
+    }
+
+    fn invoke_batch_from_args(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ColumnarValue> {
+        invoke_handle_item_crs(self.inner.as_ref(), arg_types, args, return_type, num_rows)
+    }
+
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        _args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented")
+    }
+}
+
+/// Calculate a return type based on the underlying kernel
+///
+/// This function extracts the item portion of any item_crs input and
+/// passes the result to the underlying kernel's return type implementation.
+/// If the underlying kernel is going to return a geometry or geography type,
+/// we wrap it in an item_crs type.
+///
+/// This function does not pass on input scalars, because those types of
+/// functions as used in SedonaDB typically assign a type-level CRS.
+/// Functions that use scalar inputs to calculate an output type need
+/// to implement an item_crs implementation themselves.
+fn return_type_handle_item_crs(
+    kernel: &dyn SedonaScalarKernel,
+    arg_types: &[SedonaType],
+) -> Result<Option<SedonaType>> {
+    let item_crs_matcher = ArgMatcher::is_item_crs();
+
+    // If there are no item_crs arguments, this kernel never applies.
+    if !arg_types
+        .iter()
+        .any(|arg_type| item_crs_matcher.match_type(arg_type))
+    {
+        return Ok(None);
+    }
+
+    // Extract the item types. This also strips the type-level CRS for any non item-crs
+    // type, because any resulting geometry type should be CRS free.
+    let item_arg_types = arg_types
+        .iter()
+        .map(|arg_type| parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type))
+        .collect::<Result<Vec<_>>>()?;
+
+    // Any kernel that uses scalars to determine the output type is spurious here, so we
+    // pretend that there aren't any for the purposes of computing the type.
+    let scalar_args_none = (0..arg_types.len())
+        .map(|_| None)
+        .collect::<Vec<Option<&ScalarValue>>>();
+
+    // If the wrapped kernel matches and returns a geometry type, that geometry type will be an
+    // item/crs type. The new_item_crs() function handles stripping any CRS that might be present
+    // in the output type.
+    if let Some(item_type) =
+        kernel.return_type_from_args_and_scalars(&item_arg_types, &scalar_args_none)?
+    {
+        let geo_matcher = ArgMatcher::is_geometry_or_geography();
+        if geo_matcher.match_type(&item_type) {
+            Ok(Some(SedonaType::new_item_crs(&item_type)?))
+        } else {
+            Ok(Some(item_type))
+        }
+    } else {
+        Ok(None)
+    }
+}
+
+/// Execute an underlying kernel
+///
+/// This function handles invoking the underlying kernel, which operates
+/// only on the `item` portion of the `item_crs` type. Before executing,
+/// this function handles ensuring that all CRSes are compatible, and,
+/// if necessary, wrap a geometry or geography output in an item_crs
+/// type.
+fn invoke_handle_item_crs(
+    kernel: &dyn SedonaScalarKernel,
+    arg_types: &[SedonaType],
+    args: &[ColumnarValue],
+    return_type: &SedonaType,
+    num_rows: usize,
+) -> Result<ColumnarValue> {
+    // Separate the argument types into item and Option<crs>
+    // Don't strip the CRSes because we need them to compare with
+    // the item-level CRSes to ensure they are equal.
+    let arg_types_unwrapped = arg_types
+        .iter()
+        .map(parse_item_crs_arg_type)
+        .collect::<Result<Vec<_>>>()?;
+
+    let args_unwrapped = zip(&arg_types_unwrapped, args)
+        .map(|(arg_type, arg)| {
+            let (item_type, crs_type) = arg_type;
+            parse_item_crs_arg(item_type, crs_type, arg)
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let crs_args = args_unwrapped
+        .iter()
+        .flat_map(|(_, crs_arg)| crs_arg)
+        .collect::<Vec<_>>();
+
+    let crs_result = ensure_crs_args_equal(&crs_args)?;
+
+    let item_types = arg_types_unwrapped
+        .iter()
+        .map(|(item_type, _)| item_type.clone())
+        .collect::<Vec<_>>();
+    let item_args = args_unwrapped
+        .iter()
+        .map(|(item_arg, _)| item_arg.clone())
+        .collect::<Vec<_>>();
+
+    let item_arg_types_no_crs = arg_types
+        .iter()
+        .map(|arg_type| parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, _)| item_type))
+        .collect::<Result<Vec<_>>>()?;
+    let out_item_type = match kernel.return_type(&item_arg_types_no_crs)? {
+        Some(matched_item_type) => matched_item_type,
+        None => return sedona_internal_err!("Expected inner kernel to match types {item_types:?}"),
+    };
+
+    let item_result =
+        kernel.invoke_batch_from_args(&item_types, &item_args, return_type, num_rows)?;
+
+    if ArgMatcher::is_geometry_or_geography().match_type(&out_item_type) {
+        make_item_crs(&out_item_type, item_result, crs_result, None)
+    } else {
+        Ok(item_result)
+    }
+}
+
+/// Create a new item_crs struct [ColumnarValue]
+///
+/// Optionally provide extra nulls (e.g., to satisfy a scalar function contract
+/// where null inputs -> null outputs).
+pub fn make_item_crs(
+    item_type: &SedonaType,
+    item_result: ColumnarValue,
+    crs_result: &ColumnarValue,
+    extra_nulls: Option<&NullBuffer>,
+) -> Result<ColumnarValue> {
+    let out_fields = vec![
+        item_type.to_storage_field("item", true)?,
+        Field::new("crs", DataType::Utf8View, true),
+    ];
+
+    let scalar_result = matches!(
+        (&item_result, crs_result),
+        (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_))
+    );
+
+    let item_crs_arrays = ColumnarValue::values_to_arrays(&[item_result, crs_result.clone()])?;
+    let item_array = &item_crs_arrays[0];
+    let crs_array = &item_crs_arrays[1];
+    let nulls = NullBuffer::union(item_array.nulls(), extra_nulls);
+
+    let item_crs_array = StructArray::new(
+        out_fields.into(),
+        vec![item_array.clone(), crs_array.clone()],
+        nulls,
+    );
+
+    if scalar_result {
+        Ok(ScalarValue::Struct(Arc::new(item_crs_array)).into())
+    } else {
+        Ok(ColumnarValue::Array(Arc::new(item_crs_array)))
+    }
+}
+
+/// Given an input type, separate it into an item and crs type (if the input
+/// is an item_crs type). Otherwise, just return the item type as is.
+fn parse_item_crs_arg_type(sedona_type: &SedonaType) -> Result<(SedonaType, Option<SedonaType>)> {
+    if let SedonaType::Arrow(DataType::Struct(fields)) = sedona_type {
+        let field_names = fields.iter().map(|f| f.name()).collect::<Vec<_>>();
+        if field_names != ["item", "crs"] {
+            return Ok((sedona_type.clone(), None));
+        }
+
+        let item = SedonaType::from_storage_field(&fields[0])?;
+        let crs = SedonaType::from_storage_field(&fields[1])?;
+        Ok((item, Some(crs)))
+    } else {
+        Ok((sedona_type.clone(), None))
+    }
+}
+
+/// Given an input type, separate it into an item and crs type (if the input
+/// is an item_crs type). Otherwise, just return the item type as is. This
+/// version strips the CRS, which we need to do here before passing it to the
+/// underlying kernel (which expects all input CRSes to match).
+fn parse_item_crs_arg_type_strip_crs(
+    sedona_type: &SedonaType,
+) -> Result<(SedonaType, Option<SedonaType>)> {
+    match sedona_type {
+        SedonaType::Wkb(edges, _) => Ok((SedonaType::Wkb(*edges, None), None)),
+        SedonaType::WkbView(edges, _) => Ok((SedonaType::WkbView(*edges, None), None)),
+        SedonaType::Arrow(DataType::Struct(fields))
+            if fields.iter().map(|f| f.name()).collect::<Vec<_>>() == vec!["item", "crs"] =>
+        {
+            let item = SedonaType::from_storage_field(&fields[0])?;
+            let crs = SedonaType::from_storage_field(&fields[1])?;
+            Ok((item, Some(crs)))
+        }
+        other => Ok((other.clone(), None)),
+    }
+}
+
+/// Separate an argument into the item and its crs (if applicable). This
+/// operates on the result of parse_item_crs_arg_type().
+fn parse_item_crs_arg(
+    item_type: &SedonaType,
+    crs_type: &Option<SedonaType>,
+    arg: &ColumnarValue,
+) -> Result<(ColumnarValue, Option<ColumnarValue>)> {
+    if crs_type.is_some() {
+        return match arg {
+            ColumnarValue::Array(array) => {
+                let struct_array = as_struct_array(array)?;
+                Ok((
+                    ColumnarValue::Array(struct_array.column(0).clone()),
+                    Some(ColumnarValue::Array(struct_array.column(1).clone())),
+                ))
+            }
+            ColumnarValue::Scalar(scalar_value) => {
+                if let ScalarValue::Struct(struct_array) = scalar_value {
+                    let item_scalar = ScalarValue::try_from_array(struct_array.column(0), 0)?;
+                    let crs_scalar = ScalarValue::try_from_array(struct_array.column(1), 0)?;
+                    Ok((
+                        ColumnarValue::Scalar(item_scalar),
+                        Some(ColumnarValue::Scalar(crs_scalar)),
+                    ))
+                } else {
+                    sedona_internal_err!(
+                        "Expected struct scalar for item_crs but got {}",
+                        scalar_value
+                    )
+                }
+            }
+        };
+    }
+
+    match item_type {
+        SedonaType::Wkb(_, crs) | SedonaType::WkbView(_, crs) => {
+            let crs_scalar = if let Some(crs) = crs {
+                if let Some(auth_code) = crs.to_authority_code()? {
+                    ScalarValue::Utf8View(Some(auth_code))
+                } else {
+                    ScalarValue::Utf8View(Some(crs.to_json()))
+                }
+            } else {
+                ScalarValue::Utf8View(None)
+            };
+
+            Ok((arg.clone(), Some(ColumnarValue::Scalar(crs_scalar))))
+        }
+        _ => Ok((arg.clone(), None)),
+    }
+}
+
+/// Ensures values representing CRSes all represent equivalent CRS values
+fn ensure_crs_args_equal<'a>(crs_args: &[&'a ColumnarValue]) -> Result<&'a ColumnarValue> {
+    match crs_args.len() {
+        0 => sedona_internal_err!("Zero CRS arguments as input to item_crs"),
+        1 => Ok(crs_args[0]),
+        _ => {
+            let crs_args_string = crs_args
+                .iter()
+                .map(|arg| arg.cast_to(&DataType::Utf8View, None))
+                .collect::<Result<Vec<_>>>()?;
+            let crs_arrays = ColumnarValue::values_to_arrays(&crs_args_string)?;
+            for i in 1..crs_arrays.len() {
+                ensure_crs_string_arrays_equal2(&crs_arrays[i - 1], &crs_arrays[i])?
+            }
+
+            Ok(crs_args[0])
+        }
+    }
+}
+
+// Checks two string view arrays for equality when each represents a string representation
+// of a CRS
+fn ensure_crs_string_arrays_equal2(lhs: &ArrayRef, rhs: &ArrayRef) -> Result<()> {
+    for (lhs_item, rhs_item) in zip(as_string_view_array(lhs)?, as_string_view_array(rhs)?) {
+        if lhs_item == rhs_item {
+            // First check for byte-for-byte equality (faster and most likely)
+            continue;
+        }
+
+        // Check the deserialized CRS values for equality
+        if let (Some(lhs_item_str), Some(rhs_item_str)) = (lhs_item, rhs_item) {
+            let lhs_crs = deserialize_crs(lhs_item_str)?;
+            let rhs_crs = deserialize_crs(rhs_item_str)?;
+            if lhs_crs == rhs_crs {
+                continue;
+            }
+        }
+
+        if lhs_item != rhs_item {
+            return Err(DataFusionError::Execution(format!(
+                "CRS values not equal: {lhs_item:?} vs {rhs_item:?}",
+            )));
+        }
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod test {
+    use datafusion_expr::ScalarUDF;
+    use rstest::rstest;
+    use sedona_schema::{
+        crs::lnglat,
+        datatypes::{Edges, SedonaType, WKB_GEOMETRY},
+    };
+    use sedona_testing::{
+        create::create_array_item_crs, create::create_scalar_item_crs, testers::ScalarUdfTester,
+    };
+
+    use crate::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
+
+    use super::*;
+
+    // A test function of something + geometry -> out_type
+    fn test_udf(out_type: SedonaType) -> ScalarUDF {
+        let geom_to_geom_kernel = SimpleSedonaScalarKernel::new_ref(
+            ArgMatcher::new(
+                vec![ArgMatcher::is_any(), ArgMatcher::is_geometry()],
+                out_type,
+            ),
+            Arc::new(|_arg_types, args| Ok(args[0].clone())),
+        );
+
+        let crsified_kernel = ItemCrsKernel::new_ref(geom_to_geom_kernel);
+        SedonaScalarUDF::from_kernel("fun", crsified_kernel.clone()).into()
+    }
+
+    fn basic_item_crs_type() -> SedonaType {
+        SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap()
+    }
+
+    #[test]
+    fn item_crs_kernel_no_match() {
+        // A call with geometry + geometry should fail (this case would be handled by the
+        // original kernel, not the item_crs kernel)
+        let tester = ScalarUdfTester::new(test_udf(WKB_GEOMETRY), vec![WKB_GEOMETRY, WKB_GEOMETRY]);
+        let err = tester.return_type().unwrap_err();
+        assert_eq!(
+            err.message(),
+            "fun([Wkb(Planar, None), Wkb(Planar, None)]): No kernel matching arguments"
+        );
+    }
+
+    #[rstest]
+    fn item_crs_kernel_basic(
+        #[values(
+            (WKB_GEOMETRY, basic_item_crs_type()),
+            (basic_item_crs_type(), WKB_GEOMETRY),
+            (basic_item_crs_type(), basic_item_crs_type())
+        )]
+        arg_types: (SedonaType, SedonaType),
+    ) {
+        // A call with geometry + item_crs or both item_crs should return item_crs
+        let tester = ScalarUdfTester::new(test_udf(WKB_GEOMETRY), vec![arg_types.0, arg_types.1]);
+        tester.assert_return_type(basic_item_crs_type());
+        let result = tester
+            .invoke_scalar_scalar("POINT (0 1)", "POINT (1 2)")
+            .unwrap();
+        assert_eq!(
+            result,
+            create_scalar_item_crs(Some("POINT (0 1)"), None, &WKB_GEOMETRY)
+        );
+    }
+
+    #[test]
+    fn item_crs_kernel_crs_values() {
+        let tester = ScalarUdfTester::new(
+            test_udf(WKB_GEOMETRY),
+            vec![basic_item_crs_type(), basic_item_crs_type()],
+        );
+        tester.assert_return_type(basic_item_crs_type());
+
+        let scalar_item_crs_4326 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
+        let scalar_item_crs_crs84 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"), &WKB_GEOMETRY);
+        let scalar_item_crs_3857 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);
+
+        // Should be able to execute when both arguments have an equal
+        // (but not necessarily identical) CRS
+        let result = tester
+            .invoke_scalar_scalar(scalar_item_crs_4326.clone(), scalar_item_crs_crs84.clone())
+            .unwrap();
+        assert_eq!(result, scalar_item_crs_4326);
+
+        // We should get an error when the CRSes are not compatible
+        let err = tester
+            .invoke_scalar_scalar(scalar_item_crs_4326.clone(), scalar_item_crs_3857.clone())
+            .unwrap_err();
+        assert_eq!(
+            err.message(),
+            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
+        );
+    }
+
+    #[test]
+    fn item_crs_kernel_crs_types() {
+        let scalar_item_crs_4326 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
+        let scalar_item_crs_crs84 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("OGC:CRS84"), &WKB_GEOMETRY);
+        let scalar_item_crs_3857 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);
+
+        let sedona_type_lnglat = SedonaType::Wkb(Edges::Planar, lnglat());
+        let tester = ScalarUdfTester::new(
+            test_udf(WKB_GEOMETRY),
+            vec![basic_item_crs_type(), sedona_type_lnglat.clone()],
+        );
+        tester.assert_return_type(basic_item_crs_type());
+
+        // We should be able to execute item_crs + geometry when the crs compares equal
+        let result = tester
+            .invoke_scalar_scalar(scalar_item_crs_4326.clone(), "POINT (3 4)")
+            .unwrap();
+        assert_eq!(result, scalar_item_crs_4326);
+
+        let result = tester
+            .invoke_scalar_scalar(scalar_item_crs_crs84.clone(), "POINT (3 4)")
+            .unwrap();
+        assert_eq!(result, scalar_item_crs_crs84);
+
+        // We should get an error when the CRSes are not compatible
+        let err = tester
+            .invoke_scalar_scalar(scalar_item_crs_3857.clone(), "POINT (3 4)")
+            .unwrap_err();
+        assert_eq!(
+            err.message(),
+            "CRS values not equal: Some(\"EPSG:3857\") vs Some(\"OGC:CRS84\")"
+        );
+    }
+
+    #[test]
+    fn item_crs_kernel_arrays() {
+        let tester = ScalarUdfTester::new(
+            test_udf(WKB_GEOMETRY),
+            vec![basic_item_crs_type(), basic_item_crs_type()],
+        );
+
+        let array_item_crs_lnglat = create_array_item_crs(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("POINT (3 4)"),
+            ],
+            [Some("EPSG:4326"), Some("EPSG:4326"), Some("EPSG:4326")],
+            &WKB_GEOMETRY,
+        );
+        let scalar_item_crs_4326 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
+        let scalar_item_crs_3857 =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:3857"), &WKB_GEOMETRY);
+
+        // This should succeed when all CRS combinations are compatible
+        let result = tester
+            .invoke_array_scalar(array_item_crs_lnglat.clone(), scalar_item_crs_4326.clone())
+            .unwrap();
+        assert_eq!(&result, &array_item_crs_lnglat);
+
+        // This should fail otherwise
+        let err = tester
+            .invoke_array_scalar(array_item_crs_lnglat.clone(), scalar_item_crs_3857.clone())
+            .unwrap_err();
+        assert_eq!(
+            err.message(),
+            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
+        );
+    }
+
+    #[test]
+    fn item_crs_kernel_non_spatial_args_and_result() {
+        let scalar_item_crs =
+            create_scalar_item_crs(Some("POINT (0 1)"), Some("EPSG:4326"), &WKB_GEOMETRY);
+
+        let tester = ScalarUdfTester::new(
+            test_udf(SedonaType::Arrow(DataType::Int32)),
+            vec![SedonaType::Arrow(DataType::Int32), basic_item_crs_type()],
+        );
+        tester.assert_return_type(DataType::Int32);
+
+        let result = tester.invoke_scalar_scalar(1234, scalar_item_crs).unwrap();
+        assert_eq!(result, ScalarValue::Int32(Some(1234)))
+    }
+
+    #[test]
+    fn crs_args_equal() {
+        // Zero args
+        let err = ensure_crs_args_equal(&[]).unwrap_err();
+        assert!(err.message().contains("Zero CRS arguments"));
+
+        let crs_lnglat = ColumnarValue::Scalar(ScalarValue::Utf8(Some("EPSG:4326".to_string())));
+        let crs_also_lnglat =
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some("OGC:CRS84".to_string())));
+        let crs_other = ColumnarValue::Scalar(ScalarValue::Utf8(Some("EPSG:3857".to_string())));
+
+        // One arg
+        let result_one_arg = ensure_crs_args_equal(&[&crs_lnglat]).unwrap();
+        assert!(std::ptr::eq(result_one_arg, &crs_lnglat));
+
+        // Two args (equal)
+        let result_two_args = ensure_crs_args_equal(&[&crs_lnglat, &crs_also_lnglat]).unwrap();
+        assert!(std::ptr::eq(result_two_args, &crs_lnglat));
+
+        // Two args (not equal)
+        let err = ensure_crs_args_equal(&[&crs_lnglat, &crs_other]).unwrap_err();
+        assert_eq!(
+            err.message(),
+            "CRS values not equal: Some(\"EPSG:4326\") vs Some(\"EPSG:3857\")"
+        );
+    }
+}
diff --git a/rust/sedona-expr/src/lib.rs b/rust/sedona-expr/src/lib.rs
index c200b8e5..e6f9cf78 100644
--- a/rust/sedona-expr/src/lib.rs
+++ b/rust/sedona-expr/src/lib.rs
@@ -16,6 +16,7 @@
 // under the License.
 pub mod aggregate_udf;
 pub mod function_set;
+pub mod item_crs;
 pub mod scalar_udf;
 pub mod spatial_filter;
 pub mod statistics;
diff --git a/rust/sedona-functions/Cargo.toml b/rust/sedona-functions/Cargo.toml
index 57afd240..9fb7f35f 100644
--- a/rust/sedona-functions/Cargo.toml
+++ b/rust/sedona-functions/Cargo.toml
@@ -41,6 +41,7 @@ tokio = { workspace = true, features = ["rt", "macros"] }
 [dependencies]
 arrow-schema = { workspace = true }
 arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
 geo-traits = { workspace = true }
diff --git a/rust/sedona-functions/src/st_setsrid.rs b/rust/sedona-functions/src/st_setsrid.rs
index 4cd0759e..5e0d4a8b 100644
--- a/rust/sedona-functions/src/st_setsrid.rs
+++ b/rust/sedona-functions/src/st_setsrid.rs
@@ -14,16 +14,30 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use std::{sync::Arc, vec};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::{Arc, OnceLock},
+};
 
-use arrow_array::builder::BinaryBuilder;
+use arrow_array::{
+    builder::{BinaryBuilder, NullBufferBuilder},
+    ArrayRef, StringViewArray,
+};
+use arrow_buffer::NullBuffer;
 use arrow_schema::DataType;
-use datafusion_common::{error::Result, DataFusionError, ScalarValue};
+use datafusion_common::{
+    cast::{as_int64_array, as_string_view_array},
+    error::Result,
+    DataFusionError, ScalarValue,
+};
 use datafusion_expr::{
     scalar_doc_sections::DOC_SECTION_OTHER, ColumnarValue, Documentation, Volatility,
 };
 use sedona_common::sedona_internal_err;
-use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel, SedonaScalarUDF};
+use sedona_expr::{
+    item_crs::make_item_crs,
+    scalar_udf::{ScalarKernelRef, SedonaScalarKernel, SedonaScalarUDF},
+};
 use sedona_geometry::transform::CrsEngine;
 use sedona_schema::{crs::deserialize_crs, datatypes::SedonaType, matchers::ArgMatcher};
 
@@ -124,12 +138,25 @@ impl SedonaScalarKernel for STSetSRID {
         determine_return_type(args, scalar_args, self.engine.as_ref())
     }
 
-    fn invoke_batch(
+    fn invoke_batch_from_args(
         &self,
-        _arg_types: &[SedonaType],
+        arg_types: &[SedonaType],
         args: &[ColumnarValue],
+        return_type: &SedonaType,
+        _num_rows: usize,
     ) -> Result<ColumnarValue> {
-        Ok(args[0].clone())
+        let item_crs_matcher = ArgMatcher::is_item_crs();
+        if item_crs_matcher.match_type(return_type) {
+            let normalized_crs_value = normalize_crs_array(&args[1], self.engine.as_ref())?;
+            make_item_crs(
+                &arg_types[0],
+                args[0].clone(),
+                &ColumnarValue::Array(normalized_crs_value),
+                crs_input_nulls(&args[1]),
+            )
+        } else {
+            Ok(args[0].clone())
+        }
     }
 
     fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
@@ -137,6 +164,14 @@ impl SedonaScalarKernel for STSetSRID {
             "Should not be called because return_type_from_args_and_scalars() is implemented"
         )
     }
+
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        _args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented")
+    }
 }
 
 #[derive(Debug)]
@@ -159,12 +194,25 @@ impl SedonaScalarKernel for STSetCRS {
         determine_return_type(args, scalar_args, self.engine.as_ref())
     }
 
-    fn invoke_batch(
+    fn invoke_batch_from_args(
         &self,
-        _arg_types: &[SedonaType],
+        arg_types: &[SedonaType],
         args: &[ColumnarValue],
+        return_type: &SedonaType,
+        _num_rows: usize,
     ) -> Result<ColumnarValue> {
-        Ok(args[0].clone())
+        let item_crs_matcher = ArgMatcher::is_item_crs();
+        if item_crs_matcher.match_type(return_type) {
+            let normalized_crs_value = normalize_crs_array(&args[1], 
self.engine.as_ref())?;
+            make_item_crs(
+                &arg_types[0],
+                args[0].clone(),
+                &ColumnarValue::Array(normalized_crs_value),
+                crs_input_nulls(&args[1]),
+            )
+        } else {
+            Ok(args[0].clone())
+        }
     }
 
     fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
@@ -172,24 +220,14 @@ impl SedonaScalarKernel for STSetCRS {
             "Should not be called because return_type_from_args_and_scalars() 
is implemented"
         )
     }
-}
 
-/// Validate a CRS string
-///
-/// If an engine is provided, the engine will be used to validate the CRS. If 
absent,
-/// the CRS will only be validated using the basic checks in [deserialize_crs].
-pub fn validate_crs(
-    crs: &str,
-    maybe_engine: Option<&Arc<dyn CrsEngine + Send + Sync>>,
-) -> Result<()> {
-    if let Some(engine) = maybe_engine {
-        engine
-            .as_ref()
-            .get_transform_crs_to_crs(crs, crs, None, "")
-            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        _args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        sedona_internal_err!("Should not be called because 
invoke_batch_from_args() is implemented")
     }
-
-    Ok(())
 }
 
 fn determine_return_type(
@@ -223,6 +261,8 @@ fn determine_return_type(
                 _ => {}
             }
         }
+    } else {
+        return Ok(Some(SedonaType::new_item_crs(&args[0])?));
     }
 
     sedona_internal_err!("Unexpected argument types: {}, {}", args[0], args[1])
@@ -341,10 +381,144 @@ impl SedonaScalarKernel for SRIDifiedKernel {
     }
 }
 
+static SINGLE_NULL_BUFFER: OnceLock<NullBuffer> = OnceLock::new();
+
+/// Return the null buffer describing which rows of the SRID/CRS argument are null
+///
+/// For an array argument this is the array's own null buffer (if it has one).
+/// For a null scalar this is a shared one-element buffer whose only slot is
+/// null, built on first use and cached in `SINGLE_NULL_BUFFER`. A non-null
+/// scalar yields `None`.
+fn crs_input_nulls(crs_value: &ColumnarValue) -> Option<&NullBuffer> {
+    match crs_value {
+        ColumnarValue::Array(array) => array.nulls(),
+        ColumnarValue::Scalar(scalar_value) => {
+            if scalar_value.is_null() {
+                // Stored in a static so a reference can be returned from this function
+                let null_buffer = SINGLE_NULL_BUFFER.get_or_init(|| {
+                    let mut builder = NullBufferBuilder::new(1);
+                    // `false` marks the single slot as null
+                    builder.append(false);
+                    builder
+                        .finish()
+                        .expect("Failed to build single null buffer")
+                });
+                Some(null_buffer)
+            } else {
+                None
+            }
+        }
+    }
+}
+
+/// Given an SRID or CRS array, compute the final crs array to put in the 
item_crs struct
+///
+/// For SRID arrays, this is `EPSG:<srid>` except for the SRID of 0 (which maps
+/// to a null value in the CRS array) and 4326 (which maps to a value of 
OGC:CRS84
+/// in the CRS array).
+///
+/// For CRS arrays of strings, this function attempts to abbreviate any 
inputs. For example,
+/// PROJJSON input will attempt to be abbreviated to authority:code if 
possible (or left
+/// as is otherwise). The special value "0" maps to a null value in the CRS 
array.
+///
+/// Null inputs map to null outputs. Integer SRIDs (other than 0 and 4326) are
+/// checked with [validate_crs] once per distinct value; string inputs are only
+/// parsed with [deserialize_crs] and `maybe_engine` is not consulted for them.
+/// The result is always a `StringViewArray`.
+fn normalize_crs_array(
+    crs_value: &ColumnarValue,
+    maybe_engine: Option<&Arc<dyn CrsEngine + Send + Sync>>,
+) -> Result<ArrayRef> {
+    match crs_value.data_type() {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64 => {
+            // Local cache to avoid re-validating inputs
+            let mut known_valid = HashSet::new();
+
+            let int_value = crs_value.cast_to(&DataType::Int64, None)?;
+            let int_array_ref = ColumnarValue::values_to_arrays(&[int_value])?;
+            let int_array = as_int64_array(&int_array_ref[0])?;
+            let utf8_view_array = int_array
+                .iter()
+                .map(|maybe_srid| -> Result<Option<String>> {
+                    if let Some(srid) = maybe_srid {
+                        if srid == 0 {
+                            return Ok(None);
+                        } else if srid == 4326 {
+                            return Ok(Some("OGC:CRS84".to_string()));
+                        }
+
+                        let auth_code = format!("EPSG:{srid}");
+                        if !known_valid.contains(&srid) {
+                            validate_crs(&auth_code, maybe_engine)?;
+                            known_valid.insert(srid);
+                        }
+
+                        Ok(Some(auth_code))
+                    } else {
+                        Ok(None)
+                    }
+                })
+                .collect::<Result<StringViewArray>>()?;
+
+            Ok(Arc::new(utf8_view_array))
+        }
+        _ => {
+            // Local cache from raw input string to its abbreviated form so that
+            // repeated inputs are only parsed once
+            let mut known_abbreviated = HashMap::<String, String>::new();
+
+            let string_value = crs_value.cast_to(&DataType::Utf8View, None)?;
+            let string_array_ref = 
ColumnarValue::values_to_arrays(&[string_value])?;
+            let string_view_array = 
as_string_view_array(&string_array_ref[0])?;
+            let utf8_view_array = string_view_array
+                .iter()
+                .map(|maybe_crs| -> Result<Option<String>> {
+                    if let Some(crs_str) = maybe_crs {
+                        if crs_str == "0" {
+                            return Ok(None);
+                        }
+
+                        if let Some(abbreviated_crs) = 
known_abbreviated.get(crs_str) {
+                            Ok(Some(abbreviated_crs.clone()))
+                        } else if let Some(crs) = deserialize_crs(crs_str)? {
+                            let abbreviated_crs =
+                                if let Some(auth_code) = 
crs.to_authority_code()? {
+                                    auth_code
+                                } else {
+                                    crs_str.to_string()
+                                };
+
+                            // Key by the raw input (the same key used for the lookup
+                            // above); keying by the parsed CRS would miss the cache
+                            known_abbreviated.insert(crs_str.to_string(), 
abbreviated_crs.clone());
+                            Ok(Some(abbreviated_crs))
+                        } else {
+                            Ok(None)
+                        }
+                    } else {
+                        Ok(None)
+                    }
+                })
+                .collect::<Result<StringViewArray>>()?;
+
+            Ok(Arc::new(utf8_view_array))
+        }
+    }
+}
+
+/// Validate a CRS string
+///
+/// If an engine is provided, the engine is asked for a transform from `crs` to
+/// itself and any engine error is returned as [DataFusionError::External]. If no
+/// engine is provided, this function performs no checks and returns `Ok(())`:
+/// callers that need basic validation should call [deserialize_crs] themselves.
+pub fn validate_crs(
+    crs: &str,
+    maybe_engine: Option<&Arc<dyn CrsEngine + Send + Sync>>,
+) -> Result<()> {
+    if let Some(engine) = maybe_engine {
+        // Only the success or failure of the request matters; the transform is discarded
+        engine
+            .as_ref()
+            .get_transform_crs_to_crs(crs, crs, None, "")
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod test {
     use std::rc::Rc;
 
+    use arrow_array::{create_array, ArrayRef};
     use arrow_schema::Field;
     use datafusion_common::config::ConfigOptions;
     use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF};
@@ -353,7 +527,11 @@ mod test {
         crs::lnglat,
         datatypes::{Edges, WKB_GEOMETRY},
     };
-    use sedona_testing::{compare::assert_value_equal, 
create::create_scalar_value};
+    use sedona_testing::{
+        compare::assert_value_equal,
+        create::{create_array, create_array_item_crs, create_scalar_value},
+        testers::ScalarUdfTester,
+    };
 
     use super::*;
 
@@ -461,6 +639,106 @@ mod test {
         assert_eq!(err.message(), "Unknown geometry error")
     }
 
+    // ST_SetSRID with an array of SRIDs returns an item_crs struct with one CRS per row
+    #[test]
+    fn udf_item_srid() {
+        let tester = ScalarUdfTester::new(
+            st_set_srid_udf().into(),
+            vec![WKB_GEOMETRY, SedonaType::Arrow(DataType::Int32)],
+        );
+        
tester.assert_return_type(SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap());
+
+        let geometry_array = create_array(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("POINT (4 5)"),
+                Some("POINT (6 7)"),
+                Some("POINT (8 9)"),
+            ],
+            &WKB_GEOMETRY,
+        );
+        let crs_array =
+            create_array!(Int32, [Some(4326), Some(3857), Some(3857), Some(0), 
None]) as ArrayRef;
+
+        let result = tester
+            .invoke_array_array(geometry_array, crs_array)
+            .unwrap();
+        // Expected: 4326 becomes OGC:CRS84, an SRID of 0 becomes a null CRS on a
+        // non-null item, and a null SRID nulls out the whole row
+        assert_eq!(
+            &result,
+            &create_array_item_crs(
+                &[
+                    Some("POINT (0 1)"),
+                    Some("POINT (2 3)"),
+                    Some("POINT (4 5)"),
+                    Some("POINT (6 7)"),
+                    None,
+                ],
+                [
+                    Some("OGC:CRS84"),
+                    Some("EPSG:3857"),
+                    Some("EPSG:3857"),
+                    None,
+                    None
+                ],
+                &WKB_GEOMETRY
+            )
+        );
+    }
+
+    // ST_SetCRS with an array of CRS strings returns an item_crs struct with one CRS per row
+    #[test]
+    fn udf_item_crs() {
+        let tester = ScalarUdfTester::new(
+            st_set_crs_udf().into(),
+            vec![WKB_GEOMETRY, SedonaType::Arrow(DataType::Utf8)],
+        );
+        
tester.assert_return_type(SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap());
+
+        let geometry_array = create_array(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("POINT (4 5)"),
+                Some("POINT (6 7)"),
+                Some("POINT (8 9)"),
+            ],
+            &WKB_GEOMETRY,
+        );
+        let crs_array = create_array!(
+            Utf8,
+            [
+                Some("EPSG:4326"),
+                Some("EPSG:3857"),
+                Some("EPSG:3857"),
+                Some("0"),
+                None
+            ]
+        ) as ArrayRef;
+
+        let result = tester
+            .invoke_array_array(geometry_array, crs_array)
+            .unwrap();
+        // Expected: "EPSG:4326" is stored as OGC:CRS84, "0" becomes a null CRS on a
+        // non-null item, and a null CRS input nulls out the whole row
+        assert_eq!(
+            &result,
+            &create_array_item_crs(
+                &[
+                    Some("POINT (0 1)"),
+                    Some("POINT (2 3)"),
+                    Some("POINT (4 5)"),
+                    Some("POINT (6 7)"),
+                    None
+                ],
+                [
+                    Some("OGC:CRS84"),
+                    Some("EPSG:3857"),
+                    Some("EPSG:3857"),
+                    None,
+                    None
+                ],
+                &WKB_GEOMETRY
+            )
+        );
+    }
+
     fn call_udf(
         udf: &ScalarUDF,
         arg: ColumnarValue,
diff --git a/rust/sedona-functions/src/st_srid.rs 
b/rust/sedona-functions/src/st_srid.rs
index 4175f3a2..80a22463 100644
--- a/rust/sedona-functions/src/st_srid.rs
+++ b/rust/sedona-functions/src/st_srid.rs
@@ -14,19 +14,26 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 use crate::executor::WkbExecutor;
-use arrow_array::builder::StringBuilder;
-use arrow_array::builder::UInt32Builder;
-use arrow_array::Array;
+use arrow_array::{
+    builder::{StringViewBuilder, UInt32Builder},
+    Array,
+};
 use arrow_schema::DataType;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{
+    cast::{as_string_view_array, as_struct_array},
+    exec_err, DataFusionError, Result, ScalarValue,
+};
 use datafusion_expr::{
     scalar_doc_sections::DOC_SECTION_OTHER, ColumnarValue, Documentation, 
Volatility,
 };
+use sedona_common::sedona_internal_err;
 use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF};
+use sedona_schema::crs::deserialize_crs;
 use sedona_schema::datatypes::SedonaType;
 use sedona_schema::matchers::ArgMatcher;
-use std::{sync::Arc, vec};
+use std::{collections::HashMap, iter::zip, sync::Arc};
 
 /// ST_Srid() scalar UDF implementation
 ///
@@ -34,7 +41,7 @@ use std::{sync::Arc, vec};
 pub fn st_srid_udf() -> SedonaScalarUDF {
     SedonaScalarUDF::new(
         "st_srid",
-        vec![Arc::new(StSrid {})],
+        vec![Arc::new(StSridItemCrs {}), Arc::new(StSrid {})],
         Volatility::Immutable,
         Some(st_srid_doc()),
     )
@@ -46,7 +53,7 @@ pub fn st_srid_udf() -> SedonaScalarUDF {
 pub fn st_crs_udf() -> SedonaScalarUDF {
     SedonaScalarUDF::new(
         "st_crs",
-        vec![Arc::new(StCrs {})],
+        vec![Arc::new(StCrsItemCrs {}), Arc::new(StCrs {})],
         Volatility::Immutable,
         Some(st_crs_doc()),
     )
@@ -119,6 +126,84 @@ impl SedonaScalarKernel for StSrid {
     }
 }
 
+/// ST_SRID() kernel for item-level CRS input (a struct of `item` and `crs` columns)
+#[derive(Debug)]
+struct StSridItemCrs {}
+
+impl SedonaScalarKernel for StSridItemCrs {
+    /// Matches a single item_crs argument and returns UInt32
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        let matcher = ArgMatcher::new(
+            vec![ArgMatcher::is_item_crs()],
+            SedonaType::Arrow(DataType::UInt32),
+        );
+
+        matcher.match_args(args)
+    }
+
+    /// Compute the SRID of each row from its `crs` string
+    ///
+    /// Rows whose item is null produce a null SRID; rows with a null CRS
+    /// produce an SRID of 0 (see `append_srid`).
+    fn invoke_batch(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        let executor = WkbExecutor::new(arg_types, args);
+        let mut builder = 
UInt32Builder::with_capacity(executor.num_iterations());
+
+        // Array and struct scalar input are both handled as a struct array;
+        // a null scalar short-circuits to a null SRID
+        let item_crs_struct_array = match &args[0] {
+            ColumnarValue::Array(array) => as_struct_array(array)?,
+            ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => 
struct_array.as_ref(),
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                return Ok(ColumnarValue::Scalar(ScalarValue::UInt32(None)));
+            }
+            _ => return sedona_internal_err!("Unexpected input to ST_SRID"),
+        };
+
+        let item_array = item_crs_struct_array.column(0);
+        let crs_string_array = 
as_string_view_array(item_crs_struct_array.column(1))?;
+        // Per-batch cache from CRS string to SRID so each distinct CRS is parsed once
+        let mut batch_srids = HashMap::<String, u32>::new();
+
+        if let Some(item_nulls) = item_array.nulls() {
+            for (is_valid, maybe_crs) in zip(item_nulls, crs_string_array) {
+                if !is_valid {
+                    builder.append_null();
+                    continue;
+                }
+
+                append_srid(maybe_crs, &mut batch_srids, &mut builder)?;
+            }
+        } else {
+            for maybe_crs in crs_string_array {
+                append_srid(maybe_crs, &mut batch_srids, &mut builder)?;
+            }
+        }
+
+        executor.finish(Arc::new(builder.finish()))
+    }
+}
+
+/// Append the SRID for a single item-level CRS value to `builder`
+///
+/// A null CRS, or a CRS string for which [deserialize_crs] returns `None`,
+/// appends 0. Otherwise the SRID reported by the parsed CRS is appended and
+/// remembered in `batch_srids` (keyed by the CRS string) so later rows with
+/// the same string skip parsing.
+///
+/// Returns an execution error if the CRS parses but has no SRID, and
+/// propagates any error from parsing or SRID lookup.
+fn append_srid(
+    maybe_crs: Option<&str>,
+    batch_srids: &mut HashMap<String, u32>,
+    builder: &mut UInt32Builder,
+) -> Result<()> {
+    if let Some(crs_str) = maybe_crs {
+        if let Some(srid) = batch_srids.get(crs_str) {
+            builder.append_value(*srid);
+        } else if let Some(crs) = deserialize_crs(crs_str)? {
+            if let Some(srid) = crs.srid()? {
+                batch_srids.insert(crs_str.to_string(), srid);
+                builder.append_value(srid);
+            } else {
+                return exec_err!("Can't extract SRID from item-level CRS 
'{crs_str}'");
+            }
+        } else {
+            builder.append_value(0);
+        }
+    } else {
+        builder.append_value(0);
+    }
+
+    Ok(())
+}
+
 #[derive(Debug)]
 struct StCrs {}
 
@@ -138,12 +223,10 @@ impl SedonaScalarKernel for StCrs {
         args: &[ColumnarValue],
     ) -> Result<ColumnarValue> {
         let executor = WkbExecutor::new(arg_types, args);
-        let preallocate_bytes = "EPSG:4326".len() * executor.num_iterations();
-        let mut builder =
-            StringBuilder::with_capacity(executor.num_iterations(), 
preallocate_bytes);
+        let mut builder = 
StringViewBuilder::with_capacity(executor.num_iterations());
         let crs_opt: Option<String> = match &arg_types[0] {
             SedonaType::Wkb(_, Some(crs)) | SedonaType::WkbView(_, Some(crs)) 
=> {
-                Some(crs.to_json())
+                Some(crs.to_authority_code()?.unwrap_or_else(|| crs.to_json()))
             }
             _ => None,
         };
@@ -167,15 +250,75 @@ impl SedonaScalarKernel for StCrs {
     }
 }
 
+/// ST_CRS() kernel for item-level CRS input (a struct of `item` and `crs` columns)
+#[derive(Debug)]
+struct StCrsItemCrs {}
+
+impl SedonaScalarKernel for StCrsItemCrs {
+    /// Matches a single item_crs argument and returns Utf8View
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        let matcher = ArgMatcher::new(
+            vec![ArgMatcher::is_item_crs()],
+            SedonaType::Arrow(DataType::Utf8View),
+        );
+
+        matcher.match_args(args)
+    }
+
+    /// Return the CRS string of each row
+    ///
+    /// Rows whose item is null produce a null result; rows with a non-null
+    /// item but a null CRS produce the string "0".
+    fn invoke_batch(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        let executor = WkbExecutor::new(arg_types, args);
+
+        // Array and struct scalar input are both handled as a struct array;
+        // a null scalar short-circuits to a null string
+        let item_crs_struct_array = match &args[0] {
+            ColumnarValue::Array(array) => as_struct_array(array)?,
+            ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => 
struct_array.as_ref(),
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                return Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(None)));
+            }
+            _ => return sedona_internal_err!("Unexpected input to ST_Crs"),
+        };
+
+        let item_array = item_crs_struct_array.column(0);
+        let crs_array = item_crs_struct_array.column(1);
+
+        // Fast path: with no nulls anywhere the crs column is already the answer.
+        // NOTE(review): this checks struct-level nulls while the loop below uses the
+        // item column's nulls; presumably these agree for well-formed item_crs
+        // arrays -- confirm.
+        if crs_array.null_count() == 0 && item_crs_struct_array.null_count() 
== 0 {
+            return executor.finish(crs_array.clone());
+        }
+
+        // Otherwise we need to build the output. We could potentially do some 
unioning
+        // of null buffers in the case where we have zero NULL crses but some 
null items.
+        let item_nulls = item_array.nulls();
+        let crs_string_array = as_string_view_array(crs_array)?;
+        let mut builder = 
StringViewBuilder::with_capacity(executor.num_iterations());
+
+        if let Some(item_nulls) = item_nulls {
+            for (is_valid, maybe_crs) in zip(item_nulls, crs_string_array) {
+                if is_valid {
+                    builder.append_value(maybe_crs.unwrap_or("0"))
+                } else {
+                    builder.append_null();
+                }
+            }
+        } else {
+            for maybe_crs in crs_string_array {
+                builder.append_value(maybe_crs.unwrap_or("0"))
+            }
+        }
+
+        executor.finish(Arc::new(builder.finish()))
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
-    use arrow_array::create_array;
+    use arrow_array::{create_array, ArrayRef};
     use datafusion_common::ScalarValue;
     use datafusion_expr::ScalarUDF;
     use sedona_schema::crs::deserialize_crs;
-    use sedona_schema::datatypes::Edges;
-    use sedona_testing::create::create_array;
+    use sedona_schema::datatypes::{Edges, WKB_GEOMETRY};
+    use sedona_testing::create::{create_array, create_array_item_crs, 
create_scalar_item_crs};
     use sedona_testing::testers::ScalarUdfTester;
 
     #[test]
@@ -239,6 +382,56 @@ mod test {
         assert!(result.unwrap_err().to_string().contains("CRS has no SRID"));
     }
 
+    // ST_SRID on item_crs input: scalar and array cases
+    #[test]
+    fn udf_item_srid() {
+        let tester = ScalarUdfTester::new(
+            st_srid_udf().into(),
+            vec![SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap()],
+        );
+        tester.assert_return_type(DataType::UInt32);
+
+        // A non-null item with a null CRS has an SRID of 0
+        let result = tester
+            .invoke_scalar(create_scalar_item_crs(
+                Some("POINT (0 1)"),
+                None,
+                &WKB_GEOMETRY,
+            ))
+            .unwrap();
+        tester.assert_scalar_result_equals(result, 0);
+
+        let result = tester
+            .invoke_scalar(create_scalar_item_crs(
+                Some("POINT (0 1)"),
+                Some("EPSG:3857"),
+                &WKB_GEOMETRY,
+            ))
+            .unwrap();
+        tester.assert_scalar_result_equals(result, 3857);
+
+        let item_crs_array = create_array_item_crs(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("POINT (4 5)"),
+                Some("POINT (6 7)"),
+                None,
+            ],
+            [
+                Some("OGC:CRS84"),
+                Some("EPSG:3857"),
+                Some("EPSG:3857"),
+                None,
+                None,
+            ],
+            &WKB_GEOMETRY,
+        );
+        // OGC:CRS84 reports 4326; a null item yields a null SRID
+        let expected_srid =
+            create_array!(UInt32, [Some(4326), Some(3857), Some(3857), 
Some(0), None]) as ArrayRef;
+
+        let result = tester.invoke_array(item_crs_array).unwrap();
+        assert_eq!(&result, &expected_srid);
+    }
+
     #[test]
     fn udf_crs() {
         let udf: ScalarUDF = st_crs_udf().into();
@@ -260,21 +453,17 @@ mod test {
         let crs = deserialize_crs("EPSG:4837").unwrap();
         let sedona_type = SedonaType::Wkb(Edges::Planar, crs.clone());
         let tester = ScalarUdfTester::new(udf.clone(), 
vec![sedona_type.clone()]);
-        let expected_crs = "\"EPSG:4837\"".to_string();
         let result = tester
             .invoke_scalar("POLYGON ((0 0, 1 0, 0 1, 0 0))")
             .unwrap();
-        tester.assert_scalar_result_equals(result, 
ScalarValue::Utf8(Some(expected_crs.clone())));
+        tester.assert_scalar_result_equals(result, "EPSG:4837");
 
         // Call with an array
         let wkb_array = create_array(
             &[Some("POINT (1 2)"), None, Some("MULTIPOINT (3 4)")],
             &sedona_type,
         );
-        let expected = create_array!(
-            Utf8,
-            [Some(expected_crs.clone()), None, Some(expected_crs.clone())]
-        );
+        let expected = create_array!(Utf8View, [Some("EPSG:4837"), None, 
Some("EPSG:4837")]);
         assert_eq!(
             &tester.invoke_array(wkb_array).unwrap().as_ref(),
             &expected.as_ref()
@@ -284,4 +473,62 @@ mod test {
         let result = tester.invoke_scalar(ScalarValue::Null).unwrap();
         tester.assert_scalar_result_equals(result, ScalarValue::Null);
     }
+
+    // ST_CRS on item_crs input: scalar and array cases
+    #[test]
+    fn udf_item_crs() {
+        let tester = ScalarUdfTester::new(
+            st_crs_udf().into(),
+            vec![SedonaType::new_item_crs(&WKB_GEOMETRY).unwrap()],
+        );
+        tester.assert_return_type(DataType::Utf8View);
+
+        // A non-null item with a null CRS is reported as the string "0"
+        let result = tester
+            .invoke_scalar(create_scalar_item_crs(
+                Some("POINT (0 1)"),
+                None,
+                &WKB_GEOMETRY,
+            ))
+            .unwrap();
+        tester.assert_scalar_result_equals(result, "0");
+
+        let result = tester
+            .invoke_scalar(create_scalar_item_crs(
+                Some("POINT (0 1)"),
+                Some("EPSG:3857"),
+                &WKB_GEOMETRY,
+            ))
+            .unwrap();
+        tester.assert_scalar_result_equals(result, "EPSG:3857");
+
+        let item_crs_array = create_array_item_crs(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("POINT (4 5)"),
+                Some("POINT (6 7)"),
+                None,
+            ],
+            [
+                Some("OGC:CRS84"),
+                Some("EPSG:3857"),
+                Some("EPSG:3857"),
+                None,
+                None,
+            ],
+            &WKB_GEOMETRY,
+        );
+        // CRS strings pass through unchanged; a null item yields a null result
+        let expected_crs = create_array!(
+            Utf8View,
+            [
+                Some("OGC:CRS84"),
+                Some("EPSG:3857"),
+                Some("EPSG:3857"),
+                Some("0"),
+                None
+            ]
+        ) as ArrayRef;
+
+        let result = tester.invoke_array(item_crs_array).unwrap();
+        assert_eq!(&result, &expected_crs);
+    }
 }
diff --git a/rust/sedona-schema/src/datatypes.rs 
b/rust/sedona-schema/src/datatypes.rs
index 85ba6ae5..4c1d5d93 100644
--- a/rust/sedona-schema/src/datatypes.rs
+++ b/rust/sedona-schema/src/datatypes.rs
@@ -86,6 +86,36 @@ static RASTER_DATATYPE: LazyLock<DataType> =
 // Implementation details
 
 impl SedonaType {
+    /// Create a new item-level CRS type
+    ///
+    /// An item level CRS type in SedonaDB is a struct(item: <arbitrary type>, 
crs: <string view>).
+    /// This design was used to minimize the friction of automatically 
wrapping existing functions
+    /// that accept <arbitrary type>. The crs representation is typically an 
authority:code string;
+    /// however, any string that works with [deserialize_crs] is valid. A 
"missing" CRS (i.e.,
+    /// `Crs::None` at the type level) is represented by a null value in the 
crs array.
+    ///
+    /// Note that this function strips CRSes from item if they are present. 
This is to prevent the
+    /// item-level CRS type from carrying a CRS itself.
+    ///
+    /// Returns an internal error if `item` is not a `Wkb` or `WkbView` type.
+    pub fn new_item_crs(item: &SedonaType) -> Result<SedonaType> {
+        // Keep the edge type but drop any type-level CRS
+        let item_sedona_type = match item {
+            SedonaType::Wkb(edges, _) => SedonaType::Wkb(*edges, None),
+            SedonaType::WkbView(edges, _) => SedonaType::WkbView(*edges, None),
+            _ => {
+                return sedona_internal_err!("Can't create item_crs from 
non-geo type");
+            }
+        };
+
+        // Both fields are nullable
+        let arrow_type = DataType::Struct(
+            vec![
+                item_sedona_type.to_storage_field("item", true)?,
+                Field::new("crs", DataType::Utf8View, true),
+            ]
+            .into(),
+        );
+
+        Ok(SedonaType::Arrow(arrow_type))
+    }
+
     /// Given a field as it would appear in an external Schema return the 
appropriate SedonaType
     pub fn from_storage_field(field: &Field) -> Result<SedonaType> {
         match ExtensionType::from_field(field) {
diff --git a/rust/sedona-schema/src/matchers.rs 
b/rust/sedona-schema/src/matchers.rs
index ca7536cb..d1a466f5 100644
--- a/rust/sedona-schema/src/matchers.rs
+++ b/rust/sedona-schema/src/matchers.rs
@@ -173,6 +173,11 @@ impl ArgMatcher {
         Arc::new(IsGeography {})
     }
 
+    /// Matches any argument that is an item-level Crs type
+    ///
+    /// An argument matches when it is an Arrow struct whose fields are named
+    /// exactly `item` and `crs`, in that order.
+    pub fn is_item_crs() -> Arc<dyn TypeMatcher + Send + Sync> {
+        Arc::new(IsItemCrs {})
+    }
+
     /// Matches any raster argument
     pub fn is_raster() -> Arc<dyn TypeMatcher + Send + Sync> {
         Self::is_exact(RASTER)
@@ -350,6 +355,24 @@ impl TypeMatcher for IsGeography {
     }
 }
 
+/// Matcher for item-level CRS types: Arrow structs with fields `item` and `crs`
+#[derive(Debug)]
+struct IsItemCrs {}
+
+impl TypeMatcher for IsItemCrs {
+    /// Returns true for an Arrow struct whose field names are exactly
+    /// `["item", "crs"]`; the field data types are not inspected.
+    fn match_type(&self, arg: &SedonaType) -> bool {
+        if let SedonaType::Arrow(DataType::Struct(fields)) = arg {
+            let field_names = fields.iter().map(|f| 
f.name()).collect::<Vec<_>>();
+            if field_names != ["item", "crs"] {
+                return false;
+            }
+
+            return true;
+        }
+
+        false
+    }
+}
+
 #[derive(Debug)]
 struct IsNumeric {}
 
diff --git a/rust/sedona-testing/src/create.rs 
b/rust/sedona-testing/src/create.rs
index 9d785d69..07724a63 100644
--- a/rust/sedona-testing/src/create.rs
+++ b/rust/sedona-testing/src/create.rs
@@ -16,7 +16,8 @@
 // under the License.
 use std::{str::FromStr, sync::Arc};
 
-use arrow_array::{ArrayRef, BinaryArray, BinaryViewArray};
+use arrow_array::{ArrayRef, BinaryArray, BinaryViewArray, StringViewArray, 
StructArray};
+use arrow_schema::{DataType, Field};
 use datafusion_common::ScalarValue;
 use datafusion_expr::ColumnarValue;
 use sedona_schema::datatypes::SedonaType;
@@ -53,26 +54,90 @@ pub fn create_array(wkt_values: &[Option<&str>], data_type: 
&SedonaType) -> Arra
 
 /// Create the storage [`ArrayRef`] from a sequence of WKT literals
 ///
-/// Panics on invalid WKT or unsupported data type.
+/// Panics on invalid WKT or unsupported data type. Supports Item CRS
+/// types; however, sets the CRS to Null
 pub fn create_array_storage(wkt_values: &[Option<&str>], data_type: 
&SedonaType) -> ArrayRef {
     match data_type {
         SedonaType::Wkb(_, _) => 
Arc::new(make_wkb_array::<BinaryArray>(wkt_values)),
         SedonaType::WkbView(_, _) => 
Arc::new(make_wkb_array::<BinaryViewArray>(wkt_values)),
+        SedonaType::Arrow(DataType::Struct(fields))
+            if fields.iter().map(|f| f.name()).collect::<Vec<_>>() == 
vec!["item", "crs"] =>
+        {
+            let item_type = 
SedonaType::from_storage_field(&fields[0]).unwrap();
+            create_array_item_crs(wkt_values, (0..wkt_values.len()).map(|_| 
None), &item_type)
+        }
         _ => panic!("create_array_storage not implemented for {data_type:?}"),
     }
 }
 
-/// Create the storage [`ScalarValue`] from a WKT literal
+/// Create an item-level CRS [`ArrayRef`] (a struct of `item` and `crs`) from a
+/// sequence of WKT literals and a sequence of CRS strings
 ///
 /// Panics on invalid WKT or unsupported data type.
+/// NOTE(review): `crs` presumably must yield one entry per WKT value; a length
+/// mismatch is expected to panic in `StructArray::new` -- confirm.
+pub fn create_array_item_crs<'a>(
+    wkt_values: &[Option<&str>],
+    crs: impl IntoIterator<Item = Option<&'a str>>,
+    item_type: &SedonaType,
+) -> ArrayRef {
+    let out_fields = vec![
+        item_type.to_storage_field("item", true).unwrap(),
+        Field::new("crs", DataType::Utf8View, true),
+    ];
+
+    let item_array = create_array_storage(wkt_values, item_type);
+    let crs_array = Arc::new(crs.into_iter().collect::<StringViewArray>());
+    // The struct is null exactly where the item is null
+    let nulls = item_array.nulls().cloned();
+    Arc::new(StructArray::new(
+        out_fields.into(),
+        vec![item_array, crs_array],
+        nulls,
+    ))
+}
+
+/// Create the storage [`ScalarValue`] from a WKT literal
+///
+/// Panics on invalid WKT or unsupported data type. Item CRS values
+/// are created with a Null CRS: use [create_scalar_item_crs] to explicitly
+/// create Item CRS scalars with a specific CRS.
 pub fn create_scalar_storage(wkt_value: Option<&str>, data_type: &SedonaType) 
-> ScalarValue {
     match data_type {
         SedonaType::Wkb(_, _) => ScalarValue::Binary(wkt_value.map(make_wkb)),
         SedonaType::WkbView(_, _) => 
ScalarValue::BinaryView(wkt_value.map(make_wkb)),
+        // Item CRS types are recognized by their field names; the item type is
+        // recovered from the first struct field
+        SedonaType::Arrow(DataType::Struct(fields))
+            if fields.iter().map(|f| f.name()).collect::<Vec<_>>() == 
vec!["item", "crs"] =>
+        {
+            let item_type = 
SedonaType::from_storage_field(&fields[0]).unwrap();
+            create_scalar_item_crs(wkt_value, None, &item_type)
+        }
         _ => panic!("create_scalar_storage not implemented for {data_type:?}"),
     }
 }
 
+/// Create a [`ScalarValue`] of an item_crs array from a WKT literal
+///
+/// The result is a `ScalarValue::Struct` wrapping a one-row struct array with
+/// an `item` field (the WKT converted to `item_type` storage) and a `crs`
+/// field (`crs` as a string view, or null when `crs` is `None`).
+///
+/// Panics on invalid WKT or unsupported data type.
+pub fn create_scalar_item_crs(
+    wkt_value: Option<&str>,
+    crs: Option<&str>,
+    item_type: &SedonaType,
+) -> ScalarValue {
+    let out_fields = vec![
+        item_type.to_storage_field("item", true).unwrap(),
+        Field::new("crs", DataType::Utf8View, true),
+    ];
+
+    let storage_item = create_scalar_storage(wkt_value, item_type)
+        .to_array()
+        .unwrap();
+    let storage_crs = ScalarValue::Utf8View(crs.map(|item| item.to_string()))
+        .to_array()
+        .unwrap();
+    // The struct is null exactly where the item is null
+    let nulls = storage_item.nulls().cloned();
+    let item_crs_array =
+        StructArray::try_new(out_fields.into(), vec![storage_item, 
storage_crs], nulls).unwrap();
+
+    ScalarValue::Struct(Arc::new(item_crs_array))
+}
+
 fn make_wkb_array<T>(wkt_values: &[Option<&str>]) -> T
 where
     T: FromIterator<Option<Vec<u8>>>,
diff --git a/rust/sedona-testing/src/testers.rs 
b/rust/sedona-testing/src/testers.rs
index fdded3b3..89cdd27f 100644
--- a/rust/sedona-testing/src/testers.rs
+++ b/rust/sedona-testing/src/testers.rs
@@ -17,7 +17,7 @@
 use std::{iter::zip, sync::Arc};
 
 use arrow_array::{ArrayRef, RecordBatch};
-use arrow_schema::{FieldRef, Schema};
+use arrow_schema::{DataType, FieldRef, Schema};
 use datafusion_common::{config::ConfigOptions, Result, ScalarValue};
 use datafusion_expr::{
     function::{AccumulatorArgs, StateFieldsArgs},
@@ -532,10 +532,18 @@ impl ScalarUdfTester {
 
     fn scalar_lit(arg: impl Literal, sedona_type: &SedonaType) -> 
Result<ScalarValue> {
         if let Expr::Literal(scalar, _) = arg.lit() {
-            if matches!(
-                sedona_type,
-                SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _)
-            ) {
+            let is_geometry_or_geography = match sedona_type {
+                SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _) => true,
+                SedonaType::Arrow(DataType::Struct(fields))
+                    if fields.iter().map(|f| f.name()).collect::<Vec<_>>()
+                        == vec!["item", "crs"] =>
+                {
+                    true
+                }
+                _ => false,
+            };
+
+            if is_geometry_or_geography {
                 if let ScalarValue::Utf8(expected_wkt) = scalar {
                     Ok(create_scalar(expected_wkt.as_deref(), sedona_type))
                 } else if &scalar.data_type() == sedona_type.storage_type() {

Reply via email to