This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new e106ce65 feat(rust/sedona-expr): Implement item crs wrapper around 
aggregate functions (#520)
e106ce65 is described below

commit e106ce65b6ded198298c36d87776dae878c88282
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Jan 20 09:15:15 2026 -0600

    feat(rust/sedona-expr): Implement item crs wrapper around aggregate 
functions (#520)
---
 Cargo.lock                                   |   1 +
 c/sedona-geos/src/register.rs                |   2 +-
 c/sedona-geos/src/st_polygonize_agg.rs       |  40 +++-
 rust/sedona-expr/src/aggregate_udf.rs        |  55 +++++-
 rust/sedona-expr/src/function_set.rs         |  22 ++-
 rust/sedona-expr/src/item_crs.rs             | 270 ++++++++++++++++++++++++++-
 rust/sedona-functions/src/st_analyze_agg.rs  |  10 +-
 rust/sedona-functions/src/st_collect_agg.rs  | 162 +++++++++++++++-
 rust/sedona-functions/src/st_envelope_agg.rs |  29 ++-
 rust/sedona-geo/Cargo.toml                   |   1 +
 rust/sedona-geo/src/register.rs              |   2 +-
 rust/sedona-geo/src/st_intersection_agg.rs   |  37 +++-
 rust/sedona-geo/src/st_union_agg.rs          |  41 +++-
 rust/sedona-schema/src/crs.rs                |   4 +-
 rust/sedona/src/context.rs                   |   6 +-
 15 files changed, 625 insertions(+), 57 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f99ff5ce..bafb1b7e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5122,6 +5122,7 @@ dependencies = [
  "geo-types",
  "geojson",
  "rstest",
+ "sedona-common",
  "sedona-expr",
  "sedona-functions",
  "sedona-geo-generic-alg",
diff --git a/c/sedona-geos/src/register.rs b/c/sedona-geos/src/register.rs
index 39f80676..31498288 100644
--- a/c/sedona-geos/src/register.rs
+++ b/c/sedona-geos/src/register.rs
@@ -84,7 +84,7 @@ pub fn scalar_kernels() -> Vec<(&'static str, 
Vec<ScalarKernelRef>)> {
     )
 }
 
-pub fn aggregate_kernels() -> Vec<(&'static str, SedonaAccumulatorRef)> {
+pub fn aggregate_kernels() -> Vec<(&'static str, Vec<SedonaAccumulatorRef>)> {
     define_aggregate_kernels!(
         "st_polygonize_agg" => 
crate::st_polygonize_agg::st_polygonize_agg_impl,
     )
diff --git a/c/sedona-geos/src/st_polygonize_agg.rs 
b/c/sedona-geos/src/st_polygonize_agg.rs
index 9c21f49a..acc81b93 100644
--- a/c/sedona-geos/src/st_polygonize_agg.rs
+++ b/c/sedona-geos/src/st_polygonize_agg.rs
@@ -25,7 +25,10 @@ use datafusion_expr::{Accumulator, ColumnarValue};
 use geo_traits::Dimensions;
 use geos::Geom;
 use sedona_common::sedona_internal_err;
-use sedona_expr::aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef};
+use sedona_expr::{
+    aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef},
+    item_crs::ItemCrsSedonaAccumulator,
+};
 use sedona_geometry::wkb_factory::write_wkb_geometrycollection_header;
 use sedona_schema::{
     datatypes::{SedonaType, WKB_GEOMETRY},
@@ -34,8 +37,8 @@ use sedona_schema::{
 use wkb::reader::read_wkb;
 
 /// ST_Polygonize_Agg() aggregate implementation using GEOS
-pub fn st_polygonize_agg_impl() -> SedonaAccumulatorRef {
-    Arc::new(STPolygonizeAgg {})
+pub fn st_polygonize_agg_impl() -> Vec<SedonaAccumulatorRef> {
+    ItemCrsSedonaAccumulator::wrap_impl(STPolygonizeAgg {})
 }
 
 #[derive(Debug)]
@@ -210,15 +213,19 @@ mod tests {
     use datafusion_expr::AggregateUDF;
     use rstest::rstest;
     use sedona_expr::aggregate_udf::SedonaAggregateUDF;
-    use sedona_schema::datatypes::{WKB_GEOMETRY, WKB_VIEW_GEOMETRY};
-    use sedona_testing::{compare::assert_scalar_equal_wkb_geometry, 
testers::AggregateUdfTester};
+    use sedona_schema::datatypes::{WKB_GEOMETRY, WKB_GEOMETRY_ITEM_CRS, 
WKB_VIEW_GEOMETRY};
+    use sedona_testing::{
+        compare::{assert_scalar_equal, assert_scalar_equal_wkb_geometry},
+        create::create_scalar_item_crs,
+        testers::AggregateUdfTester,
+    };
 
     use super::*;
 
     fn create_udf() -> SedonaAggregateUDF {
         SedonaAggregateUDF::new(
             "st_polygonize_agg",
-            vec![st_polygonize_agg_impl()],
+            st_polygonize_agg_impl(),
             datafusion_expr::Volatility::Immutable,
             None,
         )
@@ -454,4 +461,25 @@ mod tests {
         let result = tester.aggregate_wkt(batches).unwrap();
         assert_scalar_equal_wkb_geometry(&result, Some("GEOMETRYCOLLECTION 
EMPTY"));
     }
+
+    #[rstest]
+    fn udf_invoke_item_crs() {
+        let sedona_type = WKB_GEOMETRY_ITEM_CRS.clone();
+        let udf = create_udf();
+        let tester = AggregateUdfTester::new(udf.into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batches = vec![vec![
+            Some("LINESTRING (0 0, 10 0)"),
+            Some("LINESTRING (10 0, 10 10)"),
+            Some("LINESTRING (10 10, 0 0)"),
+        ]];
+        let expected = create_scalar_item_crs(
+            Some("GEOMETRYCOLLECTION (POLYGON ((10 0, 0 0, 10 10, 10 0)))"),
+            None,
+            &WKB_GEOMETRY,
+        );
+
+        assert_scalar_equal(&tester.aggregate_wkt(batches).unwrap(), 
&expected);
+    }
 }
diff --git a/rust/sedona-expr/src/aggregate_udf.rs 
b/rust/sedona-expr/src/aggregate_udf.rs
index b8625e54..445093f5 100644
--- a/rust/sedona-expr/src/aggregate_udf.rs
+++ b/rust/sedona-expr/src/aggregate_udf.rs
@@ -27,7 +27,39 @@ use sedona_schema::datatypes::SedonaType;
 
 use sedona_schema::matchers::ArgMatcher;
 
-pub type SedonaAccumulatorRef = Arc<dyn SedonaAccumulator + Send + Sync>;
+/// Shorthand for a [SedonaAccumulator] reference
+pub type SedonaAccumulatorRef = Arc<dyn SedonaAccumulator>;
+
+/// Helper to resolve an iterable of accumulators
+pub trait IntoSedonaAccumulatorRefs {
+    fn into_sedona_accumulator_refs(self) -> Vec<SedonaAccumulatorRef>;
+}
+
+impl IntoSedonaAccumulatorRefs for SedonaAccumulatorRef {
+    fn into_sedona_accumulator_refs(self) -> Vec<SedonaAccumulatorRef> {
+        vec![self]
+    }
+}
+
+impl IntoSedonaAccumulatorRefs for Vec<SedonaAccumulatorRef> {
+    fn into_sedona_accumulator_refs(self) -> Vec<SedonaAccumulatorRef> {
+        self
+    }
+}
+
+impl<T: SedonaAccumulator + 'static> IntoSedonaAccumulatorRefs for T {
+    fn into_sedona_accumulator_refs(self) -> Vec<SedonaAccumulatorRef> {
+        vec![Arc::new(self)]
+    }
+}
+
+impl<T: SedonaAccumulator + 'static> IntoSedonaAccumulatorRefs for Vec<Arc<T>> 
{
+    fn into_sedona_accumulator_refs(self) -> Vec<SedonaAccumulatorRef> {
+        self.into_iter()
+            .map(|item| item as SedonaAccumulatorRef)
+            .collect()
+    }
+}
 
 /// Top-level aggregate user-defined function
 ///
@@ -59,7 +91,7 @@ impl SedonaAggregateUDF {
     /// Create a new SedonaAggregateUDF
     pub fn new(
         name: &str,
-        kernels: Vec<SedonaAccumulatorRef>,
+        kernels: impl IntoSedonaAccumulatorRefs,
         volatility: Volatility,
         documentation: Option<Documentation>,
     ) -> Self {
@@ -67,7 +99,7 @@ impl SedonaAggregateUDF {
         Self {
             name: name.to_string(),
             signature,
-            kernels,
+            kernels: kernels.into_sedona_accumulator_refs(),
             documentation,
         }
     }
@@ -86,15 +118,17 @@ impl SedonaAggregateUDF {
         documentation: Option<Documentation>,
     ) -> Self {
         let stub_kernel = StubAccumulator::new(name.to_string(), arg_matcher);
-        Self::new(name, vec![Arc::new(stub_kernel)], volatility, documentation)
+        Self::new(name, stub_kernel, volatility, documentation)
     }
 
     /// Add a new kernel to an Aggregate UDF
     ///
     /// Because kernels are resolved in reverse order, the new kernel will take
     /// precedence over any previously added kernels that apply to the same 
types.
-    pub fn add_kernel(&mut self, kernel: SedonaAccumulatorRef) {
-        self.kernels.push(kernel);
+    pub fn add_kernel(&mut self, kernels: impl IntoSedonaAccumulatorRefs) {
+        for kernel in kernels.into_sedona_accumulator_refs() {
+            self.kernels.push(kernel);
+        }
     }
 
     // List the current kernels
@@ -196,7 +230,7 @@ impl AggregateUDFImpl for SedonaAggregateUDF {
     }
 }
 
-pub trait SedonaAccumulator: Debug {
+pub trait SedonaAccumulator: Debug + Send + Sync {
     /// Given input data types, calculate an output data type
     fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>>;
 
@@ -279,7 +313,12 @@ mod test {
     #[test]
     fn udaf_empty() -> Result<()> {
         // UDF with no implementations
-        let udf = SedonaAggregateUDF::new("empty", vec![], 
Volatility::Immutable, None);
+        let udf = SedonaAggregateUDF::new(
+            "empty",
+            Vec::<SedonaAccumulatorRef>::new(),
+            Volatility::Immutable,
+            None,
+        );
         assert_eq!(udf.name(), "empty");
         let err = udf.return_field(&[]).unwrap_err();
         assert_eq!(err.message(), "empty([]): No kernel matching arguments");
diff --git a/rust/sedona-expr/src/function_set.rs 
b/rust/sedona-expr/src/function_set.rs
index 120efd31..33073c98 100644
--- a/rust/sedona-expr/src/function_set.rs
+++ b/rust/sedona-expr/src/function_set.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 use crate::{
-    aggregate_udf::{SedonaAccumulatorRef, SedonaAggregateUDF},
+    aggregate_udf::{IntoSedonaAccumulatorRefs, SedonaAggregateUDF},
     scalar_udf::{IntoScalarKernelRefs, SedonaScalarUDF},
 };
 use datafusion_common::error::Result;
@@ -121,7 +121,7 @@ impl FunctionSet {
     pub fn add_aggregate_udf_kernel(
         &mut self,
         name: &str,
-        kernel: SedonaAccumulatorRef,
+        kernel: impl IntoSedonaAccumulatorRefs,
     ) -> Result<&SedonaAggregateUDF> {
         if let Some(function) = self.aggregate_udf_mut(name) {
             function.add_kernel(kernel);
@@ -148,7 +148,10 @@ mod tests {
     use datafusion_expr::{Accumulator, ColumnarValue, Volatility};
     use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
 
-    use crate::{aggregate_udf::SedonaAccumulator, 
scalar_udf::SimpleSedonaScalarKernel};
+    use crate::{
+        aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef},
+        scalar_udf::SimpleSedonaScalarKernel,
+    };
 
     use super::*;
 
@@ -217,7 +220,7 @@ mod tests {
         );
     }
 
-    #[derive(Debug)]
+    #[derive(Debug, Clone)]
     struct TestAccumulator {}
 
     impl SedonaAccumulator for TestAccumulator {
@@ -245,8 +248,13 @@ mod tests {
         assert!(functions.aggregate_udf("simple_udaf").is_none());
         assert!(functions.aggregate_udf_mut("simple_udaf").is_none());
 
-        let udaf = SedonaAggregateUDF::new("simple_udaf", vec![], 
Volatility::Immutable, None);
-        let kernel = Arc::new(TestAccumulator {});
+        let udaf = SedonaAggregateUDF::new(
+            "simple_udaf",
+            Vec::<SedonaAccumulatorRef>::new(),
+            Volatility::Immutable,
+            None,
+        );
+        let kernel = TestAccumulator {};
 
         functions.insert_aggregate_udf(udaf);
         assert_eq!(functions.aggregate_udfs().collect::<Vec<_>>().len(), 1);
@@ -268,7 +276,7 @@ mod tests {
 
         let udaf2 = SedonaAggregateUDF::new(
             "simple_udaf2",
-            vec![kernel.clone()],
+            vec![Arc::new(kernel.clone())],
             Volatility::Immutable,
             None,
         );
diff --git a/rust/sedona-expr/src/item_crs.rs b/rust/sedona-expr/src/item_crs.rs
index b51571b0..0889622b 100644
--- a/rust/sedona-expr/src/item_crs.rs
+++ b/rust/sedona-expr/src/item_crs.rs
@@ -17,17 +17,18 @@
 
 use std::{fmt::Debug, iter::zip, sync::Arc};
 
-use arrow_array::{ArrayRef, StructArray};
+use arrow_array::{Array, ArrayRef, StructArray};
 use arrow_buffer::NullBuffer;
-use arrow_schema::{DataType, Field};
+use arrow_schema::{DataType, Field, FieldRef};
 use datafusion_common::{
     cast::{as_string_view_array, as_struct_array},
-    DataFusionError, Result, ScalarValue,
+    exec_err, DataFusionError, Result, ScalarValue,
 };
-use datafusion_expr::ColumnarValue;
+use datafusion_expr::{Accumulator, ColumnarValue};
 use sedona_common::sedona_internal_err;
 use sedona_schema::{crs::deserialize_crs, datatypes::SedonaType, 
matchers::ArgMatcher};
 
+use crate::aggregate_udf::{IntoSedonaAccumulatorRefs, SedonaAccumulator, 
SedonaAccumulatorRef};
 use crate::scalar_udf::{IntoScalarKernelRefs, ScalarKernelRef, 
SedonaScalarKernel};
 
 /// Wrap a [SedonaScalarKernel] to provide Item CRS type support
@@ -114,6 +115,267 @@ impl SedonaScalarKernel for ItemCrsKernel {
     }
 }
 
+/// Wrap a [SedonaAccumulator] to provide Item CRS type support
+///
+/// Most accumulators that operate on geometry or geography in some way
+/// can also support Item CRS inputs:
+///
+/// - Accumulators that return a non-spatial type whose value does not
+///   depend on the input CRS only need to operate on the `item` portion
+///   of any item_crs input (e.g., ST_Analyze_Agg()).
+/// - Accumulators that return a geometry or geography must also return
+///   an item_crs type where the output CRSes are propagated from the
+///   input.
+/// - CRSes within a single group must be compatible
+///
+/// This accumulator provides an automatic wrapper enforcing these rules.
+#[derive(Debug)]
+pub struct ItemCrsSedonaAccumulator {
+    inner: SedonaAccumulatorRef,
+}
+
+impl ItemCrsSedonaAccumulator {
+    /// Create a new [SedonaAccumulatorRef] wrapping the input
+    ///
+    /// The resulting accumulator matches arguments of the input with ItemCrs 
inputs
+    /// but not those of the original accumulator (i.e., an aggregate function 
needs both
+    /// accumulators to support both type-level and item-level CRSes).
+    pub fn new_ref(inner: SedonaAccumulatorRef) -> SedonaAccumulatorRef {
+        Arc::new(Self { inner })
+    }
+
+    /// Wrap a vector of accumulators by appending all ItemCrs versions 
followed by
+    /// the contents of inner
+    ///
+    /// This is the recommended way to add accumulators when all of them 
should support
+    /// ItemCrs inputs.
+    pub fn wrap_impl(inner: impl IntoSedonaAccumulatorRefs) -> 
Vec<SedonaAccumulatorRef> {
+        let accumulators = inner.into_sedona_accumulator_refs();
+
+        let mut out = Vec::with_capacity(accumulators.len() * 2);
+
+        // Add ItemCrsAccumulators first (so they will be resolved last)
+        for inner_accumulator in &accumulators {
+            
out.push(ItemCrsSedonaAccumulator::new_ref(inner_accumulator.clone()));
+        }
+
+        for inner_accumulator in accumulators {
+            out.push(inner_accumulator);
+        }
+
+        out
+    }
+}
+
+impl SedonaAccumulator for ItemCrsSedonaAccumulator {
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        // We don't have any functions we can test this with yet, so for the 
moment only support
+        // single-argument aggregations (slightly simpler).
+        if args.len() != 1 {
+            return Ok(None);
+        }
+
+        // This implementation doesn't apply to non-item crs types
+        if !ArgMatcher::is_item_crs().match_type(&args[0]) {
+            return Ok(None);
+        }
+
+        // Strip any CRS that might be present from the input type
+        let item_arg_types = args
+            .iter()
+            .map(|arg_type| {
+                parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, 
_)| item_type)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Resolve the inner accumulator's return type.
+        if let Some(item_type) = self.inner.return_type(&item_arg_types)? {
+            let geo_matcher = ArgMatcher::is_geometry_or_geography();
+
+            // If the inner output is item_crs, the output must also be 
item_crs. Otherwise
+            // the output is left as is.
+            if geo_matcher.match_type(&item_type) {
+                Ok(Some(SedonaType::new_item_crs(&item_type)?))
+            } else {
+                Ok(Some(item_type))
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn accumulator(
+        &self,
+        args: &[SedonaType],
+        output_type: &SedonaType,
+    ) -> Result<Box<dyn datafusion_expr::Accumulator>> {
+        // Strip any CRS that might be present from the input type
+        let item_arg_types = args
+            .iter()
+            .map(|arg_type| {
+                parse_item_crs_arg_type_strip_crs(arg_type).map(|(item_type, 
_)| item_type)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Extract the item output type from the item_crs output type
+        let (item_output_type, _) = parse_item_crs_arg_type(output_type)?;
+
+        // Create the inner accumulator
+        let inner = self.inner.accumulator(&item_arg_types, 
&item_output_type)?;
+
+        Ok(Box::new(ItemCrsAccumulator {
+            inner,
+            item_output_type,
+            crs: None,
+        }))
+    }
+
+    fn state_fields(&self, args: &[SedonaType]) -> Result<Vec<FieldRef>> {
+        // We need an extra state field to track the CRS of each group
+        let mut fields = self.inner.state_fields(args)?;
+        fields.push(Field::new("group_crs", DataType::Utf8View, true).into());
+        Ok(fields)
+    }
+}
+
+#[derive(Debug)]
+struct ItemCrsAccumulator {
+    /// The wrapped inner accumulator
+    inner: Box<dyn Accumulator>,
+    /// The item output type (without the item_crs wrapper)
+    item_output_type: SedonaType,
+    /// If any rows have been encountered, the CRS (the literal string "0" is 
used
+    /// as a sentinel for "no CRS" because we have to serialize it (and None is
+    /// reserved for "we haven't seen any rows yet"))
+    crs: Option<String>,
+}
+
+impl Accumulator for ItemCrsAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        // The input is an item_crs struct array; extract the item and crs 
columns
+        let struct_array = as_struct_array(&values[0])?;
+        let item_array = struct_array.column(0).clone();
+        let crs_array = as_string_view_array(struct_array.column(1))?;
+
+        // Check and track CRS values
+        if let Some(struct_nulls) = struct_array.nulls() {
+            // Skip CRS values for null items
+            for (is_valid, crs_value) in zip(struct_nulls, crs_array.iter()) {
+                if is_valid {
+                    self.merge_crs(crs_value.unwrap_or("0"))?;
+                }
+            }
+        } else {
+            // No nulls
+            for crs_value in crs_array.iter() {
+                self.merge_crs(crs_value.unwrap_or("0"))?;
+            }
+        }
+
+        // Update the inner accumulator with just the item portion
+        self.inner.update_batch(&[item_array])?;
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        let inner_result = self.inner.evaluate()?;
+
+        // If the output type is not geometry or geography we can just return 
it
+        if !matches!(
+            self.item_output_type,
+            SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _)
+        ) {
+            return Ok(inner_result);
+        }
+
+        // Otherwise, prepare the item_crs result
+
+        // Convert the sentinel back to None
+        let crs_value = match &self.crs {
+            Some(s) if s == "0" => None,
+            Some(s) => Some(s.clone()),
+            None => None,
+        };
+
+        // Create the item_crs struct scalar
+        let item_crs_result = make_item_crs(
+            &self.item_output_type,
+            ColumnarValue::Scalar(inner_result),
+            &ColumnarValue::Scalar(ScalarValue::Utf8View(crs_value)),
+            None,
+        )?;
+
+        match item_crs_result {
+            ColumnarValue::Scalar(scalar) => Ok(scalar),
+            ColumnarValue::Array(_) => {
+                sedona_internal_err!("Expected scalar result from 
make_item_crs")
+            }
+        }
+    }
+
+    fn size(&self) -> usize {
+        self.inner.size() + size_of::<ItemCrsAccumulator>()
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let mut inner_state = self.inner.state()?;
+        inner_state.push(ScalarValue::Utf8View(self.crs.clone()));
+        Ok(inner_state)
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        // The CRS field is the last element of states
+        if states.is_empty() {
+            return sedona_internal_err!("Expected at least one state field");
+        }
+        let crs_array = as_string_view_array(states.last().unwrap())?;
+
+        // Check and merge CRS values from the state
+        for crs_str in crs_array.iter().flatten() {
+            self.merge_crs(crs_str)?;
+        }
+
+        // Merge the inner state (excluding the CRS field)
+        let inner_states = &states[..states.len() - 1];
+        self.inner.merge_batch(inner_states)
+    }
+}
+
+impl ItemCrsAccumulator {
+    /// Merge a CRS value into the accumulator's tracked CRS
+    ///
+    /// Ensures all CRS values are compatible. Here "0" means an explicit
+    /// null crs. This is because we have to serialize it somehow and None
+    /// is reserved for the "we haven't seen a CRS yet".
+    fn merge_crs(&mut self, crs_str: &str) -> Result<()> {
+        match &self.crs {
+            None => {
+                // First CRS value encountered
+                self.crs = Some(crs_str.to_string());
+                Ok(())
+            }
+            Some(existing) if existing == crs_str => {
+                // CRS is byte-for-byte equal, nothing to do
+                Ok(())
+            }
+            Some(existing) => {
+                // Check if CRSes are semantically equal
+                let existing_crs = deserialize_crs(existing)?;
+                let new_crs = deserialize_crs(crs_str)?;
+                if existing_crs == new_crs {
+                    Ok(())
+                } else {
+                    let existing_displ = existing_crs
+                        .map(|c| c.to_string())
+                        .unwrap_or("None".to_string());
+                    let new_displ = new_crs.map(|c| 
c.to_string()).unwrap_or("None".to_string());
+                    exec_err!("CRS values not equal: {existing_displ} vs 
{new_displ}")
+                }
+            }
+        }
+    }
+}
+
 /// Calculate a return type based on the underlying kernel
 ///
 /// This function extracts the item portion of any item_crs input and
diff --git a/rust/sedona-functions/src/st_analyze_agg.rs 
b/rust/sedona-functions/src/st_analyze_agg.rs
index a0f95b63..9dd49b26 100644
--- a/rust/sedona-functions/src/st_analyze_agg.rs
+++ b/rust/sedona-functions/src/st_analyze_agg.rs
@@ -32,6 +32,7 @@ use datafusion_expr::{scalar_doc_sections::DOC_SECTION_OTHER, 
Documentation, Vol
 use datafusion_expr::{Accumulator, ColumnarValue};
 use sedona_expr::aggregate_udf::SedonaAccumulatorRef;
 use sedona_expr::aggregate_udf::SedonaAggregateUDF;
+use sedona_expr::item_crs::ItemCrsSedonaAccumulator;
 use sedona_expr::{aggregate_udf::SedonaAccumulator, statistics::GeoStatistics};
 use sedona_geometry::analyze::GeometryAnalysis;
 use sedona_geometry::interval::IntervalTrait;
@@ -49,7 +50,7 @@ use crate::executor::WkbExecutor;
 pub fn st_analyze_agg_udf() -> SedonaAggregateUDF {
     SedonaAggregateUDF::new(
         "st_analyze_agg",
-        vec![Arc::new(STAnalyzeAgg {})],
+        ItemCrsSedonaAccumulator::wrap_impl(STAnalyzeAgg {}),
         Volatility::Immutable,
         Some(st_analyze_agg_doc()),
     )
@@ -469,7 +470,7 @@ mod test {
     use arrow_json::ArrayWriter;
     use arrow_schema::Schema;
     use rstest::rstest;
-    use sedona_schema::datatypes::{WKB_GEOMETRY, WKB_VIEW_GEOMETRY};
+    use sedona_schema::datatypes::{WKB_GEOMETRY, WKB_GEOMETRY_ITEM_CRS, 
WKB_VIEW_GEOMETRY};
     use sedona_testing::testers::AggregateUdfTester;
     use serde_json::Value;
 
@@ -537,7 +538,10 @@ mod test {
     }
 
     #[rstest]
-    fn analyze_linestring(#[values(WKB_GEOMETRY, WKB_VIEW_GEOMETRY)] 
sedona_type: SedonaType) {
+    fn analyze_linestring(
+        #[values(WKB_GEOMETRY, WKB_VIEW_GEOMETRY, 
WKB_GEOMETRY_ITEM_CRS.clone())]
+        sedona_type: SedonaType,
+    ) {
         let mut udaf = st_analyze_agg_udf();
         udaf.add_kernel(st_analyze_agg_impl());
 
diff --git a/rust/sedona-functions/src/st_collect_agg.rs 
b/rust/sedona-functions/src/st_collect_agg.rs
index 58be15fc..3a801d3e 100644
--- a/rust/sedona-functions/src/st_collect_agg.rs
+++ b/rust/sedona-functions/src/st_collect_agg.rs
@@ -29,7 +29,10 @@ use datafusion_expr::{
 };
 use geo_traits::Dimensions;
 use sedona_common::sedona_internal_err;
-use sedona_expr::aggregate_udf::{SedonaAccumulator, SedonaAggregateUDF};
+use sedona_expr::{
+    aggregate_udf::{SedonaAccumulator, SedonaAggregateUDF},
+    item_crs::ItemCrsSedonaAccumulator,
+};
 use sedona_geometry::{
     types::{GeometryTypeAndDimensions, GeometryTypeId},
     wkb_factory::{
@@ -48,12 +51,12 @@ use sedona_schema::{
 pub fn st_collect_agg_udf() -> SedonaAggregateUDF {
     SedonaAggregateUDF::new(
         "st_collect_agg",
-        vec![
+        ItemCrsSedonaAccumulator::wrap_impl(vec![
             Arc::new(STCollectAggr {
                 is_geography: false,
             }),
             Arc::new(STCollectAggr { is_geography: true }),
-        ],
+        ]),
         Volatility::Immutable,
         Some(st_collect_agg_doc()),
     )
@@ -309,8 +312,14 @@ impl Accumulator for CollectionAccumulator {
 mod test {
     use datafusion_expr::AggregateUDF;
     use rstest::rstest;
-    use sedona_schema::datatypes::{WKB_VIEW_GEOGRAPHY, WKB_VIEW_GEOMETRY};
-    use sedona_testing::{compare::assert_scalar_equal_wkb_geometry, 
testers::AggregateUdfTester};
+    use sedona_schema::datatypes::{
+        WKB_GEOGRAPHY_ITEM_CRS, WKB_GEOMETRY_ITEM_CRS, WKB_VIEW_GEOGRAPHY, 
WKB_VIEW_GEOMETRY,
+    };
+    use sedona_testing::{
+        compare::{assert_scalar_equal, assert_scalar_equal_wkb_geometry},
+        create::{create_array, create_array_item_crs, create_scalar, 
create_scalar_item_crs},
+        testers::AggregateUdfTester,
+    };
 
     use super::*;
 
@@ -388,4 +397,147 @@ mod test {
             AggregateUdfTester::new(st_collect_agg_udf().into(), 
vec![sedona_type.clone()]);
         assert_eq!(tester.return_type().unwrap(), WKB_GEOGRAPHY);
     }
+
+    #[rstest]
+    fn udf_invoke_item_crs(
+        #[values(WKB_GEOMETRY_ITEM_CRS.clone(), 
WKB_GEOGRAPHY_ITEM_CRS.clone())]
+        sedona_type: SedonaType,
+    ) {
+        let tester =
+            AggregateUdfTester::new(st_collect_agg_udf().into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batch0 = create_array(
+            &[Some("POINT (0 1)"), None, Some("POINT (2 3)")],
+            &sedona_type,
+        );
+        let batch1 = create_array(
+            &[Some("POINT (4 5)"), None, Some("POINT (6 7)")],
+            &sedona_type,
+        );
+
+        let batches = vec![batch0, batch1];
+        let expected = create_scalar(Some("MULTIPOINT (0 1, 2 3, 4 5, 6 7)"), 
&sedona_type);
+
+        assert_scalar_equal(&tester.aggregate(&batches).unwrap(), &expected);
+    }
+
+    #[rstest]
+    fn udf_invoke_item_crs_idential_crs() {
+        let sedona_type = WKB_GEOMETRY_ITEM_CRS.clone();
+        let tester =
+            AggregateUdfTester::new(st_collect_agg_udf().into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batch0 = create_array_item_crs(
+            &[Some("POINT (0 1)"), None, Some("POINT (2 3)")],
+            [Some("EPSG:4326"), None, Some("EPSG:4326")],
+            &WKB_GEOMETRY,
+        );
+        let batch1 = create_array_item_crs(
+            &[Some("POINT (4 5)"), None, Some("POINT (6 7)")],
+            [Some("EPSG:4326"), None, Some("EPSG:4326")],
+            &WKB_GEOMETRY,
+        );
+
+        let expected = create_scalar_item_crs(
+            Some("MULTIPOINT (0 1, 2 3, 4 5, 6 7)"),
+            Some("EPSG:4326"),
+            &WKB_GEOMETRY,
+        );
+
+        assert_scalar_equal(
+            &tester
+                .aggregate(&vec![batch0.clone(), batch1.clone()])
+                .unwrap(),
+            &expected,
+        );
+    }
+
+    #[rstest]
+    fn udf_invoke_item_crs_multiple_compatible_crs() {
+        let sedona_type = WKB_GEOMETRY_ITEM_CRS.clone();
+        let tester =
+            AggregateUdfTester::new(st_collect_agg_udf().into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batch0 = create_array_item_crs(
+            &[Some("POINT (0 1)"), None, Some("POINT (2 3)")],
+            [Some("OGC:CRS84"), None, Some("EPSG:4326")],
+            &WKB_GEOMETRY,
+        );
+        let batch1 = create_array_item_crs(
+            &[Some("POINT (4 5)"), None, Some("POINT (6 7)")],
+            [Some("EPSG:4326"), None, Some("OGC:CRS84")],
+            &WKB_GEOMETRY,
+        );
+
+        let expected = create_scalar_item_crs(
+            Some("MULTIPOINT (0 1, 2 3, 4 5, 6 7)"),
+            Some("OGC:CRS84"),
+            &WKB_GEOMETRY,
+        );
+
+        assert_scalar_equal(
+            &tester
+                .aggregate(&vec![batch0.clone(), batch1.clone()])
+                .unwrap(),
+            &expected,
+        );
+    }
+
+    #[rstest]
+    fn udf_invoke_item_crs_incompatible_crs() {
+        let sedona_type = WKB_GEOMETRY_ITEM_CRS.clone();
+        let tester =
+            AggregateUdfTester::new(st_collect_agg_udf().into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batch0 = create_array_item_crs(
+            &[Some("POINT (0 1)"), None, Some("POINT (2 3)")],
+            [Some("OGC:CRS84"), None, Some("EPSG:4326")],
+            &WKB_GEOMETRY,
+        );
+
+        // We should error if we see incompatible CRSes between batches
+        let batch1_other_crs = create_array_item_crs(
+            &[Some("POINT (4 5)"), None, Some("POINT (6 7)")],
+            [Some("EPSG:3857"), None, Some("EPSG:3857")],
+            &WKB_GEOMETRY,
+        );
+        let err = tester
+            .aggregate(&vec![batch0.clone(), batch1_other_crs.clone()])
+            .unwrap_err();
+        assert_eq!(
+            err.message(),
+            "CRS values not equal: ogc:crs84 vs epsg:3857"
+        );
+
+        // We should error if we see incompatible CRSes between batches (None
+        // should be considered an incompatible CRS)
+        let batch1_other_crs = create_array_item_crs(
+            &[Some("POINT (4 5)"), None, Some("POINT (6 7)")],
+            [None, None, None],
+            &WKB_GEOMETRY,
+        );
+        let err = tester
+            .aggregate(&vec![batch0.clone(), batch1_other_crs.clone()])
+            .unwrap_err();
+        assert_eq!(err.message(), "CRS values not equal: ogc:crs84 vs None");
+
+        // Or if we see incompatible CRSes in a single batch
+        let batch0_incompatible_crses = create_array_item_crs(
+            &[Some("POINT (4 5)"), None, Some("POINT (6 7)")],
+            [Some("OGC:CRS84"), None, Some("EPSG:3857")],
+            &WKB_GEOMETRY,
+        );
+
+        let err = tester
+            .aggregate(&vec![batch0_incompatible_crses.clone()])
+            .unwrap_err();
+        assert_eq!(
+            err.message(),
+            "CRS values not equal: ogc:crs84 vs epsg:3857"
+        );
+    }
 }
diff --git a/rust/sedona-functions/src/st_envelope_agg.rs 
b/rust/sedona-functions/src/st_envelope_agg.rs
index a5692001..37ca9945 100644
--- a/rust/sedona-functions/src/st_envelope_agg.rs
+++ b/rust/sedona-functions/src/st_envelope_agg.rs
@@ -29,7 +29,10 @@ use datafusion_expr::{
     GroupsAccumulator, Volatility,
 };
 use sedona_common::sedona_internal_err;
-use sedona_expr::aggregate_udf::{SedonaAccumulator, SedonaAggregateUDF};
+use sedona_expr::{
+    aggregate_udf::{SedonaAccumulator, SedonaAggregateUDF},
+    item_crs::ItemCrsSedonaAccumulator,
+};
 use sedona_geometry::{
     bounds::geo_traits_update_xy_bounds,
     interval::{Interval, IntervalTrait},
@@ -46,7 +49,7 @@ use sedona_schema::{
 pub fn st_envelope_agg_udf() -> SedonaAggregateUDF {
     SedonaAggregateUDF::new(
         "st_envelope_agg",
-        vec![Arc::new(STEnvelopeAgg {})],
+        ItemCrsSedonaAccumulator::wrap_impl(vec![Arc::new(STEnvelopeAgg {})]),
         Volatility::Immutable,
         Some(st_envelope_agg_doc()),
     )
@@ -347,10 +350,10 @@ impl GroupsAccumulator for BoundsGroupsAccumulator2D {
 mod test {
     use datafusion_expr::AggregateUDF;
     use rstest::rstest;
-    use sedona_schema::datatypes::WKB_VIEW_GEOMETRY;
+    use sedona_schema::datatypes::{WKB_GEOMETRY_ITEM_CRS, WKB_VIEW_GEOMETRY};
     use sedona_testing::{
-        compare::{assert_array_equal, assert_scalar_equal_wkb_geometry},
-        create::create_array,
+        compare::{assert_array_equal, assert_scalar_equal, 
assert_scalar_equal_wkb_geometry},
+        create::{create_array, create_scalar},
         testers::AggregateUdfTester,
     };
 
@@ -415,6 +418,22 @@ mod test {
         );
     }
 
+    #[test]
+    fn udf_invoke_item_crs() {
+        let sedona_type = WKB_GEOMETRY_ITEM_CRS.clone();
+        let tester =
+            AggregateUdfTester::new(st_envelope_agg_udf().into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batches = vec![
+            vec![Some("POINT (0 1)"), None, Some("POINT (2 3)")],
+            vec![Some("POINT (4 5)"), None, Some("POINT (6 7)")],
+        ];
+        let expected = create_scalar(Some("POLYGON((0 1, 0 7, 6 7, 6 1, 0 
1))"), &sedona_type);
+
+        assert_scalar_equal(&tester.aggregate_wkt(batches).unwrap(), 
&expected);
+    }
+
     #[test]
     fn udf_grouped_accumulate() {
         let tester = AggregateUdfTester::new(st_envelope_agg_udf().into(), 
vec![WKB_GEOMETRY]);
diff --git a/rust/sedona-geo/Cargo.toml b/rust/sedona-geo/Cargo.toml
index bd20a30b..044e4823 100644
--- a/rust/sedona-geo/Cargo.toml
+++ b/rust/sedona-geo/Cargo.toml
@@ -49,6 +49,7 @@ geo-types = { workspace = true }
 geo = { workspace = true }
 geojson = { workspace = true }
 serde_json = { workspace = true }
+sedona-common = { workspace = true }
 sedona-expr = { workspace = true }
 sedona-functions = { workspace = true }
 sedona-geometry = { workspace = true }
diff --git a/rust/sedona-geo/src/register.rs b/rust/sedona-geo/src/register.rs
index 763e972a..6c9b5d57 100644
--- a/rust/sedona-geo/src/register.rs
+++ b/rust/sedona-geo/src/register.rs
@@ -52,7 +52,7 @@ pub fn scalar_kernels() -> Vec<(&'static str, 
Vec<ScalarKernelRef>)> {
     )
 }
 
-pub fn aggregate_kernels() -> Vec<(&'static str, SedonaAccumulatorRef)> {
+pub fn aggregate_kernels() -> Vec<(&'static str, Vec<SedonaAccumulatorRef>)> {
     define_aggregate_kernels!(
         "st_intersection_agg" => 
crate::st_intersection_agg::st_intersection_agg_impl,
         "st_union_agg" => crate::st_union_agg::st_union_agg_impl,
diff --git a/rust/sedona-geo/src/st_intersection_agg.rs 
b/rust/sedona-geo/src/st_intersection_agg.rs
index 72fafb91..49d4b448 100644
--- a/rust/sedona-geo/src/st_intersection_agg.rs
+++ b/rust/sedona-geo/src/st_intersection_agg.rs
@@ -25,7 +25,10 @@ use datafusion_common::{
 use datafusion_expr::{Accumulator, ColumnarValue};
 use geo::{BooleanOps, Intersects};
 use geo_traits::to_geo::ToGeoGeometry;
-use sedona_expr::aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef};
+use sedona_expr::{
+    aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef},
+    item_crs::ItemCrsSedonaAccumulator,
+};
 use sedona_functions::executor::WkbExecutor;
 use sedona_schema::{
     datatypes::{SedonaType, WKB_GEOMETRY},
@@ -36,8 +39,8 @@ use wkb::Endianness;
 use wkb::{reader::Wkb, writer::WriteOptions};
 
 /// ST_Intersection_Agg() implementation
-pub fn st_intersection_agg_impl() -> SedonaAccumulatorRef {
-    Arc::new(STIntersectionAgg {})
+pub fn st_intersection_agg_impl() -> Vec<SedonaAccumulatorRef> {
+    ItemCrsSedonaAccumulator::wrap_impl(STIntersectionAgg {})
 }
 
 #[derive(Debug)]
@@ -228,9 +231,11 @@ mod test {
     use super::*;
     use rstest::rstest;
     use sedona_functions::st_intersection_agg::st_intersection_agg_udf;
-    use sedona_schema::datatypes::WKB_VIEW_GEOMETRY;
+    use sedona_schema::datatypes::{WKB_GEOMETRY_ITEM_CRS, WKB_VIEW_GEOMETRY};
     use sedona_testing::{
-        compare::assert_scalar_equal_wkb_geometry_topologically, 
testers::AggregateUdfTester,
+        compare::{assert_scalar_equal, 
assert_scalar_equal_wkb_geometry_topologically},
+        create::create_scalar_item_crs,
+        testers::AggregateUdfTester,
     };
 
     #[rstest]
@@ -398,4 +403,26 @@ mod test {
             Some("MULTIPOLYGON(((3 3,4 3,4 4,3 4,3 3)),((13 13,14 13,14 14,13 
14,13 13)))"),
         );
     }
+
+    #[rstest]
+    fn udf_invoke_item_crs() {
+        let sedona_type = WKB_GEOMETRY_ITEM_CRS.clone();
+
+        let mut udaf = st_intersection_agg_udf();
+        udaf.add_kernel(st_intersection_agg_impl());
+        let tester = AggregateUdfTester::new(udaf.into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batches = vec![
+            vec![Some("POLYGON((0 0, 2 0, 2 2, 0 2, 0 0))")],
+            vec![Some("POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))")],
+        ];
+        let expected = create_scalar_item_crs(
+            Some("MULTIPOLYGON(((1 2, 1 1, 2 1, 2 2, 1 2)))"),
+            None,
+            &WKB_GEOMETRY,
+        );
+
+        assert_scalar_equal(&tester.aggregate_wkt(batches).unwrap(), 
&expected);
+    }
 }
diff --git a/rust/sedona-geo/src/st_union_agg.rs 
b/rust/sedona-geo/src/st_union_agg.rs
index 5b780bd0..cb2a8be3 100644
--- a/rust/sedona-geo/src/st_union_agg.rs
+++ b/rust/sedona-geo/src/st_union_agg.rs
@@ -25,7 +25,11 @@ use datafusion_common::{
 use datafusion_expr::{Accumulator, ColumnarValue};
 use geo::BooleanOps;
 use geo_traits::to_geo::ToGeoGeometry;
-use sedona_expr::aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef};
+use sedona_common::sedona_internal_err;
+use sedona_expr::{
+    aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef},
+    item_crs::ItemCrsSedonaAccumulator,
+};
 use sedona_functions::executor::WkbExecutor;
 use sedona_schema::{
     datatypes::{SedonaType, WKB_GEOMETRY},
@@ -36,8 +40,8 @@ use wkb::Endianness;
 use wkb::{reader::Wkb, writer::WriteOptions};
 
 /// ST_Union_Agg() implementation
-pub fn st_union_agg_impl() -> SedonaAccumulatorRef {
-    Arc::new(STUnionAgg {})
+pub fn st_union_agg_impl() -> Vec<SedonaAccumulatorRef> {
+    ItemCrsSedonaAccumulator::wrap_impl(STUnionAgg {})
 }
 
 #[derive(Debug)]
@@ -161,9 +165,7 @@ impl UnionAccumulator {
 impl Accumulator for UnionAccumulator {
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         if values.is_empty() {
-            return Err(DataFusionError::Internal(
-                "No input arrays provided to accumulator in 
update_batch".to_string(),
-            ));
+            return sedona_internal_err!("No input arrays provided to 
accumulator in update_batch");
         }
         let arg_types = [self.input_type.clone()];
         let args = [ColumnarValue::Array(values[0].clone())];
@@ -222,9 +224,11 @@ mod test {
     use super::*;
     use rstest::rstest;
     use sedona_functions::st_union_agg::st_union_agg_udf;
-    use sedona_schema::datatypes::WKB_VIEW_GEOMETRY;
+    use sedona_schema::datatypes::{WKB_GEOMETRY_ITEM_CRS, WKB_VIEW_GEOMETRY};
     use sedona_testing::{
-        compare::assert_scalar_equal_wkb_geometry_topologically, 
testers::AggregateUdfTester,
+        compare::{assert_scalar_equal, 
assert_scalar_equal_wkb_geometry_topologically},
+        create::create_scalar_item_crs,
+        testers::AggregateUdfTester,
     };
 
     #[rstest]
@@ -384,4 +388,25 @@ mod test {
             Some("MULTIPOLYGON(((0 0, 4 0, 4 3, 7 3, 7 6, 10 6, 10 10, 6 10, 6 
7, 3 7, 3 4, 0 4, 0 0)),((10 10, 14 10, 14 13, 17 13, 17 16, 20 16, 20 20, 16 
20, 16 17, 13 17, 13 14, 10 14, 10 10)))"),
         );
     }
+
+    #[rstest]
+    fn udf_invoke_item_crs() {
+        let sedona_type = WKB_GEOMETRY_ITEM_CRS.clone();
+        let mut udaf = st_union_agg_udf();
+        udaf.add_kernel(st_union_agg_impl());
+        let tester = AggregateUdfTester::new(udaf.into(), 
vec![sedona_type.clone()]);
+        assert_eq!(tester.return_type().unwrap(), sedona_type.clone());
+
+        let batches = vec![
+            vec![Some("POLYGON((0 0, 2 0, 2 2, 0 2, 0 0))")],
+            vec![Some("POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))")],
+        ];
+        let expected = create_scalar_item_crs(
+            Some("MULTIPOLYGON(((0 2, 0 0, 2 0, 2 1, 3 1, 3 3, 1 3, 1 2, 0 
2)))"),
+            None,
+            &WKB_GEOMETRY,
+        );
+
+        assert_scalar_equal(&tester.aggregate_wkt(batches).unwrap(), 
&expected);
+    }
 }
diff --git a/rust/sedona-schema/src/crs.rs b/rust/sedona-schema/src/crs.rs
index 2a77ad60..a4649b15 100644
--- a/rust/sedona-schema/src/crs.rs
+++ b/rust/sedona-schema/src/crs.rs
@@ -50,9 +50,11 @@ pub fn deserialize_crs(crs_str: &str) -> Result<Crs> {
         return Ok(cached);
     }
 
-    // Handle JSON strings "OGC:CRS84" and "EPSG:4326"
+    // Handle JSON strings "OGC:CRS84", "EPSG:4326", "{AUTH}:{CODE}" and "0"
     let crs = if LngLat::is_str_lnglat(crs_str) {
         lnglat()
+    } else if crs_str == "0" {
+        None
     } else if AuthorityCode::is_authority_code(crs_str) {
         AuthorityCode::crs(crs_str)
     } else {
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 4f4fb661..6efd3ba4 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -45,8 +45,8 @@ use parking_lot::Mutex;
 use sedona_common::option::add_sedona_option_extension;
 use sedona_datasource::provider::external_listing_table;
 use sedona_datasource::spec::ExternalFormatSpec;
-use sedona_expr::function_set::FunctionSet;
-use sedona_expr::{aggregate_udf::SedonaAccumulatorRef, 
scalar_udf::IntoScalarKernelRefs};
+use sedona_expr::scalar_udf::IntoScalarKernelRefs;
+use sedona_expr::{aggregate_udf::IntoSedonaAccumulatorRefs, 
function_set::FunctionSet};
 use sedona_geoparquet::options::TableGeoParquetOptions;
 use sedona_geoparquet::{
     format::GeoParquetFormatFactory,
@@ -206,7 +206,7 @@ impl SedonaContext {
 
     pub fn register_aggregate_kernels<'a>(
         &mut self,
-        kernels: impl Iterator<Item = (&'a str, SedonaAccumulatorRef)>,
+        kernels: impl Iterator<Item = (&'a str, impl 
IntoSedonaAccumulatorRefs)>,
     ) -> Result<()> {
         for (name, kernel) in kernels {
             let udf = self.functions.add_aggregate_udf_kernel(name, kernel)?;


Reply via email to