This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 1b383fe83c0adba6d78393c240379de6dba38ff5
Author: pawelkocinski <[email protected]>
AuthorDate: Mon Dec 22 00:05:05 2025 +0100

    add code so far
---
 .../sql/execution/python/SedonaArrowUtils.scala    | 156 ---------------------
 .../execution/python/SedonaPythonArrowInput.scala  |   5 +-
 2 files changed, 3 insertions(+), 158 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowUtils.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowUtils.scala
deleted file mode 100644
index 6c1bb9edd5..0000000000
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowUtils.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-package org.apache.spark.sql.execution.python
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
-import org.apache.arrow.memory.RootAllocator
-import org.apache.arrow.vector.complex.MapVector
-import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
-import org.apache.spark.sql.errors.ExecutionErrors
-import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.ArrowUtils.{fromArrowType, toArrowType}
-
-private[sql] object SedonaArrowUtils {
-
-  val rootAllocator = new RootAllocator(Long.MaxValue)
-
-  /** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */
-  def toArrowField(
-                    name: String,
-                    dt: DataType,
-                    nullable: Boolean,
-                    timeZoneId: String,
-                    largeVarTypes: Boolean = false): Field = {
-    dt match {
-      case GeometryUDT =>
-//        val jsonData = """{"crs": {"$schema": "https://proj.org/schemas/v0.7/projjson.schema.json", "type": "GeographicCRS", "name": "WGS 84", "datum_ensemble": {"name": "World Geodetic System 1984 ensemble", "members": [{"name": "World Geodetic System 1984 (Transit)", "id": {"authority": "EPSG", "code": 1166}}, {"name": "World Geodetic System 1984 (G730)", "id": {"authority": "EPSG", "code": 1152}}, {"name": "World Geodetic System 1984 (G873)", "id": {"authority": "EPSG", "code": 1153 [...]
-        val metadata = Map(
-          "empty" -> "empty",
-//          "ARROW:extension:name" -> "geoarrow.wkb",
-//          "ARROW:extension:metadata" -> jsonData,
-        ).asJava
-
-        val fieldType = new FieldType(nullable, ArrowType.Binary.INSTANCE, null, metadata)
-        new Field(name, fieldType, Seq.empty[Field].asJava)
-
-      case ArrayType(elementType, containsNull) =>
-        val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
-        new Field(name, fieldType,
-          Seq(toArrowField("element", elementType, containsNull, timeZoneId,
-            largeVarTypes)).asJava)
-      case StructType(fields) =>
-        val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
-        new Field(name, fieldType,
-          fields.map { field =>
-            toArrowField(field.name, field.dataType, field.nullable, timeZoneId, largeVarTypes)
-          }.toSeq.asJava)
-      case MapType(keyType, valueType, valueContainsNull) =>
-        val mapType = new FieldType(nullable, new ArrowType.Map(false), null)
-        // Note: Map Type struct can not be null, Struct Type key field can not be null
-        new Field(name, mapType,
-          Seq(toArrowField(MapVector.DATA_VECTOR_NAME,
-            new StructType()
-              .add(MapVector.KEY_NAME, keyType, nullable = false)
-              .add(MapVector.VALUE_NAME, valueType, nullable = valueContainsNull),
-            nullable = false,
-            timeZoneId,
-            largeVarTypes)).asJava)
-      case udt: UserDefinedType[_] =>
-        toArrowField(name, udt.sqlType, nullable, timeZoneId, largeVarTypes)
-      case dataType =>
-        val fieldType = new FieldType(nullable, toArrowType(dataType, timeZoneId,
-          largeVarTypes), null)
-        new Field(name, fieldType, Seq.empty[Field].asJava)
-      case _ => toArrowField(name, dt, nullable, timeZoneId, largeVarTypes)
-    }
-  }
-
-  def fromArrowField(field: Field): DataType = {
-    field.getType match {
-      case _: ArrowType.Map =>
-        val elementField = field.getChildren.get(0)
-        val keyType = fromArrowField(elementField.getChildren.get(0))
-        val valueType = fromArrowField(elementField.getChildren.get(1))
-        MapType(keyType, valueType, elementField.getChildren.get(1).isNullable)
-      case ArrowType.List.INSTANCE =>
-        val elementField = field.getChildren().get(0)
-        val elementType = fromArrowField(elementField)
-        ArrayType(elementType, containsNull = elementField.isNullable)
-      case ArrowType.Struct.INSTANCE =>
-        val fields = field.getChildren().asScala.map { child =>
-          val dt = fromArrowField(child)
-          StructField(child.getName, dt, child.isNullable)
-        }
-        StructType(fields.toArray)
-      case arrowType => fromArrowType(arrowType)
-    }
-  }
-
-  /** Maps schema from Spark to Arrow. NOTE: timeZoneId required for TimestampType in StructType */
-  def toArrowSchema(
-                     schema: StructType,
-                     timeZoneId: String,
-                     errorOnDuplicatedFieldNames: Boolean,
-                     largeVarTypes: Boolean = false): Schema = {
-    new Schema(schema.map { field =>
-      toArrowField(
-        field.name,
-        deduplicateFieldNames(field.dataType, errorOnDuplicatedFieldNames),
-        field.nullable,
-        timeZoneId,
-        largeVarTypes)
-    }.asJava)
-  }
-
-  private def deduplicateFieldNames(
-                                     dt: DataType, errorOnDuplicatedFieldNames: Boolean): DataType = dt match {
-    case geometryType: GeometryUDT => geometryType
-    case udt: UserDefinedType[_] => deduplicateFieldNames(udt.sqlType, errorOnDuplicatedFieldNames)
-    case st @ StructType(fields) =>
-      val newNames = if (st.names.toSet.size == st.names.length) {
-        st.names
-      } else {
-        if (errorOnDuplicatedFieldNames) {
-          throw ExecutionErrors.duplicatedFieldNameInArrowStructError(st.names)
-        }
-        val genNawName = st.names.groupBy(identity).map {
-          case (name, names) if names.length > 1 =>
-            val i = new AtomicInteger()
-            name -> { () => s"${name}_${i.getAndIncrement()}" }
-          case (name, _) => name -> { () => name }
-        }
-        st.names.map(genNawName(_)())
-      }
-      val newFields =
-        fields.zip(newNames).map { case (StructField(_, dataType, nullable, metadata), name) =>
-          StructField(
-            name, deduplicateFieldNames(dataType, errorOnDuplicatedFieldNames), nullable, metadata)
-        }
-      StructType(newFields)
-    case ArrayType(elementType, containsNull) =>
-      ArrayType(deduplicateFieldNames(elementType, errorOnDuplicatedFieldNames), containsNull)
-    case MapType(keyType, valueType, valueContainsNull) =>
-      MapType(
-        deduplicateFieldNames(keyType, errorOnDuplicatedFieldNames),
-        deduplicateFieldNames(valueType, errorOnDuplicatedFieldNames),
-        valueContainsNull)
-    case _ => dt
-  }
-}
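
[Editor's note, not part of the commit: the deleted helper above mirrored Spark's built-in org.apache.spark.sql.util.ArrowUtils, differing mainly in its GeometryUDT branch, which mapped geometries to Arrow Binary and, in commented-out code, hinted at tagging the field with GeoArrow extension metadata. A minimal sketch of what that tagging would look like, using only the ARROW:extension:* keys visible in the commented lines; the column name "geom" is illustrative:

    import scala.collection.JavaConverters._
    import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}

    // Hypothetical: a geometry column carried as Arrow Binary (WKB), tagged
    // with the GeoArrow extension name the deleted code left commented out.
    // An "ARROW:extension:metadata" entry would carry the PROJJSON CRS string.
    val geoMetadata = Map("ARROW:extension:name" -> "geoarrow.wkb").asJava
    val geomField = new Field(
      "geom", // illustrative column name
      new FieldType(true, ArrowType.Binary.INSTANCE, null, geoMetadata),
      Seq.empty[Field].asJava)
]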
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index 9c0b0c94b4..5567ef28b5 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.arrow.ArrowWriter
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.util.Utils
 import org.apache.spark.{SparkEnv, TaskContext}
 
@@ -93,9 +94,9 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
       }
 
       protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
-        val arrowSchema = SedonaArrowUtils.toArrowSchema(
+        val arrowSchema = ArrowUtils.toArrowSchema(
           schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
-        val allocator = SedonaArrowUtils.rootAllocator.newChildAllocator(
+        val allocator = ArrowUtils.rootAllocator.newChildAllocator(
           s"stdout writer for $pythonExec", 0, Long.MaxValue)
         val root = VectorSchemaRoot.create(arrowSchema, allocator)
 

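[Editor's note, not part of the commit: the net effect of the hunk above is that writeIteratorToStream now builds its Arrow schema and allocator from Spark's built-in ArrowUtils rather than the deleted Sedona copy. Below is a minimal sketch of the resulting calls under Spark 3.5, assuming a hypothetical one-column schema; since ArrowUtils is private[sql], this only compiles inside the org.apache.spark.sql package, like the file above. Spark's ArrowUtils converts UDTs such as GeometryUDT via their sqlType, so geometries still serialize as Arrow Binary:

    import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.ArrowUtils

    // Assumed example schema (not from the commit).
    val schema = new StructType().add("geom", GeometryUDT, nullable = true)
    // The same calls the patched writeIteratorToStream makes.
    val arrowSchema = ArrowUtils.toArrowSchema(
      schema, "UTC", errorOnDuplicatedFieldNames = true, largeVarTypes = false)
    val allocator = ArrowUtils.rootAllocator.newChildAllocator(
      "sketch allocator", 0, Long.MaxValue)
]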