This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new e94c611ad6 [GH-2716] Always use native byte order when writing WKBs (#2177)
e94c611ad6 is described below

commit e94c611ad61cc0d86b9283e8c092b3d1b7b0a80e
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Mon Jul 28 16:00:30 2025 +0800

    [GH-2716] Always use native byte order when writing WKBs (#2177)
---
 .../org/apache/sedona/common/utils/GeomUtils.java  | 26 +++++++++++++---------
 .../org/apache/sedona/snowflake/snowsql/UDFs.java  | 21 ++++-------------
 .../apache/sedona/core/spatialRDD/SpatialRDD.java  |  2 +-
 .../wrapper/translation/CircleSerializer.scala     |  4 ++--
 .../wrapper/translation/GeometrySerializer.scala   |  4 ++--
 .../parquet/GeoParquetWriteSupport.scala           |  3 +--
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 10 +++++++++
 .../parquet/GeoParquetWriteSupport.scala           |  3 +--
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 10 +++++++++
 .../parquet/GeoParquetWriteSupport.scala           |  3 +--
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 10 +++++++++
 11 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java 
b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
index 93fba33d7b..98623c7a88 100644
--- a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
+++ b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
@@ -195,12 +195,7 @@ public class GeomUtils {
     if (geometry == null) {
       return null;
     }
-    int endian =
-        ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN
-            ? ByteOrderValues.BIG_ENDIAN
-            : ByteOrderValues.LITTLE_ENDIAN;
-    WKBWriter writer =
-        new WKBWriter(GeomUtils.getDimension(geometry), endian, 
geometry.getSRID() != 0);
+    WKBWriter writer = createWKBWriter(GeomUtils.getDimension(geometry), 
geometry.getSRID() != 0);
     return writer.write(geometry);
   }
 
@@ -208,14 +203,23 @@ public class GeomUtils {
     if (geometry == null) {
       return null;
     }
-    int endian =
-        ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN
-            ? ByteOrderValues.BIG_ENDIAN
-            : ByteOrderValues.LITTLE_ENDIAN;
-    WKBWriter writer = new WKBWriter(GeomUtils.getDimension(geometry), endian, 
false);
+    WKBWriter writer = createWKBWriter(GeomUtils.getDimension(geometry), 
false);
     return writer.write(geometry);
   }
 
+  private static final int NATIVE_WKB_BYTE_ORDER =
+      ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN
+          ? ByteOrderValues.BIG_ENDIAN
+          : ByteOrderValues.LITTLE_ENDIAN;
+
+  public static WKBWriter createWKBWriter(int dimension, boolean includeSRID) {
+    return new WKBWriter(dimension, NATIVE_WKB_BYTE_ORDER, includeSRID);
+  }
+
+  public static WKBWriter createWKBWriter(int dimension) {
+    return createWKBWriter(dimension, false);
+  }
+
   public static Geometry get2dGeom(Geometry geom) {
     Coordinate[] coordinates = geom.getCoordinates();
     GeometryFactory geometryFactory = geom.getFactory();
diff --git 
a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java 
b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
index 98fdbb0608..eb38655883 100644
--- a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
+++ b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
@@ -31,7 +31,6 @@ import 
org.apache.sedona.snowflake.snowsql.annotations.UDFAnnotations;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.Point;
 import org.locationtech.jts.io.ParseException;
-import org.locationtech.jts.io.WKBWriter;
 import org.xml.sax.SAXException;
 
 /**
@@ -1229,34 +1228,22 @@ public class UDFs {
 
   @UDFAnnotations.ParamMeta(argNames = {"geom", "zValue"})
   public static byte[] ST_Force3D(byte[] geom, double zValue) {
-    WKBWriter writer = new WKBWriter(3);
-    return GeometrySerde.serialize(
-        Functions.force3D(
-            
GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom))), 
zValue));
+    return 
GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom), 
zValue));
   }
 
   @UDFAnnotations.ParamMeta(argNames = {"geom"})
   public static byte[] ST_Force3D(byte[] geom) {
-    WKBWriter writer = new WKBWriter(3);
-    return GeometrySerde.serialize(
-        Functions.force3D(
-            
GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom)))));
+    return 
GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom)));
   }
 
   @UDFAnnotations.ParamMeta(argNames = {"geom", "zValue"})
   public static byte[] ST_Force3DZ(byte[] geom, double zValue) {
-    WKBWriter writer = new WKBWriter(3);
-    return GeometrySerde.serialize(
-        Functions.force3D(
-            
GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom))), 
zValue));
+    return 
GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom), 
zValue));
   }
 
   @UDFAnnotations.ParamMeta(argNames = {"geom"})
   public static byte[] ST_Force3DZ(byte[] geom) {
-    WKBWriter writer = new WKBWriter(3);
-    return GeometrySerde.serialize(
-        Functions.force3D(
-            
GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom)))));
+    return 
GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom)));
   }
 
   @UDFAnnotations.ParamMeta(argNames = {"geom"})
diff --git 
a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java 
b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index 50d44b9bf4..8b8f3c96f3 100644
--- 
a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++ 
b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -549,7 +549,7 @@ public class SpatialRDD<T extends Geometry> implements 
Serializable {
             new FlatMapFunction<Iterator<T>, String>() {
               @Override
               public Iterator<String> call(Iterator<T> iterator) throws 
Exception {
-                WKBWriter writer = new WKBWriter(3, true);
+                WKBWriter writer = GeomUtils.createWKBWriter(3, true);
                 ArrayList<String> wkbs = new ArrayList<>();
 
                 while (iterator.hasNext()) {
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala
 
b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala
index 586c3f5c4d..2f017a7ae6 100644
--- 
a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala
+++ 
b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala
@@ -19,14 +19,14 @@
 package org.apache.sedona.python.wrapper.translation
 
 import org.apache.sedona.common.geometryObjects.Circle
+import org.apache.sedona.common.utils.GeomUtils
 import org.apache.sedona.python.wrapper.utils.implicits.{DoubleImplicit, 
GeometryEnhancer, IntImplicit}
-import org.locationtech.jts.io.WKBWriter
 
 case class CircleSerializer(geometry: Circle) {
   private val isCircle = Array(1.toByte)
 
   def serialize: Array[Byte] = {
-    val wkbWriter = new WKBWriter(2, 2)
+    val wkbWriter = GeomUtils.createWKBWriter(2)
     val serializedGeom = wkbWriter.write(geometry.getCenterGeometry)
     val userDataBinary = geometry.userDataToUtf8ByteArray
     val userDataLengthArray = userDataBinary.length.toByteArray()
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala
 
b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala
index 29922ec855..3cb57b7346 100644
--- 
a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala
+++ 
b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala
@@ -18,16 +18,16 @@
  */
 package org.apache.sedona.python.wrapper.translation
 
+import org.apache.sedona.common.utils.GeomUtils
 import org.apache.sedona.python.wrapper.utils.implicits.{GeometryEnhancer, 
IntImplicit}
 import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
 
 case class GeometrySerializer(geometry: Geometry) {
 
   private val notCircle = Array(0.toByte)
 
   def serialize: Array[Byte] = {
-    val wkbWriter = new WKBWriter(2, 2)
+    val wkbWriter = GeomUtils.createWKBWriter(2)
     val serializedGeom = wkbWriter.write(geometry)
     val userDataBinary = geometry.userDataToUtf8ByteArray
     val userDataLengthArray = userDataBinary.length.toByteArray()
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 9d6b367408..bda27f56f0 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.types._
 import org.json4s.{DefaultFormats, Extraction, JValue}
 import org.json4s.jackson.JsonMethods.parse
 import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
 
 import java.nio.ByteBuffer
 import java.nio.ByteOrder
@@ -340,7 +339,7 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
         (row: SpecializedGetters, ordinal: Int) => {
           val serializedGeometry = row.getBinary(ordinal)
           val geom = GeometryUDT.deserialize(serializedGeometry)
-          val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
+          val wkbWriter = 
GeomUtils.createWKBWriter(GeomUtils.getDimension(geom))
           
recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
           if (geometryColumnInfo != null) {
             geometryColumnInfo.update(geom)
diff --git 
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala 
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 274394f3bb..f2068cf918 100644
--- 
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ 
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -39,6 +39,7 @@ import org.locationtech.jts.io.WKTReader
 import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
+import java.nio.ByteOrder
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 import java.util.Collections
@@ -83,6 +84,15 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
         newrows
           .getAs[Geometry]("geometry")
           .toString == "MULTIPOLYGON (((180 -16.067132663642447, 180 
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711 
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971 
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180 
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806 
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146 
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400 [...]
+      // The endianness of the WKB should be system native.
+      val df2Binary = sparkSession.read.parquet(geoparquetoutputlocation + 
"/gp_sample1.parquet")
+      df2Binary.collect().foreach { row =>
+        val wkb = row.getAs[Array[Byte]]("geometry")
+        wkb(0) match {
+          case 0x00 => assert(ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN)
+          case 0x01 => assert(ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN)
+        }
+      }
     }
     it("GEOPARQUET Test example2 i.e. naturalearth_citie dataset's Read and 
Write") {
       val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation2)
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 18f9f4f5c2..a0c2c3120f 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -41,7 +41,6 @@ import org.json4s.{DefaultFormats, Extraction, JValue}
 import org.json4s.jackson.compactJson
 import org.json4s.jackson.JsonMethods.parse
 import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
 
 import java.nio.ByteBuffer
 import java.nio.ByteOrder
@@ -341,7 +340,7 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
         (row: SpecializedGetters, ordinal: Int) => {
           val serializedGeometry = row.getBinary(ordinal)
           val geom = GeometryUDT.deserialize(serializedGeometry)
-          val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
+          val wkbWriter = 
GeomUtils.createWKBWriter(GeomUtils.getDimension(geom))
           
recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
           if (geometryColumnInfo != null) {
             geometryColumnInfo.update(geom)
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index beca265641..d654c62ab9 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -42,6 +42,7 @@ import org.locationtech.jts.io.WKTReader
 import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
+import java.nio.ByteOrder
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicLong
 import java.time.LocalDateTime
@@ -85,6 +86,15 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
         newrows
           .getAs[Geometry]("geometry")
           .toString == "MULTIPOLYGON (((180 -16.067132663642447, 180 
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711 
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971 
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180 
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806 
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146 
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400 [...]
+      // The endianness of the WKB should be system native.
+      val df2Binary = sparkSession.read.parquet(geoparquetoutputlocation + 
"/gp_sample1.parquet")
+      df2Binary.collect().foreach { row =>
+        val wkb = row.getAs[Array[Byte]]("geometry")
+        wkb(0) match {
+          case 0x00 => assert(ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN)
+          case 0x01 => assert(ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN)
+        }
+      }
     }
     it("GEOPARQUET Test example2 i.e. naturalearth_citie dataset's Read and 
Write") {
       val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation2)
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 18f9f4f5c2..a0c2c3120f 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -41,7 +41,6 @@ import org.json4s.{DefaultFormats, Extraction, JValue}
 import org.json4s.jackson.compactJson
 import org.json4s.jackson.JsonMethods.parse
 import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
 
 import java.nio.ByteBuffer
 import java.nio.ByteOrder
@@ -341,7 +340,7 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
         (row: SpecializedGetters, ordinal: Int) => {
           val serializedGeometry = row.getBinary(ordinal)
           val geom = GeometryUDT.deserialize(serializedGeometry)
-          val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
+          val wkbWriter = 
GeomUtils.createWKBWriter(GeomUtils.getDimension(geom))
           
recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
           if (geometryColumnInfo != null) {
             geometryColumnInfo.update(geom)
diff --git 
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala 
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index beca265641..d654c62ab9 100644
--- 
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ 
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -42,6 +42,7 @@ import org.locationtech.jts.io.WKTReader
 import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
+import java.nio.ByteOrder
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicLong
 import java.time.LocalDateTime
@@ -85,6 +86,15 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
         newrows
           .getAs[Geometry]("geometry")
           .toString == "MULTIPOLYGON (((180 -16.067132663642447, 180 
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711 
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971 
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180 
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806 
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146 
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400 [...]
+      // The endianness of the WKB should be system native.
+      val df2Binary = sparkSession.read.parquet(geoparquetoutputlocation + 
"/gp_sample1.parquet")
+      df2Binary.collect().foreach { row =>
+        val wkb = row.getAs[Array[Byte]]("geometry")
+        wkb(0) match {
+          case 0x00 => assert(ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN)
+          case 0x01 => assert(ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN)
+        }
+      }
     }
     it("GEOPARQUET Test example2 i.e. naturalearth_citie dataset's Read and 
Write") {
       val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation2)

Reply via email to