paleolimbot commented on code in PR #531:
URL: https://github.com/apache/sedona-db/pull/531#discussion_r2718899829
##########
c/sedona-proj/src/st_transform.rs:
##########
@@ -15,31 +15,353 @@
// specific language governing permissions and limitations
// under the License.
use crate::transform::{ProjCrsEngine, ProjCrsEngineBuilder};
-use arrow_array::builder::BinaryBuilder;
+use arrow_array::builder::{BinaryBuilder, StringViewBuilder};
+use arrow_array::ArrayRef;
use arrow_schema::DataType;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::cast::{as_string_view_array, as_struct_array};
+use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::ColumnarValue;
-use geo_traits::to_geo::ToGeoGeometry;
+use sedona_common::sedona_internal_err;
+use sedona_expr::item_crs::make_item_crs;
use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel};
use sedona_functions::executor::WkbExecutor;
use sedona_geometry::transform::{transform, CachingCrsEngine, CrsEngine,
CrsTransform};
use sedona_geometry::wkb_factory::WKB_MIN_PROBABLE_BYTES;
-use sedona_schema::crs::deserialize_crs;
-use sedona_schema::datatypes::{Edges, SedonaType};
+use sedona_schema::crs::{deserialize_crs, Crs};
+use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY,
WKB_GEOMETRY_ITEM_CRS};
use sedona_schema::matchers::ArgMatcher;
use std::cell::OnceCell;
-use std::rc::Rc;
+use std::io::Write;
+use std::iter::zip;
use std::sync::{Arc, RwLock};
use wkb::reader::Wkb;
-#[derive(Debug)]
-struct STTransform {}
-
/// ST_Transform() implementation using the proj crate
pub fn st_transform_impl() -> ScalarKernelRef {
Arc::new(STTransform {})
}
+#[derive(Debug)]
+struct STTransform {}
+
+impl SedonaScalarKernel for STTransform {
+ fn return_type_from_args_and_scalars(
+ &self,
+ arg_types: &[SedonaType],
+ scalar_args: &[Option<&ScalarValue>],
+ ) -> Result<Option<SedonaType>> {
+ let inputs = zip(arg_types, scalar_args)
+ .map(|(arg_type, arg_scalar)|
ArgInput::from_return_type_arg(arg_type, *arg_scalar))
+ .collect::<Vec<_>>();
+
+ if inputs.len() == 2 {
+ match (inputs[0], inputs[1]) {
+ // ScalarCrs output always returns a Wkb output type with
concrete Crs
+ (ArgInput::Geo(_), ArgInput::ScalarCrs(scalar_value))
+ | (ArgInput::ItemCrs, ArgInput::ScalarCrs(scalar_value)) => {
+ Ok(Some(output_type_from_scalar_crs_value(scalar_value)?))
+ }
+
+ // Geo or ItemCrs with ArrayCrs output always return ItemCrs
output
+ (ArgInput::Geo(_), ArgInput::ArrayCrs)
+ | (ArgInput::ItemCrs, ArgInput::ArrayCrs) => {
+ Ok(Some(WKB_GEOMETRY_ITEM_CRS.clone()))
+ }
+ _ => Ok(None),
+ }
+ } else if inputs.len() == 3 {
+ match (inputs[0], inputs[1], inputs[2]) {
+ // ScalarCrs output always returns a Wkb output type with
concrete Crs
+ (ArgInput::Geo(_), ArgInput::ScalarCrs(_),
ArgInput::ScalarCrs(scalar_value))
+ | (ArgInput::Geo(_), ArgInput::ArrayCrs,
ArgInput::ScalarCrs(scalar_value))
+ | (ArgInput::ItemCrs, ArgInput::ScalarCrs(_),
ArgInput::ScalarCrs(scalar_value))
+ | (ArgInput::ItemCrs, ArgInput::ArrayCrs,
ArgInput::ScalarCrs(scalar_value)) => {
+ Ok(Some(output_type_from_scalar_crs_value(scalar_value)?))
+ }
+
+ // Geo or ItemCrs with ArrayCrs output always return ItemCrs
output
+ (ArgInput::Geo(_), ArgInput::ScalarCrs(_), ArgInput::ArrayCrs)
+ | (ArgInput::Geo(_), ArgInput::ArrayCrs, ArgInput::ArrayCrs)
+ | (ArgInput::ItemCrs, ArgInput::ScalarCrs(_),
ArgInput::ArrayCrs)
+ | (ArgInput::ItemCrs, ArgInput::ArrayCrs, ArgInput::ArrayCrs)
=> {
+ Ok(Some(WKB_GEOMETRY_ITEM_CRS.clone()))
+ }
+ _ => Ok(None),
+ }
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn invoke_batch_from_args(
+ &self,
+ arg_types: &[SedonaType],
+ args: &[ColumnarValue],
+ _return_type: &SedonaType,
+ _num_rows: usize,
+ ) -> Result<ColumnarValue> {
+ let inputs = zip(arg_types, args)
+ .map(|(arg_type, arg)| ArgInput::from_arg(arg_type, arg))
+ .collect::<Vec<_>>();
+
+ let executor = WkbExecutor::new(arg_types, args);
+ let mut builder = BinaryBuilder::with_capacity(
+ executor.num_iterations(),
+ WKB_MIN_PROBABLE_BYTES * executor.num_iterations(),
+ );
+
+ // Optimize the easy case, where we have exactly one transformation
and there are no
+ // null or missing CRSes to contend with.
+ let from_index = inputs.len() - 2;
+ let to_index = inputs.len() - 1;
+ let (from, to) = (inputs[from_index], inputs[to_index]);
+ if let (Some(from_constant), Some(to_constant)) =
(from.crs_constant()?, to.crs_constant()?)
+ {
+ let maybe_from_crs = deserialize_crs(&from_constant)?;
+ let maybe_to_crs = deserialize_crs(&to_constant)?;
+ if let (Some(from_crs), Some(to_crs)) = (maybe_from_crs,
maybe_to_crs) {
+ with_global_proj_engine(|engine| {
+ let crs_transform = engine
+ .get_transform_crs_to_crs(
+ &from_crs.to_crs_string(),
+ &to_crs.to_crs_string(),
+ None,
+ "",
+ )
+ .map_err(|e|
DataFusionError::Execution(format!("{e}")))?;
+ executor.execute_wkb_void(|maybe_wkb| {
+ match maybe_wkb {
+ Some(wkb) => {
+ invoke_scalar(&wkb, crs_transform.as_ref(),
&mut builder)?;
+ builder.append_value([]);
Review Comment:
Yes, that is expected: `invoke_scalar()` writes its output directly into the
builder, so the empty `append_value([])` is still needed afterwards to finish those
bytes as a new element in the builder.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]