This is an automated email from the ASF dual-hosted git repository. kontinuation pushed a commit to branch sd-format-for-all-types in repository https://gitbox.apache.org/repos/asf/sedona-db.git
commit 18d7184745a44540ca3d97354abafc9baa1567bd Author: Kristin Cowalcijk <[email protected]> AuthorDate: Fri Aug 29 15:00:11 2025 +0800 Start to implement a more generic sd_format function --- rust/sedona-expr/src/scalar_udf.rs | 14 ++ rust/sedona-functions/src/sd_format.rs | 234 ++++++++++++++++++++++++++------- 2 files changed, 197 insertions(+), 51 deletions(-) diff --git a/rust/sedona-expr/src/scalar_udf.rs b/rust/sedona-expr/src/scalar_udf.rs index 2e3bcce..efdf619 100644 --- a/rust/sedona-expr/src/scalar_udf.rs +++ b/rust/sedona-expr/src/scalar_udf.rs @@ -165,6 +165,11 @@ impl ArgMatcher { arg_iter.next().is_none() } + /// Matches any argument + pub fn is_any() -> Arc<dyn TypeMatcher + Send + Sync> { + Arc::new(IsAny {}) + } + /// Matches the given Arrow type using PartialEq pub fn is_arrow(data_type: DataType) -> Arc<dyn TypeMatcher + Send + Sync> { Arc::new(IsExact { @@ -222,6 +227,15 @@ pub trait TypeMatcher: Debug { } } +#[derive(Debug)] +struct IsAny; + +impl TypeMatcher for IsAny { + fn match_type(&self, _arg: &SedonaType) -> bool { + true + } +} + #[derive(Debug)] struct IsExact { exact_type: SedonaType, diff --git a/rust/sedona-functions/src/sd_format.rs b/rust/sedona-functions/src/sd_format.rs index d16d8bc..265085c 100644 --- a/rust/sedona-functions/src/sd_format.rs +++ b/rust/sedona-functions/src/sd_format.rs @@ -1,11 +1,13 @@ use std::{sync::Arc, vec}; use crate::executor::WkbExecutor; -use arrow_array::builder::StringBuilder; -use arrow_schema::DataType; +use arrow_array::{ + builder::StringBuilder, cast::AsArray, Array, GenericListArray, OffsetSizeTrait, StructArray, +}; +use arrow_schema::{DataType, Field, Fields}; use datafusion_common::{ error::{DataFusionError, Result}, - ScalarValue, + internal_err, ScalarValue, }; use datafusion_expr::{ scalar_doc_sections::DOC_SECTION_OTHER, ColumnarValue, Documentation, Volatility, @@ -21,7 +23,7 @@ use sedona_schema::datatypes::SedonaType; pub fn sd_format_udf() -> SedonaScalarUDF { SedonaScalarUDF::new( "sd_format", - vec![Arc::new(SDFormatDefault {}), Arc::new(SDFormatGeometry {})], + vec![Arc::new(SDFormatDefault {})], Volatility::Immutable, Some(sd_format_doc()), ) @@ -57,33 +59,17 @@ struct SDFormatDefault {} impl SedonaScalarKernel for SDFormatDefault { fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> { - Ok(Some(args[0].clone())) - } - - fn invoke_batch( - &self, - _arg_types: &[SedonaType], - args: &[ColumnarValue], - ) -> Result<ColumnarValue> { - Ok(args[0].clone()) - } -} - -/// Implementation format geometry or geography -/// -/// This is very similar to ST_AsText except it respects the width_hint by -/// stopping the render for each item when too many characters have been written. -#[derive(Debug)] -struct SDFormatGeometry {} - -impl SedonaScalarKernel for SDFormatGeometry { - fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> { + let sedona_type = &args[0]; + let formatted_type = sedona_type_to_formatted_type(sedona_type)?; + if formatted_type == *sedona_type { + return Ok(None); + } let matcher = ArgMatcher::new( vec![ - ArgMatcher::is_geometry_or_geography(), + ArgMatcher::is_any(), ArgMatcher::is_optional(ArgMatcher::is_string()), ], - SedonaType::Arrow(DataType::Utf8), + formatted_type, ); matcher.match_args(args) } @@ -113,39 +99,185 @@ impl SedonaScalarKernel for SDFormatGeometry { } } - let executor = WkbExecutor::new(&arg_types[0..1], &args[0..1]); + columnar_value_to_formatted_value(&arg_types[0], &args[0], maybe_width_hint) + } +} + +fn sedona_type_to_formatted_type(sedona_type: &SedonaType) -> Result<SedonaType> { + match sedona_type { + SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _) => Ok(SedonaType::Arrow(DataType::Utf8)), + SedonaType::Arrow(arrow_type) => { + // dive into the arrow type and translate geospatial types into Utf8 + match arrow_type { + DataType::Struct(fields) => { + let mut new_fields = Vec::with_capacity(fields.len()); + for field in fields { + let new_field = field_to_formatted_field(field)?; + new_fields.push(Arc::new(new_field)); + } + Ok(SedonaType::Arrow(DataType::Struct(new_fields.into()))) + } + DataType::List(field) => { + let new_field = field_to_formatted_field(field)?; + Ok(SedonaType::Arrow(DataType::List(Arc::new(new_field)))) + } + DataType::ListView(field) => { + let new_field = field_to_formatted_field(field)?; + Ok(SedonaType::Arrow(DataType::ListView(Arc::new(new_field)))) + } + _ => Ok(sedona_type.clone()), + } + } + } +} + +fn field_to_formatted_field(field: &Field) -> Result<Field> { + let new_type = sedona_type_to_formatted_type(&SedonaType::from_data_type(field.data_type())?)?; + let new_field = field.clone().with_data_type(new_type.data_type()); + Ok(new_field) +} + +fn columnar_value_to_formatted_value( + sedona_type: &SedonaType, + columnar_value: &ColumnarValue, + maybe_width_hint: Option<usize>, +) -> Result<ColumnarValue> { + match sedona_type { + SedonaType::Wkb(_, _) | SedonaType::WkbView(_, _) => { + geospatial_value_to_formatted_value(sedona_type, columnar_value, maybe_width_hint) + } + SedonaType::Arrow(arrow_type) => match arrow_type { + DataType::Struct(fields) => match columnar_value { + ColumnarValue::Array(array) => { + let struct_array = array.as_struct(); + let formatted_struct_array = + struct_value_to_formatted_value(fields, struct_array, maybe_width_hint)?; + Ok(ColumnarValue::Array(Arc::new(formatted_struct_array))) + } + ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => { + let formatted_struct_array = + struct_value_to_formatted_value(fields, struct_array, maybe_width_hint)?; + Ok(ColumnarValue::Scalar(ScalarValue::Struct(Arc::new( + formatted_struct_array, + )))) + } + _ => internal_err!("Unsupported struct columnar value"), + }, + DataType::List(field) => match columnar_value { + ColumnarValue::Array(array) => { + let list_array = array.as_list::<i32>(); + let formatted_list_array = + list_value_to_formatted_value(field, list_array, maybe_width_hint)?; + Ok(ColumnarValue::Array(Arc::new(formatted_list_array))) + } + ColumnarValue::Scalar(ScalarValue::List(list_array)) => { + let formatted_list_array = + list_value_to_formatted_value(field, list_array, maybe_width_hint)?; + Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new( + formatted_list_array, + )))) + } + _ => internal_err!("Unsupported list columnar value"), + }, + _ => Ok(columnar_value.clone()), + }, + } +} + +/// Implementation format geometry or geography +/// +/// This is very similar to ST_AsText except it respects the width_hint by +/// stopping the render for each item when too many characters have been written. +fn geospatial_value_to_formatted_value( + sedona_type: &SedonaType, + geospatial_value: &ColumnarValue, + maybe_width_hint: Option<usize>, +) -> Result<ColumnarValue> { + let arg_types: &[SedonaType] = std::slice::from_ref(sedona_type); + let args: &[ColumnarValue] = std::slice::from_ref(geospatial_value); + let executor = WkbExecutor::new(arg_types, args); + + let min_output_size = match maybe_width_hint { + Some(width_hint) => executor.num_iterations() * width_hint, + None => executor.num_iterations() * 25, + }; + + // Initialize an output builder of the appropriate type + let mut builder = StringBuilder::with_capacity(executor.num_iterations(), min_output_size); + + executor.execute_wkb_void(|maybe_item| { + match maybe_item { + Some(item) => { + let mut builder_wrapper = + LimitedSizeOutput::new(&mut builder, maybe_width_hint.unwrap_or(usize::MAX)); + + // We ignore this error on purpose: we raised it on purpose to prevent + // the WKT writer from writing too many characters + #[allow(unused_must_use)] + wkt::to_wkt::write_geometry(&mut builder_wrapper, &item); - let min_output_size = match maybe_width_hint { - Some(width_hint) => executor.num_iterations() * width_hint, - None => executor.num_iterations() * 25, + builder.append_value(""); + } + None => builder.append_null(), }; - // Initialize an output builder of the appropriate type - let mut builder = StringBuilder::with_capacity(executor.num_iterations(), min_output_size); + Ok(()) + })?; - executor.execute_wkb_void(|maybe_item| { - match maybe_item { - Some(item) => { - let mut builder_wrapper = LimitedSizeOutput::new( - &mut builder, - maybe_width_hint.unwrap_or(usize::MAX), - ); + executor.finish(Arc::new(builder.finish())) +} - // We ignore this error on purpose: we raised it on purpose to prevent - // the WKT writer from writing too many characters - #[allow(unused_must_use)] - wkt::to_wkt::write_geometry(&mut builder_wrapper, &item); +fn struct_value_to_formatted_value( + fields: &Fields, + struct_array: &StructArray, + maybe_width_hint: Option<usize>, +) -> Result<StructArray> { + let columns = struct_array.columns(); - builder.append_value(""); - } - None => builder.append_null(), - }; + let mut new_fields = Vec::with_capacity(columns.len()); + for (column, field) in columns.iter().zip(fields) { + let new_field = field_to_formatted_field(field)?; + let new_column = columnar_value_to_formatted_value( + &SedonaType::from_data_type(field.data_type())?, + &ColumnarValue::Array(Arc::clone(column)), + maybe_width_hint, + )?; - Ok(()) - })?; + let ColumnarValue::Array(new_array) = new_column else { + return internal_err!("Expected Array"); + }; - executor.finish(Arc::new(builder.finish())) + new_fields.push((Arc::new(new_field), new_array)); } + + Ok(StructArray::from(new_fields)) +} + +fn list_value_to_formatted_value<OffsetSize: OffsetSizeTrait>( + field: &Field, + list_array: &GenericListArray<OffsetSize>, + maybe_width_hint: Option<usize>, +) -> Result<GenericListArray<OffsetSize>> { + let values_array = list_array.values(); + let offsets = list_array.offsets(); + let nulls = list_array.nulls(); + + let new_field = field_to_formatted_field(field)?; + let new_columnar_value = columnar_value_to_formatted_value( + &SedonaType::from_data_type(field.data_type())?, + &ColumnarValue::Array(Arc::clone(values_array)), + maybe_width_hint, + )?; + let ColumnarValue::Array(new_values_array) = new_columnar_value else { + return internal_err!("Expected Array"); + }; + + Ok(GenericListArray::<OffsetSize>::new( + Arc::new(new_field), + offsets.clone(), + new_values_array, + nulls.cloned(), + )) } struct LimitedSizeOutput<'a, T> {
