This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 10ef3928abab59030a899d87606a8ecaad06fe49
Author: pawelkocinski <[email protected]>
AuthorDate: Thu Jan 1 22:13:46 2026 +0100

    add code so far
---
 .../sql/execution/python/SedonaArrowStrategy.scala |  61 +-------
 .../execution/python/SedonaBasePythonRunner.scala  |  39 +----
 .../execution/python/SedonaDBWorkerFactory.scala   |   5 +-
 .../execution/python/SedonaPythonArrowInput.scala  |   6 +-
 .../execution/python/SedonaPythonArrowOutput.scala | 168 ---------------------
 .../spark/sql/execution/python/WorkerContext.scala |  10 +-
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   8 +-
 7 files changed, 26 insertions(+), 271 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
index 05bed6a138..fa6dee3728 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
@@ -23,15 +23,13 @@ import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_UDF, SQL_S
 import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, PythonUDF}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.udf.SedonaArrowEvalPython
 import org.apache.spark.{JobArtifactSet, TaskContext}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
 
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
@@ -60,7 +58,9 @@ case class SedonaArrowEvalPythonExec(
   private val batchSize = conf.arrowMaxRecordsPerBatch
   private val sessionLocalTimeZone = conf.sessionLocalTimeZone
   private val largeVarTypes = conf.arrowUseLargeVarTypes
-  private val pythonRunnerConf = ArrowPythonRunner.getPythonRunnerConfMap(conf)
+  private val pythonRunnerConf = Map[String, String](
+    SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone
+  )
   private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
 
   protected override def evaluate(
@@ -87,45 +87,11 @@ case class SedonaArrowEvalPythonExec(
           pythonMetrics,
           jobArtifactUUID).compute(batchIter, context.partitionId(), context)
 
-//        val size = columnarBatchIter.size
-//        val iter = columnarBatchIter.foreach { batch =>
-//          processBatch(batch)
-//        }
-//
-//        println("sss")
-//        val data = columnarBatchIter.flatMap { batch =>
-//          batch.rowIterator.asScala
-//        }
-//
-//        val seqData = data.toSeq
-//
-//        val seqDataSize = seqData.size
-//        val seqDataLength = seqData.length
-//        println("ssss")
-//        columnarBatchIter.flatMap { batch =>
-//          batch.rowIterator.asScala
-//        }
         val result = columnarBatchIter.flatMap { batch =>
-//          val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
-//          assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: " +
-//            s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
           batch.rowIterator.asScala
         }
-//
-//        try{
-//          val first = result.next().toSeq(schema)
-//        } catch {
-//          case e: Exception => {
-//            println("No data returned from Sedona DB UDF")
-//          }
-//        }
-//
-//        val first = result.next().toSeq(schema)
-
-        println("ssss")
-        return result
+
+        result
 
       case SQL_SCALAR_SEDONA_UDF =>
         val columnarBatchIter = new ArrowPythonRunner(
@@ -142,18 +108,7 @@ case class SedonaArrowEvalPythonExec(
           pythonMetrics,
           jobArtifactUUID)
           .compute(batchIter, context.partitionId(), context)
 
         val iter = columnarBatchIter.flatMap { batch =>
           batch.rowIterator.asScala
         }
-//
-//        iter.map(
-//          row => {
-//            processBatch(row)
-//          }
-//        )
-//
-//        val seqData = iter.toList
-//        println(seqData.head.getClass)
-
-        println("SedonaArrowEvalPythonExec: Executing Sedona DB UDF")
-//        iter
+        iter
     }
   }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
index f1b55e4d24..06ae60bbf7 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
@@ -38,10 +38,6 @@ import org.apache.spark.util._
 
 private object SedonaBasePythonRunner {
   private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = "faulthandler")
-
-  private def faultHandlerLogPath(pid: Int): Path = {
-    new File(faultHandlerLogDir, pid.toString).toPath
-  }
 }
 
 /**
@@ -113,13 +109,6 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
       WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
     }
 
-//    val (worker: Socket, pid: Option[Int]) = env.createPythonWorker(
-//      pythonExec, envVars.asScala.toMap)
-//    println("Sedona worker port: " + worker.getPort())
-    // Whether is the worker released into idle pool or closed. When any codes try to release or
-    // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
-    // sure there is only one winner that is going to release or close the worker.
     val releasedOrClosed = new AtomicBoolean(false)
 
     // Start a thread to feed the process input from our parent's iterator
@@ -138,35 +127,9 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
       }
     writerThread.start()
-//    305996
-//    305997
-//    new SedonaMonitorThread(SparkEnv.get, worker, writerThread, context).start()
-//    if (reuseWorker) {
-//      val key = (worker, context.taskAttemptId)
-//      // SPARK-35009: avoid creating multiple monitor threads for the same python worker
-//      // and task context
-//      if (PythonRunner.runningMonitorThreads.add(key)) {
-//        new MonitorThread(SparkEnv.get, worker, context).start()
-//      }
-//    } else {
-//      new MonitorThread(SparkEnv.get, worker, context).start()
-//    }
-
-    // Return an iterator that read lines from the process's stdout
-//    if (writerThread.isAlive) {
-//
-//    }
-//    val path = "/Users/pawelkocinski/Desktop/projects/sedonaworker/out_socket_worker_5"
-//    val openedFile = new File(path)
-//    val fileStream = new FileInputStream(openedFile)
-//    val stream = new DataInputStream(new BufferedInputStream(fileStream, bufferSize))
-//    val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-//    println("worker is closed : " + worker.isClosed)
-    // write to a file for debug
-//    writeDataInputStreamToFile(stream, s"/Users/pawelkocinski/Desktop/projects/sedona_java_11/sedona/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/sedona_python_output_${context.taskAttemptId}.bin")
+
     val stdoutIterator = newReaderIterator(
       stream,
      writerThread,
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
index db46ff6d8c..bfcc8ee2cc 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
@@ -36,7 +36,7 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String])
     extends PythonWorkerFactory(pythonExec, envVars) {
   self =>
 
-  private val sedonaWorkerModule = "sedonaworker.work"
+  private val sedonaWorkerModule = "sedonaworker.reader"
   private val simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
   private val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
 
@@ -92,7 +92,8 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String])
         self.synchronized {
           simpleWorkers.put(socket, worker)
         }
-        return (socket, Some(pid))
+
+        (socket, Some(pid))
       } catch {
         case e: Exception =>
           throw new SparkException("Python worker failed to connect back.", e)
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index c4312c2e2e..a0d40121a7 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.util.Utils
 import org.apache.spark.{SparkEnv, TaskContext}
 
-import java.io.{DataOutputStream, FileOutputStream}
+import java.io.{DataOutputStream, File, FileOutputStream}
 import java.net.Socket
 
 /**
@@ -63,6 +63,10 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
       context: TaskContext): WriterThread = {
     new WriterThread(env, worker, inputIterator, partitionIndex, context) {
 
+      val dataOutFile = s"/tmp/sedona_python_arrow_input_${context.taskAttemptId()}.bin"
+      val dataOutStream = new FileOutputStream(new File(dataOutFile))
+      val dataOut2 = new DataOutputStream(dataOutStream)
+
       protected override def writeCommand(dataOut: DataOutputStream): Unit = {
         handleMetadataBeforeExec(dataOut)
         writeUDF(dataOut, funcs, argOffsets)
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index 20c4859eb2..8b2d94a1c9 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -33,174 +33,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}
 
-/**
- * A trait that can be mixed-in with [[BasePythonRunner]]. It implements the logic from
- * Python (Arrow) to JVM (output type being deserialized from ColumnarBatch).
- */
-//private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
-//
-//  protected def pythonMetrics: Map[String, SQLMetric]
-//
-//  val openedFile = new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow")
-//  val stream2 = new FileInputStream(openedFile)
-//  protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }
-//
-//  protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT
-//  var numberOfReads = 0
-//  var numberOfReadsData = 1
-//
-//  def writeToFile(in: DataInputStream, file: File): Unit = {
-//    val out = new FileOutputStream(file)
-//    try {
-//      val buffer = new Array[Byte](8192)
-//      var bytesRead = 0
-//      while ({
-//        bytesRead = in.read(buffer)
-//        bytesRead != -1
-//      }) {
-//        out.write(buffer, 0, bytesRead)
-//      }
-//    } finally {
-//      out.close()
-//      in.close()
-//    }
-//  }
-//
-//  protected def newReaderIterator(
-//      stream: DataInputStream,
-//      writerThread: WriterThread,
-//      startTime: Long,
-//      env: SparkEnv,
-//      worker: Socket,
-//      pid: Option[Int],
-//      releasedOrClosed: AtomicBoolean,
-//      context: TaskContext): Iterator[OUT] = {
-//
-//    new ReaderIterator(
-//      stream, writerThread, startTime, env, worker, pid, releasedOrClosed, context) {
-//
-//      private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-//        s"stdin reader for $pythonExec", 0, Long.MaxValue)
-//
-//      private var reader: ArrowStreamReader = _
-//      private var root: VectorSchemaRoot = _
-//      private var schema: StructType = _
-//      private var vectors: Array[ColumnVector] = _
-//      private var totalNumberOfRows: Long = 0L
-//
-//      context.addTaskCompletionListener[Unit] { _ =>
-//        if (reader != null) {
-//          reader.close(false)
-//        }
-//        allocator.close()
-//      }
-//
-//      private var batchLoaded = true
-//
-//      protected override def handleEndOfDataSection(): Unit = {
-////        handleMetadataAfterExec(stream)
-////        super.handleEndOfDataSection()
-////        worker.close()
-//        WorkerContext.destroyPythonWorker(pythonExec = pythonExec, envVars = envVars.asScala.toMap, worker = worker)
-//      }
-//
-//      override def hasNext: Boolean = {
-//        val value = numberOfReadsData
-//
-//        numberOfReadsData -= 1
-//        value > 0
-//      }
-//
-//      override def next(): OUT = {
-//        val result = read()
-//        if (result == null) {
-//          throw new NoSuchElementException("End of stream")
-//        }
-//        result
-//      }
-//
-//      protected def read2(): Unit = {
-//        reader = new ArrowStreamReader(stream2, allocator)
-//        root = reader.getVectorSchemaRoot()
-//        schema = ArrowUtils.fromArrowSchema(root.getSchema())
-//        vectors = root.getFieldVectors().asScala.map { vector =>
-//          new ArrowColumnVector(vector)
-//        }.toArray[ColumnVector]
-//
-//        val bytesReadStart = reader.bytesRead()
-//        batchLoaded = reader.loadNextBatch()
-//        val batch = new ColumnarBatch(vectors)
-//        val rowCount = root.getRowCount
-//        // totalNumberOfRows += rowCount
-//        println("Total number of rows: " + totalNumberOfRows)
-//        batch.setNumRows(root.getRowCount)
-//
-//        val out = deserializeColumnarBatch(batch, schema)
-//
-//        // reader.close(false)
-//        // worker.close()
-//        // reader.s
-//        return out
-//      }
-//
-//      protected override def read(): OUT = {
-//        try {
-//          if (reader != null && batchLoaded) {
-//            val bytesReadStart = reader.bytesRead()
-//            batchLoaded = reader.loadNextBatch()
-//            println("ssss")
-//
-//            if (batchLoaded) {
-//              val batch = new ColumnarBatch(vectors)
-//              val rowCount = root.getRowCount
-//              totalNumberOfRows += rowCount
-//              println("Total number of rows: " + totalNumberOfRows)
-//              batch.setNumRows(root.getRowCount)
-//              val bytesReadEnd = reader.bytesRead()
-//              // 1_571_296
-//              // 24_133_432
-//              // 48 264 788
-//              // 48 264 720
-//              // 41076056
-//
-//              pythonMetrics("pythonNumRowsReceived") += rowCount
-//              pythonMetrics("pythonDataReceived") += bytesReadEnd - bytesReadStart
-//              val out = deserializeColumnarBatch(batch, schema)
-//              out
-//            } else {
-//              reader.close(false)
-//              allocator.close()
-//              // Reach end of stream. Call `read()` again to read control data.
-//              read()
-//            }
-//          } else {
-//            stream.readInt() match {
-//              case SpecialLengths.START_ARROW_STREAM =>
-//                reader = new ArrowStreamReader(stream, allocator)
-//                root = reader.getVectorSchemaRoot()
-//                schema = ArrowUtils.fromArrowSchema(root.getSchema())
-//                vectors = root.getFieldVectors().asScala.map { vector =>
-//                  new ArrowColumnVector(vector)
-//                }.toArray[ColumnVector]
-//                read()
-//              case SpecialLengths.TIMING_DATA =>
-//                handleTimingData()
-//                read()
-//              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-//                throw handlePythonException()
-//              case SpecialLengths.END_OF_DATA_SECTION =>
-//                handleEndOfDataSection()
-//                null.asInstanceOf[OUT]
-//              case _ =>
-//                handleEndOfDataSection()
-//                null.asInstanceOf[OUT]
-//            }
-//          }
-//        }
-//      }
-//    }
-//}
 private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
index 5066516a8b..3aa12467e4 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
@@ -37,11 +37,11 @@ object WorkerContext {
       envVars: Map[String, String],
       worker: Socket): Unit = {
     synchronized {
-      worker.close()
-//      val key = (pythonExec, envVars)
-//      pythonWorkers.get(key).foreach(workerFactory => {
-//        workerFactory.stopWorker(worker)
-//      })
+//      worker.close()
+      val key = (pythonExec, envVars)
+      pythonWorkers.get(key).foreach(workerFactory => {
+        workerFactory.stopWorker(worker)
+      })
     }
   }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index cdde195413..426ed18429 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -22,7 +22,7 @@ import org.apache.sedona.sql.TestBaseScala
 import org.apache.spark.SparkEnv
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.{col, expr}
+import org.apache.spark.sql.functions.{col, expr, lit}
 import org.apache.spark.sql.udf.ScalarUDF.{geoPandasScalaFunction, nonGeometryVectorizedUDF, sedonaDBGeometryToGeometryFunction}
 import org.locationtech.jts.io.WKTReader
 import org.scalatest.matchers.should.Matchers
@@ -97,8 +97,8 @@ class StrategySuite extends TestBaseScala with Matchers {
       .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings")
       .select("geometry")
 
-    df.cache()
-    df.count()
+//    df.cache()
+//    df.count()
 
 //      .limit(100)
 
@@ -114,7 +114,7 @@ class StrategySuite extends TestBaseScala with Matchers {
 //        col("id"),
 //        col("version"),
 //        col("bbox"),
-        sedonaDBGeometryToGeometryFunction(col("geometry")).alias("geom"),
+        sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom"),
 //        nonGeometryVectorizedUDF(col("id")).alias("id_increased"),
       )
