This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 717b92e  fix(rust/sedona-functions): Eliminate some intermediary copies in ST_Collect() (#95)
717b92e is described below

commit 717b92e6653318654e9c9dfcbcf4cf588dd4b226
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Sep 16 13:55:08 2025 -0500

    fix(rust/sedona-functions): Eliminate some intermediary copies in ST_Collect() (#95)
---
 rust/sedona-functions/src/st_collect.rs | 58 ++++++++++++++++++++++++---------
 1 file changed, 42 insertions(+), 16 deletions(-)

diff --git a/rust/sedona-functions/src/st_collect.rs 
b/rust/sedona-functions/src/st_collect.rs
index 7266a62..7eed4b3 100644
--- a/rust/sedona-functions/src/st_collect.rs
+++ b/rust/sedona-functions/src/st_collect.rs
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use std::{io::Write, sync::Arc, vec};
+use std::{sync::Arc, vec};
 
 use crate::executor::WkbExecutor;
 use arrow_array::ArrayRef;
@@ -101,17 +101,26 @@ struct CollectionAccumulator {
     unique_geometry_types: HashSet<GeometryTypeId>,
     unique_dimensions: HashSet<Dimensions>,
     count: i64,
-    item: Vec<u8>,
+    item: Option<Vec<u8>>,
 }
 
+const WKB_HEADER_SIZE: usize = 1 + 4 + 4;
+
 impl CollectionAccumulator {
     pub fn try_new(input_type: SedonaType, _output_type: SedonaType) -> 
Result<Self> {
+        // Write a dummy header with the correct number of bytes. We'll 
rewrite this later
+        // when we know what type/dimension of geometrycollection we have 
based on the
+        // items encountered.
+        let mut item = Vec::new();
+        write_wkb_geometrycollection_header(&mut item, Dimensions::Xy, 0)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
         Ok(Self {
             input_type,
             unique_geometry_types: HashSet::new(),
             unique_dimensions: HashSet::new(),
             count: 0,
-            item: Vec::new(),
+            item: Some(item),
         })
     }
 
@@ -123,7 +132,7 @@ impl CollectionAccumulator {
 
         // Generate the correct header: collections of points become 
multipoint, ensure
         // dimensions are preserved if possible.
-        let mut new_item = Vec::new();
+        let mut new_header = Vec::new();
         let count_usize = self.count.try_into().unwrap();
 
         if self.unique_dimensions.len() != 1 {
@@ -134,35 +143,45 @@ impl CollectionAccumulator {
         if self.unique_geometry_types.len() == 1 {
             match self.unique_geometry_types.iter().next().unwrap() {
                 GeometryTypeId::Point => {
-                    write_wkb_multipoint_header(&mut new_item, dimensions, 
count_usize)
+                    write_wkb_multipoint_header(&mut new_header, dimensions, 
count_usize)
                         .map_err(|e| DataFusionError::External(Box::new(e)))?;
                 }
                 GeometryTypeId::LineString => {
-                    write_wkb_multilinestring_header(&mut new_item, 
dimensions, count_usize)
+                    write_wkb_multilinestring_header(&mut new_header, 
dimensions, count_usize)
                         .map_err(|e| DataFusionError::External(Box::new(e)))?;
                 }
                 GeometryTypeId::Polygon => {
-                    write_wkb_multipolygon_header(&mut new_item, dimensions, 
count_usize)
+                    write_wkb_multipolygon_header(&mut new_header, dimensions, 
count_usize)
                         .map_err(|e| DataFusionError::External(Box::new(e)))?;
                 }
                 _ => {
-                    write_wkb_geometrycollection_header(&mut new_item, 
dimensions, count_usize)
+                    write_wkb_geometrycollection_header(&mut new_header, 
dimensions, count_usize)
                         .map_err(|e| DataFusionError::External(Box::new(e)))?;
                 }
             }
         } else {
-            write_wkb_geometrycollection_header(&mut new_item, dimensions, 
count_usize)
+            write_wkb_geometrycollection_header(&mut new_header, dimensions, 
count_usize)
                 .map_err(|e| DataFusionError::External(Box::new(e)))?;
         }
 
-        // Write the rest of item into the output and return it
-        new_item.extend(self.item.iter());
-        Ok(Some(new_item))
+        // Update the header bytes of the output and return it
+        if let Some(mut out) = self.item.take() {
+            out[0..WKB_HEADER_SIZE].copy_from_slice(&new_header);
+            Ok(Some(out))
+        } else {
+            sedona_internal_err!("Unexpected internal state in ST_Collect()")
+        }
     }
 }
 
 impl Accumulator for CollectionAccumulator {
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let item_ref = if let Some(item_ref) = self.item.as_mut() {
+            item_ref
+        } else {
+            return sedona_internal_err!("Unexpected internal state in 
ST_Collect()");
+        };
+
         let arg_types = [self.input_type.clone()];
         let args = [ColumnarValue::Array(values[0].clone())];
         let executor = WkbExecutor::new(&arg_types, &args);
@@ -174,7 +193,7 @@ impl Accumulator for CollectionAccumulator {
                     .insert(type_and_dims.geometry_type());
                 self.unique_dimensions.insert(type_and_dims.dimensions());
                 self.count += 1;
-                self.item.write_all(item.buf())?;
+                item_ref.extend_from_slice(item.buf());
             }
             Ok(())
         })?;
@@ -202,7 +221,7 @@ impl Accumulator for CollectionAccumulator {
         let serialized_geometry_types = 
ScalarValue::Utf8(Some(geometry_types_value));
         let serialized_dimensions = ScalarValue::Utf8(Some(dimensions_value));
         let serialized_count = ScalarValue::Int64(Some(self.count));
-        let serialized_item = ScalarValue::Binary(Some(self.item.clone()));
+        let serialized_item = ScalarValue::Binary(self.item.take());
 
         Ok(vec![
             serialized_geometry_types,
@@ -213,7 +232,8 @@ impl Accumulator for CollectionAccumulator {
     }
 
     fn size(&self) -> usize {
-        size_of::<CollectionAccumulator>() + self.item.capacity()
+        let item_capacity = self.item.as_ref().map(|e| 
e.capacity()).unwrap_or(0);
+        size_of::<CollectionAccumulator>() + item_capacity
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -224,6 +244,12 @@ impl Accumulator for CollectionAccumulator {
             );
         }
 
+        let item_ref = if let Some(item_ref) = self.item.as_mut() {
+            item_ref
+        } else {
+            return sedona_internal_err!("Unexpected internal state in 
ST_Collect()");
+        };
+
         let mut geometry_types_iter = as_string_array(&states[0])?.into_iter();
         let mut dimensions_iter = as_string_array(&states[1])?.into_iter();
         let mut count_iter = as_int64_array(&states[2])?.into_iter();
@@ -255,7 +281,7 @@ impl Accumulator for CollectionAccumulator {
                     self.unique_geometry_types.extend(geometry_types);
                     self.unique_dimensions.extend(dimensions);
                     self.count += count;
-                    self.item.extend_from_slice(item);
+                    
item_ref.extend_from_slice(&item[WKB_HEADER_SIZE..item.len()]);
                 }
                 _ => {
                     return sedona_internal_err!(

Reply via email to