This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 717b92e fix(rust/sedona-functions): Eliminate some intermediary
copies in ST_Collect() (#95)
717b92e is described below
commit 717b92e6653318654e9c9dfcbcf4cf588dd4b226
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Sep 16 13:55:08 2025 -0500
fix(rust/sedona-functions): Eliminate some intermediary copies in
ST_Collect() (#95)
---
rust/sedona-functions/src/st_collect.rs | 58 ++++++++++++++++++++++++---------
1 file changed, 42 insertions(+), 16 deletions(-)
diff --git a/rust/sedona-functions/src/st_collect.rs b/rust/sedona-functions/src/st_collect.rs
index 7266a62..7eed4b3 100644
--- a/rust/sedona-functions/src/st_collect.rs
+++ b/rust/sedona-functions/src/st_collect.rs
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use std::{io::Write, sync::Arc, vec};
+use std::{sync::Arc, vec};
use crate::executor::WkbExecutor;
use arrow_array::ArrayRef;
@@ -101,17 +101,26 @@ struct CollectionAccumulator {
unique_geometry_types: HashSet<GeometryTypeId>,
unique_dimensions: HashSet<Dimensions>,
count: i64,
- item: Vec<u8>,
+ item: Option<Vec<u8>>,
}
+const WKB_HEADER_SIZE: usize = 1 + 4 + 4;
+
impl CollectionAccumulator {
pub fn try_new(input_type: SedonaType, _output_type: SedonaType) -> Result<Self> {
+ // Write a dummy header with the correct number of bytes. We'll rewrite this later
+ // when we know what type/dimension of geometrycollection we have based on the
+ // items encountered.
+ let mut item = Vec::new();
+ write_wkb_geometrycollection_header(&mut item, Dimensions::Xy, 0)
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
Ok(Self {
input_type,
unique_geometry_types: HashSet::new(),
unique_dimensions: HashSet::new(),
count: 0,
- item: Vec::new(),
+ item: Some(item),
})
}
@@ -123,7 +132,7 @@ impl CollectionAccumulator {
// Generate the correct header: collections of points become
multipoint, ensure
// dimensions are preserved if possible.
- let mut new_item = Vec::new();
+ let mut new_header = Vec::new();
let count_usize = self.count.try_into().unwrap();
if self.unique_dimensions.len() != 1 {
@@ -134,35 +143,45 @@ impl CollectionAccumulator {
if self.unique_geometry_types.len() == 1 {
match self.unique_geometry_types.iter().next().unwrap() {
GeometryTypeId::Point => {
- write_wkb_multipoint_header(&mut new_item, dimensions,
count_usize)
+ write_wkb_multipoint_header(&mut new_header, dimensions,
count_usize)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
}
GeometryTypeId::LineString => {
- write_wkb_multilinestring_header(&mut new_item,
dimensions, count_usize)
+ write_wkb_multilinestring_header(&mut new_header,
dimensions, count_usize)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
}
GeometryTypeId::Polygon => {
- write_wkb_multipolygon_header(&mut new_item, dimensions,
count_usize)
+ write_wkb_multipolygon_header(&mut new_header, dimensions,
count_usize)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
}
_ => {
- write_wkb_geometrycollection_header(&mut new_item,
dimensions, count_usize)
+ write_wkb_geometrycollection_header(&mut new_header,
dimensions, count_usize)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
}
}
} else {
- write_wkb_geometrycollection_header(&mut new_item, dimensions,
count_usize)
+ write_wkb_geometrycollection_header(&mut new_header, dimensions,
count_usize)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
}
- // Write the rest of item into the output and return it
- new_item.extend(self.item.iter());
- Ok(Some(new_item))
+ // Update the header bytes of the output and return it
+ if let Some(mut out) = self.item.take() {
+ out[0..WKB_HEADER_SIZE].copy_from_slice(&new_header);
+ Ok(Some(out))
+ } else {
+ sedona_internal_err!("Unexpected internal state in ST_Collect()")
+ }
}
}
impl Accumulator for CollectionAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ let item_ref = if let Some(item_ref) = self.item.as_mut() {
+ item_ref
+ } else {
+ return sedona_internal_err!("Unexpected internal state in ST_Collect()");
+ };
+
let arg_types = [self.input_type.clone()];
let args = [ColumnarValue::Array(values[0].clone())];
let executor = WkbExecutor::new(&arg_types, &args);
@@ -174,7 +193,7 @@ impl Accumulator for CollectionAccumulator {
.insert(type_and_dims.geometry_type());
self.unique_dimensions.insert(type_and_dims.dimensions());
self.count += 1;
- self.item.write_all(item.buf())?;
+ item_ref.extend_from_slice(item.buf());
}
Ok(())
})?;
@@ -202,7 +221,7 @@ impl Accumulator for CollectionAccumulator {
let serialized_geometry_types =
ScalarValue::Utf8(Some(geometry_types_value));
let serialized_dimensions = ScalarValue::Utf8(Some(dimensions_value));
let serialized_count = ScalarValue::Int64(Some(self.count));
- let serialized_item = ScalarValue::Binary(Some(self.item.clone()));
+ let serialized_item = ScalarValue::Binary(self.item.take());
Ok(vec![
serialized_geometry_types,
@@ -213,7 +232,8 @@ impl Accumulator for CollectionAccumulator {
}
fn size(&self) -> usize {
- size_of::<CollectionAccumulator>() + self.item.capacity()
+ let item_capacity = self.item.as_ref().map(|e| e.capacity()).unwrap_or(0);
+ size_of::<CollectionAccumulator>() + item_capacity
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -224,6 +244,12 @@ impl Accumulator for CollectionAccumulator {
);
}
+ let item_ref = if let Some(item_ref) = self.item.as_mut() {
+ item_ref
+ } else {
+ return sedona_internal_err!("Unexpected internal state in ST_Collect()");
+ };
+
let mut geometry_types_iter = as_string_array(&states[0])?.into_iter();
let mut dimensions_iter = as_string_array(&states[1])?.into_iter();
let mut count_iter = as_int64_array(&states[2])?.into_iter();
@@ -255,7 +281,7 @@ impl Accumulator for CollectionAccumulator {
self.unique_geometry_types.extend(geometry_types);
self.unique_dimensions.extend(dimensions);
self.count += count;
- self.item.extend_from_slice(item);
+ item_ref.extend_from_slice(&item[WKB_HEADER_SIZE..item.len()]);
}
_ => {
return sedona_internal_err!(