This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 1b383fe83c0adba6d78393c240379de6dba38ff5
Author: pawelkocinski <[email protected]>
AuthorDate: Mon Dec 22 00:05:05 2025 +0100

    add code so far
---
 .../sql/execution/python/SedonaArrowUtils.scala    | 156 ---------------------
 .../execution/python/SedonaPythonArrowInput.scala  |   5 +-
 2 files changed, 3 insertions(+), 158 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowUtils.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowUtils.scala
deleted file mode 100644
index 6c1bb9edd5..0000000000
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowUtils.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-package org.apache.spark.sql.execution.python
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
-import org.apache.arrow.memory.RootAllocator
-import org.apache.arrow.vector.complex.MapVector
-import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
-import org.apache.spark.sql.errors.ExecutionErrors
-import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.ArrowUtils.{fromArrowType, toArrowType}
-
-private[sql] object SedonaArrowUtils {
-
-  val rootAllocator = new RootAllocator(Long.MaxValue)
-
-  /** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */
-  def toArrowField(
-      name: String,
-      dt: DataType,
-      nullable: Boolean,
-      timeZoneId: String,
-      largeVarTypes: Boolean = false): Field = {
-    dt match {
-      case GeometryUDT =>
-//        val jsonData = """{"crs": {"$schema": "https://proj.org/schemas/v0.7/projjson.schema.json", "type": "GeographicCRS", "name": "WGS 84", "datum_ensemble": {"name": "World Geodetic System 1984 ensemble", "members": [{"name": "World Geodetic System 1984 (Transit)", "id": {"authority": "EPSG", "code": 1166}}, {"name": "World Geodetic System 1984 (G730)", "id": {"authority": "EPSG", "code": 1152}}, {"name": "World Geodetic System 1984 (G873)", "id": {"authority": "EPSG", "code": 1153 [...]
-        val metadata = Map(
-          "empty" -> "empty",
-//          "ARROW:extension:name" -> "geoarrow.wkb",
-//          "ARROW:extension:metadata" -> jsonData,
-        ).asJava
-
-        val fieldType = new FieldType(nullable, ArrowType.Binary.INSTANCE, null, metadata)
-        new Field(name, fieldType, Seq.empty[Field].asJava)
-
-      case ArrayType(elementType, containsNull) =>
-        val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null)
-        new Field(name, fieldType,
-          Seq(toArrowField("element", elementType, containsNull, timeZoneId,
-            largeVarTypes)).asJava)
-      case StructType(fields) =>
-        val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null)
-        new Field(name, fieldType,
-          fields.map { field =>
-            toArrowField(field.name, field.dataType, field.nullable, timeZoneId, largeVarTypes)
-          }.toSeq.asJava)
-      case MapType(keyType, valueType, valueContainsNull) =>
-        val mapType = new FieldType(nullable, new ArrowType.Map(false), null)
-        // Note: Map Type struct can not be null, Struct Type key field can not be null
-        new Field(name, mapType,
-          Seq(toArrowField(MapVector.DATA_VECTOR_NAME,
-            new StructType()
-              .add(MapVector.KEY_NAME, keyType, nullable = false)
-              .add(MapVector.VALUE_NAME, valueType, nullable = valueContainsNull),
-            nullable = false,
-            timeZoneId,
-            largeVarTypes)).asJava)
-      case udt: UserDefinedType[_] =>
-        toArrowField(name, udt.sqlType, nullable, timeZoneId, largeVarTypes)
-      case dataType =>
-        val fieldType = new FieldType(nullable, toArrowType(dataType, timeZoneId,
-          largeVarTypes), null)
-        new Field(name, fieldType, Seq.empty[Field].asJava)
-      case _ => toArrowField(name, dt, nullable, timeZoneId, largeVarTypes)
-    }
-  }
-
-  def fromArrowField(field: Field): DataType = {
-    field.getType match {
-      case _: ArrowType.Map =>
-        val elementField = field.getChildren.get(0)
-        val keyType = fromArrowField(elementField.getChildren.get(0))
-        val valueType = fromArrowField(elementField.getChildren.get(1))
-        MapType(keyType, valueType, elementField.getChildren.get(1).isNullable)
-      case ArrowType.List.INSTANCE =>
-        val elementField = field.getChildren().get(0)
-        val elementType = fromArrowField(elementField)
-        ArrayType(elementType, containsNull = elementField.isNullable)
-      case ArrowType.Struct.INSTANCE =>
-        val fields = field.getChildren().asScala.map { child =>
-          val dt = fromArrowField(child)
-          StructField(child.getName, dt, child.isNullable)
-        }
-        StructType(fields.toArray)
-      case arrowType => fromArrowType(arrowType)
    }
-  }
-
-  /** Maps schema from Spark to Arrow. NOTE: timeZoneId required for TimestampType in StructType */
-  def toArrowSchema(
-      schema: StructType,
-      timeZoneId: String,
-      errorOnDuplicatedFieldNames: Boolean,
-      largeVarTypes: Boolean = false): Schema = {
-    new Schema(schema.map { field =>
-      toArrowField(
-        field.name,
-        deduplicateFieldNames(field.dataType, errorOnDuplicatedFieldNames),
-        field.nullable,
-        timeZoneId,
-        largeVarTypes)
-    }.asJava)
-  }
-
-  private def deduplicateFieldNames(
-      dt: DataType, errorOnDuplicatedFieldNames: Boolean): DataType = dt match {
-    case geometryType: GeometryUDT => geometryType
-    case udt: UserDefinedType[_] => deduplicateFieldNames(udt.sqlType, errorOnDuplicatedFieldNames)
-    case st @ StructType(fields) =>
-      val newNames = if (st.names.toSet.size == st.names.length) {
-        st.names
-      } else {
-        if (errorOnDuplicatedFieldNames) {
-          throw ExecutionErrors.duplicatedFieldNameInArrowStructError(st.names)
-        }
-        val genNawName = st.names.groupBy(identity).map {
-          case (name, names) if names.length > 1 =>
-            val i = new AtomicInteger()
-            name -> { () => s"${name}_${i.getAndIncrement()}" }
-          case (name, _) => name -> { () => name }
-        }
-        st.names.map(genNawName(_)())
-      }
-      val newFields =
-        fields.zip(newNames).map { case (StructField(_, dataType, nullable, metadata), name) =>
-          StructField(
-            name, deduplicateFieldNames(dataType, errorOnDuplicatedFieldNames), nullable, metadata)
-        }
-      StructType(newFields)
-    case ArrayType(elementType, containsNull) =>
-      ArrayType(deduplicateFieldNames(elementType, errorOnDuplicatedFieldNames), containsNull)
-    case MapType(keyType, valueType, valueContainsNull) =>
-      MapType(
-        deduplicateFieldNames(keyType, errorOnDuplicatedFieldNames),
-        deduplicateFieldNames(valueType, errorOnDuplicatedFieldNames),
-        valueContainsNull)
-    case _ => dt
-  }
-}
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index 9c0b0c94b4..5567ef28b5 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.arrow.ArrowWriter
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.util.Utils
 import org.apache.spark.{SparkEnv, TaskContext}
 
@@ -93,9 +94,9 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
   }
 
   protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
-    val arrowSchema = SedonaArrowUtils.toArrowSchema(
+    val arrowSchema = ArrowUtils.toArrowSchema(
       schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
-    val allocator = SedonaArrowUtils.rootAllocator.newChildAllocator(
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator(
      s"stdout writer for $pythonExec", 0, Long.MaxValue)
 
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
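
For reviewers, a minimal standalone sketch (not part of the commit) of the shared Spark 3.5 ArrowUtils calls this patch switches to in writeIteratorToStream, replacing the deleted SedonaArrowUtils copy. It assumes a source file under org.apache.spark.sql, since ArrowUtils is private[sql]; the object name and sample schema below are illustrative only.

package org.apache.spark.sql.execution.python

import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.spark.sql.types.{BinaryType, LongType, StructType}
import org.apache.spark.sql.util.ArrowUtils

// Hypothetical demo object; only the ArrowUtils/Arrow calls mirror the patch.
object ArrowSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Sample schema standing in for the operator's input schema.
    val schema = new StructType().add("id", LongType).add("geom_wkb", BinaryType)

    // Same call shape as the patched writeIteratorToStream.
    val arrowSchema = ArrowUtils.toArrowSchema(
      schema, timeZoneId = "UTC", errorOnDuplicatedFieldNames = true, largeVarTypes = false)

    // Child allocator scoped to this writer, then a root for the record batches,
    // mirroring ArrowUtils.rootAllocator.newChildAllocator(...) in the patch.
    val allocator =
      ArrowUtils.rootAllocator.newChildAllocator("sketch writer", 0, Long.MaxValue)
    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    try {
      println(arrowSchema) // e.g. Schema<id: Int(64, true), geom_wkb: Binary>
    } finally {
      root.close()
      allocator.close()
    }
  }
}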
