This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 0770a6d3a73d54c0fde22c039331d881b04a061a
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Dec 30 18:38:24 2025 +0100

    add code so far
---
 .../sql/execution/python/EvalPythonExec.scala      | 102 ---------------------
 .../execution/python/SedonaArrowPythonRunner.scala |  10 +-
 .../execution/python/SedonaBasePythonRunner.scala  |  12 +--
 .../execution/python/SedonaPythonArrowOutput.scala |  37 +++++++-
 .../org/apache/sedona/sql/TestBaseScala.scala      |   3 +-
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   6 +-
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |   2 +-
 7 files changed, 52 insertions(+), 120 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
deleted file mode 100644
index 11cc8c121f..0000000000
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.spark.sql.execution.python
-
-import org.apache.sedona.common.geometrySerde.GeometrySerde
-import org.apache.sedona.sql.utils.GeometrySerializer
-import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
-import org.apache.spark.api.python.ChainedPythonFunctions
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, JoinedRow, MutableProjection, PythonUDF, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.util.Utils
-
-import java.io.File
-import scala.collection.mutable.ArrayBuffer
-
-trait EvalPythonExec extends UnaryExecNode {
-  def udfs: Seq[PythonUDF]
-
-  def resultAttrs: Seq[Attribute]
-
-  override def output: Seq[Attribute] = child.output ++ resultAttrs
-
-  override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
-
-  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
-    udf.children match {
-      case Seq(u: PythonUDF) =>
-        val (chained, children) = collectFunctions(u)
-        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
-      case children =>
-        // There should not be any other UDFs, or the children can't be evaluated directly.
-        assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
-        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
-    }
-  }
-
-  protected def evaluate(
-      funcs: Seq[ChainedPythonFunctions],
-      argOffsets: Array[Array[Int]],
-      iter: Iterator[InternalRow],
-      schema: StructType,
-      context: TaskContext): Iterator[InternalRow]
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val inputRDD = child.execute().map(_.copy())
-
-    inputRDD.mapPartitions { iter =>
-      val context = TaskContext.get()
-      val contextAwareIterator = new ContextAwareIterator(context, iter)
-
-      // The queue used to buffer input rows so we can drain it to
-      // combine input with output from Python.
-      val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
-//      context.addTaskCompletionListener[Unit] { ctx =>
-//        queue.close()
-//      }
-
-      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
-
-      // flatten all the arguments
-      val allInputs = new ArrayBuffer[Expression]
-      val dataTypes = new ArrayBuffer[DataType]
-      val argOffsets = inputs.map { input =>
-        input.map { e =>
-          if (allInputs.exists(_.semanticEquals(e))) {
-            allInputs.indexWhere(_.semanticEquals(e))
-          } else {
-            allInputs += e
-            dataTypes += e.dataType
-            allInputs.length - 1
-          }
-        }.toArray
-      }.toArray
-      val projection = MutableProjection.create(allInputs.toSeq, child.output)
-      projection.initialize(context.partitionId())
-      val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
-        StructField(s"_$i", dt)
-      }.toArray)
-
-      // Add rows to queue to join later with the result.
-      val projectedRowIter = contextAwareIterator.map { inputRow =>
-        queue.add(inputRow.asInstanceOf[UnsafeRow])
-        val proj = projection(inputRow)
-        proj
-      }
-
-      val materializedResult = projectedRowIter.toSeq
-
-      val outputRowIterator = evaluate(
-        pyFuncs, argOffsets, materializedResult.toIterator, schema, context)
-
-      val joined = new JoinedRow
-      val resultProj = UnsafeProjection.create(output, output)
-
-      outputRowIterator.map { outputRow =>
-        resultProj(joined(queue.remove(), outputRow))
-      }
-    }
-  }
-}
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
index e6f7f6ddf9..3bb93fe62e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
@@ -53,9 +53,9 @@ class SedonaArrowPythonRunner(
 
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
 
-//  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
-//  require(
-//    bufferSize >= 4,
-//    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
-//      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
" + + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") } diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala index d7af4fe771..f1b55e4d24 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala @@ -108,13 +108,13 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT]( } envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default")) -// -// val (worker: Socket, pid: Option[Int]) = { -// WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap) -// } - val (worker: Socket, pid: Option[Int]) = env.createPythonWorker( - pythonExec, envVars.asScala.toMap) + val (worker: Socket, pid: Option[Int]) = { + WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap) + } + +// val (worker: Socket, pid: Option[Int]) = env.createPythonWorker( +// pythonExec, envVars.asScala.toMap) // println("Sedona worker port: " + worker.getPort()) // Whether is the worker released into idle pool or closed. When any codes try to release or diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala index bf32ab2764..20c4859eb2 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala @@ -230,6 +230,8 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR private var root: VectorSchemaRoot = _ private var schema: StructType = _ private var vectors: Array[ColumnVector] = _ + private var eos = false + private var nextObj: OUT = _ context.addTaskCompletionListener[Unit] { _ => if (reader != null) { @@ -240,11 +242,40 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR private var batchLoaded = true + def handleEndOfDataSectionSedona (): Unit = { + if (stream.readInt() == SpecialLengths.END_OF_STREAM) { + + } + + eos = true + } + protected override def handleEndOfDataSection(): Unit = { handleMetadataAfterExec(stream) - super.handleEndOfDataSection() + handleEndOfDataSectionSedona() + } + + override def hasNext: Boolean = nextObj != null || { + if (!eos) { + nextObj = read() + hasNext + } else { + false + } + } + + override def next(): OUT = { + if (hasNext) { + val obj = nextObj + nextObj = null.asInstanceOf[OUT] + obj + } else { + Iterator.empty.next() + } } + + protected override def read(): OUT = { if (writerThread.exception.isDefined) { throw writerThread.exception.get @@ -268,7 +299,9 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR read() } } else { - stream.readInt() match { + val specialSign = stream.readInt() + + specialSign match { case SpecialLengths.START_ARROW_STREAM => reader = new ArrowStreamReader(stream, allocator) root = reader.getVectorSchemaRoot() diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala index 4eb48e4ca7..32d2f06fc8 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala +++ 
@@ -38,9 +38,10 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   val keyParserExtension = "spark.sedona.enableParserExtension"
   val warehouseLocation = System.getProperty("user.dir") + "/target/"
 
+//  4425302.491982245
   val sparkSession = SedonaContext
     .builder()
-    .master("local[1]")
+    .master("local[*]")
     .appName("sedonasqlScalaTest")
     .config("spark.sql.warehouse.dir", warehouseLocation)
     // We need to be explicit about broadcasting in tests.
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index fa2c4ef5b0..cdde195413 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -94,9 +94,8 @@ class StrategySuite extends TestBaseScala with Matchers {
 //      .save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_2")
     val df = spark.read
       .format("geoparquet")
-      .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_2")
+      .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings")
       .select("geometry")
-      .limit(1000)
 
     df.cache()
     df.count()
@@ -119,7 +118,8 @@
 //      nonGeometryVectorizedUDF(col("id")).alias("id_increased"),
       )
 
-    dfVectorized.show()
+//    dfVectorized.show()
+    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x").selectExpr("sum(x)").show()
 //    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x").selectExpr("sum(x)").show()
 
 //    val processingContext = df.queryExecution.explainString(mode = ExplainMode.fromString("extended"))
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index b63237cae6..e941adcff4 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -237,6 +237,6 @@ object ScalarUDF {
         accumulator = null),
      dataType = GeometryUDT,
      pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_DB_UDF,
-      udfDeterministic = false)
+      udfDeterministic = true)
 }
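
For readers skimming this patch: the hasNext/next overrides added to SedonaPythonArrowOutput.scala follow the usual look-ahead iterator pattern, where read() is invoked lazily, its result is cached in nextObj, and eos marks end of stream. Below is a minimal, self-contained sketch of that pattern only; the LookAheadIterator and readOne names are illustrative and are not part of this commit or of the Spark/Sedona APIs.

  // Look-ahead iterator sketch: readOne() yields elements, and null once the source is drained.
  class LookAheadIterator[T >: Null](readOne: () => T) extends Iterator[T] {
    private var eos = false        // set once readOne() reports end of data
    private var nextObj: T = null  // element fetched ahead of time by hasNext

    override def hasNext: Boolean = nextObj != null || {
      if (!eos) {
        nextObj = readOne()             // fetch and cache the next element
        if (nextObj == null) eos = true // null signals the end of the stream
        hasNext                         // true iff an element was cached
      } else {
        false
      }
    }

    override def next(): T = {
      if (hasNext) {
        val obj = nextObj
        nextObj = null                  // hand out the cached element exactly once
        obj
      } else {
        Iterator.empty.next()           // throws NoSuchElementException
      }
    }
  }

  object LookAheadDemo extends App {
    // Wrap a plain source that returns null when exhausted.
    val source = Iterator("a", "b", "c")
    val it = new LookAheadIterator[String](() => if (source.hasNext) source.next() else null)
    println(it.toList) // List(a, b, c)
  }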
