This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new e94c611ad6 [GH-2716] Always use native byte order when writing WKBs (#2177)
e94c611ad6 is described below
commit e94c611ad61cc0d86b9283e8c092b3d1b7b0a80e
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Mon Jul 28 16:00:30 2025 +0800
[GH-2716] Always use native byte order when writing WKBs (#2177)
---
.../org/apache/sedona/common/utils/GeomUtils.java | 26 +++++++++++++---------
.../org/apache/sedona/snowflake/snowsql/UDFs.java | 21 ++++-------------
.../apache/sedona/core/spatialRDD/SpatialRDD.java | 2 +-
.../wrapper/translation/CircleSerializer.scala | 4 ++--
.../wrapper/translation/GeometrySerializer.scala | 4 ++--
.../parquet/GeoParquetWriteSupport.scala | 3 +--
.../org/apache/sedona/sql/geoparquetIOTests.scala | 10 +++++++++
.../parquet/GeoParquetWriteSupport.scala | 3 +--
.../org/apache/sedona/sql/geoparquetIOTests.scala | 10 +++++++++
.../parquet/GeoParquetWriteSupport.scala | 3 +--
.../org/apache/sedona/sql/geoparquetIOTests.scala | 10 +++++++++
11 files changed, 57 insertions(+), 39 deletions(-)
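For context, here is a minimal standalone sketch of the byte-order handling this commit centralizes in GeomUtils. The NATIVE_WKB_BYTE_ORDER constant and the createWKBWriter() factory mirror the additions in the diff below; the surrounding example class (NativeWkbExample) and the toNativeWkb() helper are illustrative only and not part of the commit.

    import java.nio.ByteOrder;
    import org.locationtech.jts.geom.Geometry;
    import org.locationtech.jts.io.ByteOrderValues;
    import org.locationtech.jts.io.WKBWriter;

    public class NativeWkbExample {
      // Resolve the JTS byte-order constant matching the JVM's native order
      // once, as GeomUtils now does with NATIVE_WKB_BYTE_ORDER.
      private static final int NATIVE_WKB_BYTE_ORDER =
          ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN
              ? ByteOrderValues.BIG_ENDIAN
              : ByteOrderValues.LITTLE_ENDIAN;

      // Mirrors GeomUtils.createWKBWriter(dimension, includeSRID): every
      // writer produced here emits WKBs in the platform's native byte order.
      static WKBWriter createWKBWriter(int dimension, boolean includeSRID) {
        return new WKBWriter(dimension, NATIVE_WKB_BYTE_ORDER, includeSRID);
      }

      // The first byte of the resulting WKB is the byte-order flag:
      // 0x00 on big-endian platforms, 0x01 on little-endian platforms.
      static byte[] toNativeWkb(Geometry geom) {
        return createWKBWriter(2, false).write(geom);
      }
    }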
diff --git a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
index 93fba33d7b..98623c7a88 100644
--- a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
+++ b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
@@ -195,12 +195,7 @@ public class GeomUtils {
if (geometry == null) {
return null;
}
- int endian =
- ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN
- ? ByteOrderValues.BIG_ENDIAN
- : ByteOrderValues.LITTLE_ENDIAN;
- WKBWriter writer =
- new WKBWriter(GeomUtils.getDimension(geometry), endian, geometry.getSRID() != 0);
+ WKBWriter writer = createWKBWriter(GeomUtils.getDimension(geometry), geometry.getSRID() != 0);
return writer.write(geometry);
}
@@ -208,14 +203,23 @@ public class GeomUtils {
if (geometry == null) {
return null;
}
- int endian =
- ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN
- ? ByteOrderValues.BIG_ENDIAN
- : ByteOrderValues.LITTLE_ENDIAN;
- WKBWriter writer = new WKBWriter(GeomUtils.getDimension(geometry), endian, false);
+ WKBWriter writer = createWKBWriter(GeomUtils.getDimension(geometry), false);
return writer.write(geometry);
}
+ private static final int NATIVE_WKB_BYTE_ORDER =
+ ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN
+ ? ByteOrderValues.BIG_ENDIAN
+ : ByteOrderValues.LITTLE_ENDIAN;
+
+ public static WKBWriter createWKBWriter(int dimension, boolean includeSRID) {
+ return new WKBWriter(dimension, NATIVE_WKB_BYTE_ORDER, includeSRID);
+ }
+
+ public static WKBWriter createWKBWriter(int dimension) {
+ return createWKBWriter(dimension, false);
+ }
+
public static Geometry get2dGeom(Geometry geom) {
Coordinate[] coordinates = geom.getCoordinates();
GeometryFactory geometryFactory = geom.getFactory();
diff --git a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
index 98fdbb0608..eb38655883 100644
--- a/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
+++ b/snowflake/src/main/java/org/apache/sedona/snowflake/snowsql/UDFs.java
@@ -31,7 +31,6 @@ import org.apache.sedona.snowflake.snowsql.annotations.UDFAnnotations;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.io.ParseException;
-import org.locationtech.jts.io.WKBWriter;
import org.xml.sax.SAXException;
/**
@@ -1229,34 +1228,22 @@ public class UDFs {
@UDFAnnotations.ParamMeta(argNames = {"geom", "zValue"})
public static byte[] ST_Force3D(byte[] geom, double zValue) {
- WKBWriter writer = new WKBWriter(3);
- return GeometrySerde.serialize(
- Functions.force3D(
- GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom))), zValue));
+ return GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom), zValue));
}
@UDFAnnotations.ParamMeta(argNames = {"geom"})
public static byte[] ST_Force3D(byte[] geom) {
- WKBWriter writer = new WKBWriter(3);
- return GeometrySerde.serialize(
- Functions.force3D(
- GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom)))));
+ return GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom)));
}
@UDFAnnotations.ParamMeta(argNames = {"geom", "zValue"})
public static byte[] ST_Force3DZ(byte[] geom, double zValue) {
- WKBWriter writer = new WKBWriter(3);
- return GeometrySerde.serialize(
- Functions.force3D(
- GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom))), zValue));
+ return GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom), zValue));
}
@UDFAnnotations.ParamMeta(argNames = {"geom"})
public static byte[] ST_Force3DZ(byte[] geom) {
- WKBWriter writer = new WKBWriter(3);
- return GeometrySerde.serialize(
- Functions.force3D(
- GeometrySerde.deserialize(writer.write(GeometrySerde.deserialize(geom)))));
+ return GeometrySerde.serialize(Functions.force3D(GeometrySerde.deserialize(geom)));
}
@UDFAnnotations.ParamMeta(argNames = {"geom"})
diff --git a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index 50d44b9bf4..8b8f3c96f3 100644
--- a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++ b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -549,7 +549,7 @@ public class SpatialRDD<T extends Geometry> implements Serializable {
new FlatMapFunction<Iterator<T>, String>() {
@Override
public Iterator<String> call(Iterator<T> iterator) throws Exception {
- WKBWriter writer = new WKBWriter(3, true);
+ WKBWriter writer = GeomUtils.createWKBWriter(3, true);
ArrayList<String> wkbs = new ArrayList<>();
while (iterator.hasNext()) {
diff --git a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala
index 586c3f5c4d..2f017a7ae6 100644
--- a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/CircleSerializer.scala
@@ -19,14 +19,14 @@
package org.apache.sedona.python.wrapper.translation
import org.apache.sedona.common.geometryObjects.Circle
+import org.apache.sedona.common.utils.GeomUtils
import org.apache.sedona.python.wrapper.utils.implicits.{DoubleImplicit, GeometryEnhancer, IntImplicit}
-import org.locationtech.jts.io.WKBWriter
case class CircleSerializer(geometry: Circle) {
private val isCircle = Array(1.toByte)
def serialize: Array[Byte] = {
- val wkbWriter = new WKBWriter(2, 2)
+ val wkbWriter = GeomUtils.createWKBWriter(2)
val serializedGeom = wkbWriter.write(geometry.getCenterGeometry)
val userDataBinary = geometry.userDataToUtf8ByteArray
val userDataLengthArray = userDataBinary.length.toByteArray()
diff --git a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala
index 29922ec855..3cb57b7346 100644
--- a/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/python/wrapper/translation/GeometrySerializer.scala
@@ -18,16 +18,16 @@
*/
package org.apache.sedona.python.wrapper.translation
+import org.apache.sedona.common.utils.GeomUtils
import org.apache.sedona.python.wrapper.utils.implicits.{GeometryEnhancer, IntImplicit}
import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
case class GeometrySerializer(geometry: Geometry) {
private val notCircle = Array(0.toByte)
def serialize: Array[Byte] = {
- val wkbWriter = new WKBWriter(2, 2)
+ val wkbWriter = GeomUtils.createWKBWriter(2)
val serializedGeom = wkbWriter.write(geometry)
val userDataBinary = geometry.userDataToUtf8ByteArray
val userDataLengthArray = userDataBinary.length.toByteArray()
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 9d6b367408..bda27f56f0 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.types._
import org.json4s.{DefaultFormats, Extraction, JValue}
import org.json4s.jackson.JsonMethods.parse
import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
import java.nio.ByteBuffer
import java.nio.ByteOrder
@@ -340,7 +339,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
(row: SpecializedGetters, ordinal: Int) => {
val serializedGeometry = row.getBinary(ordinal)
val geom = GeometryUDT.deserialize(serializedGeometry)
- val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
+ val wkbWriter = GeomUtils.createWKBWriter(GeomUtils.getDimension(geom))
recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
if (geometryColumnInfo != null) {
geometryColumnInfo.update(geom)
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 274394f3bb..f2068cf918 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -39,6 +39,7 @@ import org.locationtech.jts.io.WKTReader
import org.scalatest.BeforeAndAfterAll
import java.io.File
+import java.nio.ByteOrder
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Collections
@@ -83,6 +84,15 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
newrows
.getAs[Geometry]("geometry")
.toString == "MULTIPOLYGON (((180 -16.067132663642447, 180
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400 [...]
+ // The endianness of the WKB should be system native.
+ val df2Binary = sparkSession.read.parquet(geoparquetoutputlocation + "/gp_sample1.parquet")
+ df2Binary.collect().foreach { row =>
+ val wkb = row.getAs[Array[Byte]]("geometry")
+ wkb(0) match {
+ case 0x00 => assert(ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN)
+ case 0x01 => assert(ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN)
+ }
+ }
}
it("GEOPARQUET Test example2 i.e. naturalearth_citie dataset's Read and
Write") {
val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation2)
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 18f9f4f5c2..a0c2c3120f 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -41,7 +41,6 @@ import org.json4s.{DefaultFormats, Extraction, JValue}
import org.json4s.jackson.compactJson
import org.json4s.jackson.JsonMethods.parse
import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
import java.nio.ByteBuffer
import java.nio.ByteOrder
@@ -341,7 +340,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
(row: SpecializedGetters, ordinal: Int) => {
val serializedGeometry = row.getBinary(ordinal)
val geom = GeometryUDT.deserialize(serializedGeometry)
- val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
+ val wkbWriter = GeomUtils.createWKBWriter(GeomUtils.getDimension(geom))
recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
if (geometryColumnInfo != null) {
geometryColumnInfo.update(geom)
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index beca265641..d654c62ab9 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -42,6 +42,7 @@ import org.locationtech.jts.io.WKTReader
import org.scalatest.BeforeAndAfterAll
import java.io.File
+import java.nio.ByteOrder
import java.util.Collections
import java.util.concurrent.atomic.AtomicLong
import java.time.LocalDateTime
@@ -85,6 +86,15 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
newrows
.getAs[Geometry]("geometry")
.toString == "MULTIPOLYGON (((180 -16.067132663642447, 180
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400 [...]
+ // The endianness of the WKB should be system native.
+ val df2Binary = sparkSession.read.parquet(geoparquetoutputlocation + "/gp_sample1.parquet")
+ df2Binary.collect().foreach { row =>
+ val wkb = row.getAs[Array[Byte]]("geometry")
+ wkb(0) match {
+ case 0x00 => assert(ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN)
+ case 0x01 => assert(ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN)
+ }
+ }
}
it("GEOPARQUET Test example2 i.e. naturalearth_citie dataset's Read and
Write") {
val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation2)
diff --git a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 18f9f4f5c2..a0c2c3120f 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -41,7 +41,6 @@ import org.json4s.{DefaultFormats, Extraction, JValue}
import org.json4s.jackson.compactJson
import org.json4s.jackson.JsonMethods.parse
import org.locationtech.jts.geom.Geometry
-import org.locationtech.jts.io.WKBWriter
import java.nio.ByteBuffer
import java.nio.ByteOrder
@@ -341,7 +340,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
(row: SpecializedGetters, ordinal: Int) => {
val serializedGeometry = row.getBinary(ordinal)
val geom = GeometryUDT.deserialize(serializedGeometry)
- val wkbWriter = new WKBWriter(GeomUtils.getDimension(geom))
+ val wkbWriter = GeomUtils.createWKBWriter(GeomUtils.getDimension(geom))
recordConsumer.addBinary(Binary.fromReusedByteArray(wkbWriter.write(geom)))
if (geometryColumnInfo != null) {
geometryColumnInfo.update(geom)
diff --git a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index beca265641..d654c62ab9 100644
--- a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -42,6 +42,7 @@ import org.locationtech.jts.io.WKTReader
import org.scalatest.BeforeAndAfterAll
import java.io.File
+import java.nio.ByteOrder
import java.util.Collections
import java.util.concurrent.atomic.AtomicLong
import java.time.LocalDateTime
@@ -85,6 +86,15 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
newrows
.getAs[Geometry]("geometry")
.toString == "MULTIPOLYGON (((180 -16.067132663642447, 180
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400 [...]
+ // The endianness of the WKB should be system native.
+ val df2Binary = sparkSession.read.parquet(geoparquetoutputlocation + "/gp_sample1.parquet")
+ df2Binary.collect().foreach { row =>
+ val wkb = row.getAs[Array[Byte]]("geometry")
+ wkb(0) match {
+ case 0x00 => assert(ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN)
+ case 0x01 => assert(ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN)
+ }
+ }
}
it("GEOPARQUET Test example2 i.e. naturalearth_citie dataset's Read and
Write") {
val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation2)
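A side note on compatibility, not part of this commit: JTS WKBReader detects the byte order from the leading flag byte of each geometry, so WKBs written in native order stay readable on any platform without extra configuration. A minimal sketch; the class name RoundTripExample is illustrative.

    import org.locationtech.jts.geom.Geometry;
    import org.locationtech.jts.io.ParseException;
    import org.locationtech.jts.io.WKBReader;

    public class RoundTripExample {
      // WKBReader reads the per-geometry byte-order flag
      // (0x00 = big-endian, 0x01 = little-endian) before parsing coordinates.
      static Geometry parse(byte[] wkb) throws ParseException {
        return new WKBReader().read(wkb);
      }
    }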