This is an automated email from the ASF dual-hosted git repository. imbruced pushed a commit to branch add-sedona-serializer in repository https://gitbox.apache.org/repos/asf/sedona-db.git
commit b0e258125f5f46f0d943226b39c012d7209cb4a3 Author: pawelkocinski <[email protected]> AuthorDate: Fri Dec 19 00:10:18 2025 +0100 add geometry to sedona binary conversion --- Cargo.lock | 2 + rust/sedona-functions/src/lib.rs | 2 +- rust/sedona-functions/src/st_to_sedona_spark.rs | 177 +++++++++++++++++++++--- rust/sedona-serde/Cargo.toml | 2 + rust/sedona-serde/src/lib.rs | 2 +- rust/sedona-serde/src/linestring.rs | 65 +++++++++ rust/sedona-serde/src/point.rs | 50 +++++++ rust/sedona-serde/src/polygon.rs | 88 ++++++++++++ rust/sedona-serde/src/serialize.rs | 105 ++++++++++++-- 9 files changed, 462 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 75872990..a5843de9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5199,6 +5199,8 @@ dependencies = [ "arrow-array", "byteorder", "datafusion-common", + "sedona-schema", + "wkb", "wkt 0.14.0", ] diff --git a/rust/sedona-functions/src/lib.rs b/rust/sedona-functions/src/lib.rs index a600358d..cce92a6d 100644 --- a/rust/sedona-functions/src/lib.rs +++ b/rust/sedona-functions/src/lib.rs @@ -62,10 +62,10 @@ mod st_reverse; mod st_setsrid; mod st_srid; mod st_start_point; +mod st_to_sedona_spark; mod st_transform; mod st_translate; pub mod st_union_agg; mod st_xyzm; mod st_xyzm_minmax; mod st_zmflag; -mod st_to_sedona_spark; diff --git a/rust/sedona-functions/src/st_to_sedona_spark.rs b/rust/sedona-functions/src/st_to_sedona_spark.rs index 6ddf7b45..ceac7446 100644 --- a/rust/sedona-functions/src/st_to_sedona_spark.rs +++ b/rust/sedona-functions/src/st_to_sedona_spark.rs @@ -1,15 +1,17 @@ -use std::sync::Arc; +use crate::executor::WkbExecutor; +use arrow_array::builder::BinaryBuilder; use arrow_schema::DataType; use datafusion_expr::scalar_doc_sections::DOC_SECTION_OTHER; -use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF}; -use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY}; use datafusion_expr::{ColumnarValue, Documentation, Volatility}; +use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF}; +use sedona_geometry::wkb_factory::WKB_MIN_PROBABLE_BYTES; +use sedona_schema::datatypes::SedonaType; use sedona_schema::matchers::ArgMatcher; +use sedona_serde::serialize::serialize; +use std::sync::Arc; #[derive(Debug)] -struct STGeomToSedonaSpark { - // out_type: SedonaType, -} +struct STGeomToSedonaSpark {} impl SedonaScalarKernel for STGeomToSedonaSpark { fn return_type(&self, args: &[SedonaType]) -> datafusion_common::Result<Option<SedonaType>> { @@ -21,15 +23,50 @@ impl SedonaScalarKernel for STGeomToSedonaSpark { matcher.match_args(args) } - fn invoke_batch(&self, arg_types: &[SedonaType], args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> { - todo!() + fn invoke_batch( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + ) -> datafusion_common::Result<ColumnarValue> { + let executor = WkbExecutor::new(arg_types, args); + let mut builder = BinaryBuilder::with_capacity( + executor.num_iterations(), + WKB_MIN_PROBABLE_BYTES * executor.num_iterations(), + ); + + let crs_value = match &arg_types[0] { + SedonaType::Wkb(_, crs) => { + match crs { + Some(_crs) => { + Ok(Some(4326)) + }, + None => Ok(None), + } + // + } + _ => Err(datafusion_common::DataFusionError::Internal( + "ST_GeomToSedonaSpark: Unsupported geometry type".to_string(), + )), + }?; + + executor.execute_wkb_void(|maybe_item| { + match maybe_item { + Some(item) => { + serialize(&item, &mut builder, crs_value)?; + builder.append_value([]); + } + None => builder.append_null(), + } + + Ok(()) + })?; + + executor.finish(Arc::new(builder.finish())) } } pub fn st_geomtosedona_udf() -> SedonaScalarUDF { - let kernel = Arc::new(STGeomToSedonaSpark { - // out_type: WKB_GEOMETRY, - }); + let kernel = Arc::new(STGeomToSedonaSpark {}); SedonaScalarUDF::new( "st_geomtosedonaspark", @@ -51,22 +88,122 @@ fn doc() -> Documentation { .build() } - #[cfg(test)] mod tests { - use arrow_schema::DataType; + use crate::st_to_sedona_spark::st_geomtosedona_udf; + use datafusion_common::ScalarValue; + use rstest::rstest; use sedona_schema::datatypes::{Edges, SedonaType}; + use sedona_testing::create::create_scalar; use sedona_testing::testers::ScalarUdfTester; - use crate::st_from_sedona_spark::st_geomfromsedona_udf; - use crate::st_to_sedona_spark::st_geomtosedona_udf; + + const POINT_WKT: &str = "POINT (1 1)"; + const LINESTRING_WKT: &str = "LINESTRING (0 0, 1 1, 2 2)"; + const MULTILINESTRING_WKT: &str = "MULTILINESTRING ((1 1, 2 2), (4 5, 6 7))"; + const MULTIPOINT_WKT: &str = "MULTIPOINT ((1 1), (2 2), (4 5))"; + const POLYGON_WKT: &str = "POLYGON ( + (1 1, 10 1, 10 10, 1 10, 1 1), + (2 2, 4 2, 4 4, 2 4, 2 2), + (6 6, 8 6, 8 8, 6 8, 6 6) + )"; + const MULTIPOLYGON_WKT: &str = "MULTIPOLYGON ( + ( + (1 1, 10 1, 10 10, 1 10, 1 1), + (2 2, 4 2, 4 4, 2 4, 2 2), + (6 6, 8 6, 8 8, 6 8, 6 6) + ), + ( + (12 1, 20 1, 20 9, 12 9, 12 1), + (13 2, 15 2, 15 4, 13 4, 13 2), + (17 5, 19 5, 19 7, 17 7, 17 5) + ) + )"; + const GEOMETRYCOLLECTION_WKT: &str = "GEOMETRYCOLLECTION ( + POINT (4 6), + LINESTRING (4 6,7 10), + POLYGON((4 6,7 10,4 10,4 6)) + )"; + + const COMPLEX_GEOMETRYCOLLECTION_WKT: &str = "GEOMETRYCOLLECTION( + POINT(4 6), + LINESTRING(4 6,7 10), + POLYGON((4 6,7 10,4 10,4 6)), + MULTIPOINT((1 2),(3 4)) + )"; + const NESTED_GEOMETRYCOLLECTION_WKT: &str = "GEOMETRYCOLLECTION ( + POINT (1 1), + GEOMETRYCOLLECTION ( + LINESTRING (0 0, 1 1), + POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0)) + ) + )"; + + const FLOATING_POLYGON_WKT: &str = "POLYGON ( + ( + 12.345678901234 45.678901234567, + 23.456789012345 67.890123456789, + 34.567890123456 56.789012345678, + 45.678901234567 34.567890123456, + 29.876543210987 22.345678901234, + 12.345678901234 45.678901234567 + ), + ( + 25.123456789012 45.987654321098, + 30.987654321098 50.123456789012, + 35.456789012345 45.456789012345, + 30.234567890123 40.987654321098, + 25.123456789012 45.987654321098 + ) + )"; fn get_tester() -> ScalarUdfTester { ScalarUdfTester::new( st_geomtosedona_udf().into(), - vec![ - SedonaType::Wkb(Edges::Planar, None), - SedonaType::Arrow(DataType::Utf8), - ], + vec![SedonaType::Wkb(Edges::Planar, None)], ) } -} \ No newline at end of file + + fn fixture_to_bytes(wkb: &str) -> Vec<u8> { + wkb.split("\n") + .filter(|line| !line.starts_with("//") && !line.is_empty()) + .flat_map(|s| s.split_whitespace()) + .map(|num| num.parse::<u8>().expect("invalid byte")) + .collect::<Vec<u8>>() + } + + #[rstest] + fn test_geometries_serialization( + #[values( + (POINT_WKT, include_str!("fixtures/point.sedona")), + (LINESTRING_WKT, include_str!("fixtures/linestring.sedona")), + (MULTILINESTRING_WKT, include_str!("fixtures/multilinestring.sedona")), + (MULTIPOINT_WKT, include_str!("fixtures/multipoint.sedona")), + (POLYGON_WKT, include_str!("fixtures/polygon.sedona")), + (MULTIPOLYGON_WKT, include_str!("fixtures/multipolygon.sedona")), + (GEOMETRYCOLLECTION_WKT, include_str!("fixtures/geometrycollection.sedona")), + (COMPLEX_GEOMETRYCOLLECTION_WKT, include_str!("fixtures/geometrycollectioncomplex.sedona")), + (NESTED_GEOMETRYCOLLECTION_WKT, include_str!("fixtures/nested_geometry_collection.sedona")), + ("POINT EMPTY", include_str!("fixtures/empty_point.sedona")), + ("LINESTRING EMPTY", include_str!("fixtures/empty_linestring.sedona")), + ("POLYGON EMPTY", include_str!("fixtures/empty_polygon.sedona")), + ("MULTIPOINT EMPTY", include_str!("fixtures/multipoint_empty.sedona")), + ("MULTIPOLYGON EMPTY", include_str!("fixtures/empty_multipolygon.sedona")), + ("MULTILINESTRING EMPTY", include_str!("fixtures/empty_multilinestring.sedona")), + ("GEOMETRYCOLLECTION EMPTY", include_str!("fixtures/empty_geometry_collection.sedona")), + (FLOATING_POLYGON_WKT, include_str!("fixtures/point_float_coords.sedona")) + )] + value: (&str, &str), + ) { + let tester = get_tester(); + + let (input_wkt, fixture) = value; + + let geometry = create_scalar(Some(input_wkt), &SedonaType::Wkb(Edges::Planar, None)); + + let result = tester.invoke_scalar(geometry).unwrap(); + + let binary_geometry = fixture_to_bytes(fixture); + + assert_eq!(result, ScalarValue::Binary(Some(binary_geometry))); + } +} diff --git a/rust/sedona-serde/Cargo.toml b/rust/sedona-serde/Cargo.toml index 7eddb106..dadad9cf 100644 --- a/rust/sedona-serde/Cargo.toml +++ b/rust/sedona-serde/Cargo.toml @@ -31,6 +31,8 @@ result_large_err = "allow" [dependencies] arrow-array = { workspace = true } +sedona-schema = { workspace = true } datafusion-common = { workspace = true } wkt = { workspace = true } byteorder = "1.5.0" +wkb = "0.9.2" diff --git a/rust/sedona-serde/src/lib.rs b/rust/sedona-serde/src/lib.rs index f9791efa..b8b3fb8d 100644 --- a/rust/sedona-serde/src/lib.rs +++ b/rust/sedona-serde/src/lib.rs @@ -19,5 +19,5 @@ pub mod deserialize; mod linestring; mod point; mod polygon; +pub mod serialize; mod wkb; -mod serialize; diff --git a/rust/sedona-serde/src/linestring.rs b/rust/sedona-serde/src/linestring.rs index 6bc8a4f7..3fdf6b6e 100644 --- a/rust/sedona-serde/src/linestring.rs +++ b/rust/sedona-serde/src/linestring.rs @@ -96,3 +96,68 @@ pub fn parse_multilinestring<IN: ByteOrder, OUT: ByteOrder>( Ok(()) } + +pub fn serialize_linestring<OUT: ByteOrder>( + builder: &mut BinaryBuilder, + cursor: &mut Cursor<&[u8]>, +) -> datafusion_common::Result<()> { + let number_of_points = cursor.read_u32::<OUT>()?; + builder.write_u32::<OUT>(number_of_points)?; + let mut buf = [0u8; 8]; + + for _ in 0..number_of_points * 2 { + cursor.read_exact(&mut buf)?; + _ = builder.write(&buf)?; + } + Ok(()) +} + +pub fn serialize_multilinestring<OUT: ByteOrder>( + builder: &mut BinaryBuilder, + cursor: &mut Cursor<&[u8]>, +) -> datafusion_common::Result<()> { + let number_of_linestrings = cursor.read_u32::<OUT>()?; + + let metadata_vector = Vec::new(); + let mut metadata_cursor = Cursor::new(metadata_vector); + + let coordinates_vector = Vec::new(); + let mut coordinates_cursor = Cursor::new(coordinates_vector); + + let mut total_number_of_points = 0; + + metadata_cursor.write_u32::<OUT>(number_of_linestrings)?; + + for _ in 0..number_of_linestrings { + let byte_order = cursor.read_u8()?; + let _geometry_type = cursor.read_u32::<OUT>()?; + if _geometry_type != 2 { + return Err(datafusion_common::DataFusionError::Internal( + "Invalid geometry type in WKB".to_string(), + )); + } + + if byte_order != 1 { + return Err(datafusion_common::DataFusionError::Internal( + "Invalid byte order in WKB".to_string(), + )); + } + + let _number_of_points = cursor.read_u32::<OUT>()?; + total_number_of_points += _number_of_points; + // number_of_points+= _number_of_points; + metadata_cursor.write_u32::<OUT>(_number_of_points)?; + + for _ in 0.._number_of_points * 2 { + let mut buf = [0u8; 8]; + cursor.read_exact(&mut buf)?; + _ = coordinates_cursor.write(&buf)?; + } + } + + builder.write_u32::<OUT>(total_number_of_points)?; + + _ = builder.write(coordinates_cursor.get_ref())?; + _ = builder.write(metadata_cursor.get_ref())?; + Ok(()) +} diff --git a/rust/sedona-serde/src/point.rs b/rust/sedona-serde/src/point.rs index 459437e8..5ca83892 100644 --- a/rust/sedona-serde/src/point.rs +++ b/rust/sedona-serde/src/point.rs @@ -22,6 +22,8 @@ use datafusion_common::error::Result; use std::io::{Cursor, Read, Write}; use wkt::types::Dimension; +const NAN_2X: [u8; 16] = [0, 0, 0, 0, 0, 0, 248, 127, 0, 0, 0, 0, 0, 0, 248, 127]; + fn get_byte_type_for_point(dimension: Dimension) -> u32 { match dimension { Dimension::XY => 1u32, @@ -93,3 +95,51 @@ pub fn parse_multipoint<IN: ByteOrder, OUT: ByteOrder>( Ok(()) } + +pub fn serialize_point<OUT: ByteOrder>( + builder: &mut BinaryBuilder, + cursor: &mut Cursor<&[u8]>, +) -> Result<()> { + let mut buf = [0u8; 16]; + cursor.read_exact(&mut buf)?; + if buf == NAN_2X { + builder.write_u32::<OUT>(0)?; // no coordinates + + return Ok(()); + } + + builder.write_u32::<OUT>(1)?; // numCoordinates + builder.write_all(&buf)?; + + Ok(()) +} + +pub fn serialize_multipoint<OUT: ByteOrder>( + builder: &mut BinaryBuilder, + cursor: &mut Cursor<&[u8]>, +) -> Result<()> { + let number_of_points = cursor.read_u32::<OUT>()?; + builder.write_u32::<OUT>(number_of_points)?; // numPoints + for _ in 0..number_of_points { + let endianness_marker = cursor.read_u8()?; + let _geometry_type = cursor.read_u32::<OUT>()?; + + if _geometry_type != 1 { + return Err(datafusion_common::DataFusionError::Internal( + "Invalid geometry type in WKB".to_string(), + )); + } + + if endianness_marker != 1 { + return Err(datafusion_common::DataFusionError::Internal( + "Invalid byte order in WKB".to_string(), + )); + } + + let mut buf = [0u8; 16]; + cursor.read_exact(&mut buf)?; + builder.write_all(&buf)?; + } + + Ok(()) +} diff --git a/rust/sedona-serde/src/polygon.rs b/rust/sedona-serde/src/polygon.rs index c8fa642e..08fad2b7 100644 --- a/rust/sedona-serde/src/polygon.rs +++ b/rust/sedona-serde/src/polygon.rs @@ -103,3 +103,91 @@ pub(crate) fn write_empty_polygon<OUT: ByteOrder>( Ok(()) } + +pub fn serialize_polygon<OUT: ByteOrder>( + builder: &mut BinaryBuilder, + cursor: &mut Cursor<&[u8]>, +) -> datafusion_common::Result<()> { + let number_of_rings = cursor.read_u32::<OUT>()?; + + let mut total_points = 0u32; + let coordinates_vector = Vec::new(); + let mut coordinates_cursor = Cursor::new(coordinates_vector); + let metadata_vector = Vec::new(); + let mut metadata_cursor = Cursor::new(metadata_vector); + + metadata_cursor.write_u32::<OUT>(number_of_rings)?; + + for _ in 0..number_of_rings { + let number_of_points_in_ring = cursor.read_u32::<OUT>()?; + metadata_cursor.write_u32::<OUT>(number_of_points_in_ring)?; + + total_points += number_of_points_in_ring; + + let mut buf = vec![0u8; (number_of_points_in_ring * 8 * 2) as usize]; + cursor.read_exact(&mut buf)?; + _ = coordinates_cursor.write(&buf)?; + } + + if total_points != 0 { + builder.write_u32::<OUT>(total_points)?; + + _ = builder.write(coordinates_cursor.get_ref())?; + } + + _ = builder.write(metadata_cursor.get_ref())?; + + Ok(()) +} + +pub fn serialize_multipolygon<OUT: ByteOrder>( + builder: &mut BinaryBuilder, + cursor: &mut Cursor<&[u8]>, +) -> datafusion_common::Result<()> { + let number_of_polygons = cursor.read_u32::<OUT>()?; + + let mut total_points = 0u32; + let coordinates_vector = Vec::new(); + let mut coordinates_cursor = Cursor::new(coordinates_vector); + let metadata_vector = Vec::new(); + let mut metadata_cursor = Cursor::new(metadata_vector); + + metadata_cursor.write_u32::<OUT>(number_of_polygons)?; + + for _ in 0..number_of_polygons { + let endianness_marker = cursor.read_u8()?; + let _geometry_type = cursor.read_u32::<OUT>()?; + if endianness_marker != 1 { + return Err(datafusion_common::DataFusionError::Internal( + "Invalid byte order in WKB".to_string(), + )); + } + + if _geometry_type != 3 { + return Err(datafusion_common::DataFusionError::Internal( + "Invalid geometry type in WKB".to_string(), + )); + } + + let number_of_rings = cursor.read_u32::<OUT>()?; + metadata_cursor.write_u32::<OUT>(number_of_rings)?; + + for _ in 0..number_of_rings { + let number_of_points_in_ring = cursor.read_u32::<OUT>()?; + metadata_cursor.write_u32::<OUT>(number_of_points_in_ring)?; + + total_points += number_of_points_in_ring; + + let mut buf = vec![0u8; (number_of_points_in_ring * 8 * 2) as usize]; + cursor.read_exact(&mut buf)?; + _ = coordinates_cursor.write(&buf)?; + } + } + + builder.write_u32::<OUT>(total_points)?; + + _ = builder.write(coordinates_cursor.get_ref())?; + _ = builder.write(metadata_cursor.get_ref())?; + + Ok(()) +} diff --git a/rust/sedona-serde/src/serialize.rs b/rust/sedona-serde/src/serialize.rs index 8a1a4bdd..0f8a4941 100644 --- a/rust/sedona-serde/src/serialize.rs +++ b/rust/sedona-serde/src/serialize.rs @@ -1,15 +1,102 @@ +use crate::linestring::{serialize_linestring, serialize_multilinestring}; +use crate::point::{serialize_multipoint, serialize_point}; +use crate::polygon::{serialize_multipolygon, serialize_polygon}; use arrow_array::builder::BinaryBuilder; -use byteorder::LittleEndian; +use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}; use datafusion_common::DataFusionError; -use crate::deserialize::parse_geometry; +use std::io::{Cursor}; +use wkb::reader::Wkb; +use wkt::types::Dimension; -pub fn serialize(builder: &mut BinaryBuilder) -> datafusion_common::Result<()> { +pub fn serialize( + wkb: &Wkb, + builder: &mut BinaryBuilder, + epsg_crs: Option<i32>, +) -> datafusion_common::Result<()> { use std::io::Cursor; + let mut cursor = Cursor::new(wkb.buf()); + let byte_order = cursor.read_u8()?; - // let mut reader = Cursor::new(bytes); + if byte_order != 1 && byte_order != 0 { + return Err(DataFusionError::Internal( + "Invalid byte order in WKB".to_string(), + )); + } - // parse_geometry::<LittleEndian, LittleEndian>(builder, &mut reader, bytes) - Err(DataFusionError::NotImplemented( - "Serialization is not yet implemented".to_string(), - )) -} \ No newline at end of file + match byte_order { + 0 => Err(DataFusionError::Internal( + "BigEndian WKB serialization not implemented".to_string(), + )), + 1 => write_geometry::<LittleEndian, LittleEndian>(builder, &mut cursor, epsg_crs), + _ => unreachable!(), + } +} + +pub fn write_geometry<IN: ByteOrder, OUT: ByteOrder>( + builder: &mut BinaryBuilder, + cursor: &mut Cursor<&[u8]>, + epsg_crs: Option<i32>, +) -> datafusion_common::Result<()> { + let geometry_type = cursor.read_u32::<IN>()?; + verify_geometry_type(geometry_type)?; + + let wkb_byte = geometry_type as u8; + + let preamble_byte: u8 = (wkb_byte << 4) + | (get_coordinate_type_value(Dimension::XY) << 1) + | if epsg_crs.is_some() { 1 } else { 0 }; + + builder.write_u8(preamble_byte)?; + + if let Some(srid) = epsg_crs { + builder.write_u8(((srid >> 16) & 0xFF) as u8)?; + builder.write_u8(((srid >> 8) & 0xFF) as u8)?; + builder.write_u8((srid & 0xFF) as u8)?; + } else { + builder.write_u8(0)?; + builder.write_u8(0)?; + builder.write_u8(0)?; + } + + match wkb_byte { + 1 => return serialize_point::<LittleEndian>(builder, cursor), + 2 => return serialize_linestring::<LittleEndian>(builder, cursor), + 3 => return serialize_polygon::<LittleEndian>(builder, cursor), + 4 => return serialize_multipoint::<LittleEndian>(builder, cursor), + 5 => return serialize_multilinestring::<LittleEndian>(builder, cursor), + 6 => return serialize_multipolygon::<LittleEndian>(builder, cursor), + 7 => { + let number_of_geometries = cursor.read_u32::<IN>()?; + builder.write_u32::<OUT>(number_of_geometries)?; + for _ in 0..number_of_geometries { + _ = cursor.read_u8()?; + write_geometry::<IN, OUT>(builder, cursor, epsg_crs)?; + } + } + _ => { + return Err(DataFusionError::Internal( + "Geometry type not supported yet".to_string(), + )) + } + } + + Ok(()) +} + +fn verify_geometry_type(geometry_type: u32) -> datafusion_common::Result<()> { + match geometry_type { + 1..=7 => Ok(()), + _ => Err(DataFusionError::Internal( + "Unsupported geometry type".to_string(), + )), + } +} + +fn get_coordinate_type_value(dimension: Dimension) -> u8 { + match dimension { + Dimension::XY => 1, + Dimension::XYZ => 2, + Dimension::XYM => 3, + Dimension::XYZM => 4, + } +}
