This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 10ef3928abab59030a899d87606a8ecaad06fe49
Author: pawelkocinski <[email protected]>
AuthorDate: Thu Jan 1 22:13:46 2026 +0100

    add code so far
---
 .../sql/execution/python/SedonaArrowStrategy.scala |  61 +-------
 .../execution/python/SedonaBasePythonRunner.scala  |  39 +----
 .../execution/python/SedonaDBWorkerFactory.scala   |   5 +-
 .../execution/python/SedonaPythonArrowInput.scala  |   6 +-
 .../execution/python/SedonaPythonArrowOutput.scala | 168 ---------------------
 .../spark/sql/execution/python/WorkerContext.scala |  10 +-
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   8 +-
 7 files changed, 26 insertions(+), 271 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
index 05bed6a138..fa6dee3728 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
@@ -23,15 +23,13 @@ import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_UDF, SQL_S
 import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, PythonUDF}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.udf.SedonaArrowEvalPython
 import org.apache.spark.{JobArtifactSet, TaskContext}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
 
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
@@ -60,7 +58,9 @@ case class SedonaArrowEvalPythonExec(
   private val batchSize = conf.arrowMaxRecordsPerBatch
   private val sessionLocalTimeZone = conf.sessionLocalTimeZone
   private val largeVarTypes = conf.arrowUseLargeVarTypes
-  private val pythonRunnerConf = ArrowPythonRunner.getPythonRunnerConfMap(conf)
+  private val pythonRunnerConf = Map[String, String](
+    SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone
+  )
   private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
 
   protected override def evaluate(
@@ -87,45 +87,11 @@ case class SedonaArrowEvalPythonExec(
           pythonMetrics,
           jobArtifactUUID).compute(batchIter, context.partitionId(), context)
 
-//        val size = columnarBatchIter.size
-//        val iter = columnarBatchIter.foreach { batch =>
-//          processBatch(batch)
-//        }
-//
-//        println("sss")
-//        val data = columnarBatchIter.flatMap { batch =>
-//          batch.rowIterator.asScala
-//        }
-//
-//        val seqData = data.toSeq
-//
-//        val seqDataSize = seqData.size
-//        val seqDataLength = seqData.length
-//        println("ssss")
-
-//        columnarBatchIter.flatMap { batch =>
-//          batch.rowIterator.asScala
-//        }
-
         val result = columnarBatchIter.flatMap { batch =>
-//          val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
-//          assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: " +
-//            s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
           batch.rowIterator.asScala
         }
-//
-//        try{
-//          val first = result.next().toSeq(schema)
-//        } catch {
-//          case e: Exception => {
-//            println("No data returned from Sedona DB UDF")
-//          }
-//        }
-//
-//        val first = result.next().toSeq(schema)
-
-        println("ssss")
-        return result
+
+        result
 
       case SQL_SCALAR_SEDONA_UDF =>
         val columnarBatchIter = new ArrowPythonRunner(
@@ -142,18 +108,7 @@ case class SedonaArrowEvalPythonExec(
         val iter = columnarBatchIter.flatMap { batch =>
           batch.rowIterator.asScala
         }
-//
-//        iter.map(
-//          row => {
-//            processBatch(row)
-//          }
-//        )
-//
-//        val seqData = iter.toList
-//        println(seqData.head.getClass)
-
-        println("SedonaArrowEvalPythonExec: Executing Sedona DB UDF")
-//        iter
+
         iter
     }
   }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
index f1b55e4d24..06ae60bbf7 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
@@ -38,10 +38,6 @@ import org.apache.spark.util._
 private object SedonaBasePythonRunner {
 
-  private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = "faulthandler")
-
-  private def faultHandlerLogPath(pid: Int): Path = {
-    new File(faultHandlerLogDir, pid.toString).toPath
-  }
 }
 
 /**
@@ -113,13 +109,6 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
       WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
     }
 
-//    val (worker: Socket, pid: Option[Int]) = env.createPythonWorker(
-//      pythonExec, envVars.asScala.toMap)
-
-//    println("Sedona worker port: " + worker.getPort())
-    // Whether is the worker released into idle pool or closed. When any codes try to release or
-    // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
-    // sure there is only one winner that is going to release or close the worker.
     val releasedOrClosed = new AtomicBoolean(false)
 
     // Start a thread to feed the process input from our parent's iterator
@@ -138,35 +127,9 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
     }
 
     writerThread.start()
-//    305996
-//    305997
-//    new SedonaMonitorThread(SparkEnv.get, worker, writerThread, context).start()
-//    if (reuseWorker) {
-//      val key = (worker, context.taskAttemptId)
-//      // SPARK-35009: avoid creating multiple monitor threads for the same python worker
-//      // and task context
-//      if (PythonRunner.runningMonitorThreads.add(key)) {
-//        new MonitorThread(SparkEnv.get, worker, context).start()
-//      }
-//    } else {
-//      new MonitorThread(SparkEnv.get, worker, context).start()
-//    }
-
-    // Return an iterator that read lines from the process's stdout
-//    if (writerThread.isAlive) {
-//
-//    }
-
-//    val path = "/Users/pawelkocinski/Desktop/projects/sedonaworker/out_socket_worker_5"
-//      val openedFile = new File(path)
-//      val fileStream = new FileInputStream(openedFile)
-//      val stream = new DataInputStream(new BufferedInputStream(fileStream, bufferSize))
-//
 
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-//    println("worker is closed : " + worker.isClosed)
-    // write to a file  for debug
-//    writeDataInputStreamToFile(stream, s"/Users/pawelkocinski/Desktop/projects/sedona_java_11/sedona/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/sedona_python_output_${context.taskAttemptId}.bin")
+
     val stdoutIterator = newReaderIterator(
       stream,
       writerThread,
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
index db46ff6d8c..bfcc8ee2cc 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
@@ -36,7 +36,7 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String])
     extends PythonWorkerFactory(pythonExec, envVars) {
   self =>
 
-  private val sedonaWorkerModule = "sedonaworker.work"
+  private val sedonaWorkerModule = "sedonaworker.reader"
 
   private val simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
   private val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
@@ -92,7 +92,8 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String])
         self.synchronized {
           simpleWorkers.put(socket, worker)
         }
-        return (socket, Some(pid))
+
+        (socket, Some(pid))
       } catch {
         case e: Exception =>
           throw new SparkException("Python worker failed to connect back.", e)
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index c4312c2e2e..a0d40121a7 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.util.Utils
 import org.apache.spark.{SparkEnv, TaskContext}
 
-import java.io.{DataOutputStream, FileOutputStream}
+import java.io.{DataOutputStream, File, FileOutputStream}
 import java.net.Socket
 
 /**
@@ -63,6 +63,10 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
       context: TaskContext): WriterThread = {
     new WriterThread(env, worker, inputIterator, partitionIndex, context) {
 
+      val dataOutFile = s"/tmp/sedona_python_arrow_input_${context.taskAttemptId()}.bin"
+      val dataOutStream = new FileOutputStream(new File(dataOutFile))
+      val dataOut2 = new DataOutputStream(dataOutStream)
+
       protected override def writeCommand(dataOut: DataOutputStream): Unit = {
         handleMetadataBeforeExec(dataOut)
         writeUDF(dataOut, funcs, argOffsets)
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index 20c4859eb2..8b2d94a1c9 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -33,174 +33,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}
 
-/**
- * A trait that can be mixed-in with [[BasePythonRunner]]. It implements the logic from
- * Python (Arrow) to JVM (output type being deserialized from ColumnarBatch).
- */
-//private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
-//
-//  protected def pythonMetrics: Map[String, SQLMetric]
-//
-//  val openedFile = new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow")
-//  val stream2 = new FileInputStream(openedFile)
-//  protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }
-//
-//  protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT
-//  var numberOfReads = 0
-//  var numberOfReadsData = 1
-//
-//  def writeToFile(in: DataInputStream, file: File): Unit = {
-//    val out = new FileOutputStream(file)
-//    try {
-//      val buffer = new Array[Byte](8192)
-//      var bytesRead = 0
-//      while ({
-//        bytesRead = in.read(buffer)
-//        bytesRead != -1
-//      }) {
-//        out.write(buffer, 0, bytesRead)
-//      }
-//    } finally {
-//      out.close()
-//      in.close()
-//    }
-//  }
-//
-//  protected def newReaderIterator(
-//                                   stream: DataInputStream,
-//                                   writerThread: WriterThread,
-//                                   startTime: Long,
-//                                   env: SparkEnv,
-//                                   worker: Socket,
-//                                   pid: Option[Int],
-//                                   releasedOrClosed: AtomicBoolean,
-//                                   context: TaskContext): Iterator[OUT] = {
-//
-//    new ReaderIterator(
-//      stream, writerThread, startTime, env, worker, pid, releasedOrClosed, context) {
-//
-//      private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-//        s"stdin reader for $pythonExec", 0, Long.MaxValue)
-//
-//      private var reader: ArrowStreamReader = _
-//      private var root: VectorSchemaRoot = _
-//      private var schema: StructType = _
-//      private var vectors: Array[ColumnVector] = _
-//      private var totalNumberOfRows: Long = 0L
-//
-//      context.addTaskCompletionListener[Unit] { _ =>
-//        if (reader != null) {
-//          reader.close(false)
-//        }
-//        allocator.close()
-//      }
-//
-//      private var batchLoaded = true
-//
-//      protected override def handleEndOfDataSection(): Unit = {
-////        handleMetadataAfterExec(stream)
-////        super.handleEndOfDataSection()
-////        worker.close()
-//        WorkerContext.destroyPythonWorker(pythonExec = pythonExec, envVars = envVars.asScala.toMap, worker = worker)
-//      }
-//
-//      override def hasNext: Boolean = {
-//        val value = numberOfReadsData
-//
-//        numberOfReadsData -= 1
-//        value > 0
-//      }
-//
-//      override def next(): OUT = {
-//        val result = read()
-//        if (result == null) {
-//          throw new NoSuchElementException("End of stream")
-//        }
-//        result
-//      }
-//
-//      protected def read2(): Unit = {
-//        reader = new ArrowStreamReader(stream2, allocator)
-//        root = reader.getVectorSchemaRoot()
-//        schema = ArrowUtils.fromArrowSchema(root.getSchema())
-//        vectors = root.getFieldVectors().asScala.map { vector =>
-//          new ArrowColumnVector(vector)
-//        }.toArray[ColumnVector]
-//
-//        val bytesReadStart = reader.bytesRead()
-//        batchLoaded = reader.loadNextBatch()
-//        val batch = new ColumnarBatch(vectors)
-//        val rowCount = root.getRowCount
-//        //        totalNumberOfRows += rowCount
-//        println("Total number of rows: " + totalNumberOfRows)
-//        batch.setNumRows(root.getRowCount)
-//
-//        val out = deserializeColumnarBatch(batch, schema)
-//
-//        //        reader.close(false)
-//        //        worker.close()
-//        //        reader.s
-//        return out
-//      }
-//
-//      protected override def read(): OUT = {
-//        try {
-//          if (reader != null && batchLoaded) {
-//            val bytesReadStart = reader.bytesRead()
-//            batchLoaded = reader.loadNextBatch()
-//            println("ssss")
-//
-//            if (batchLoaded) {
-//              val batch = new ColumnarBatch(vectors)
-//              val rowCount = root.getRowCount
-//              totalNumberOfRows += rowCount
-//              println("Total number of rows: " + totalNumberOfRows)
-//              batch.setNumRows(root.getRowCount)
-//              val bytesReadEnd = reader.bytesRead()
-//              // 1_571_296
-//              // 24_133_432
-//              // 48 264 788
-//              // 48 264 720
-//              // 41076056
-//
-//              pythonMetrics("pythonNumRowsReceived") += rowCount
-//              pythonMetrics("pythonDataReceived") += bytesReadEnd - bytesReadStart
-//              val out = deserializeColumnarBatch(batch, schema)
-//              out
-//            } else {
-//              reader.close(false)
-//              allocator.close()
-//              // Reach end of stream. Call `read()` again to read control data.
-//              read()
-//            }
-//          } else {
-//            stream.readInt() match {
-//              case SpecialLengths.START_ARROW_STREAM =>
-//                reader = new ArrowStreamReader(stream, allocator)
-//                root = reader.getVectorSchemaRoot()
-//                schema = ArrowUtils.fromArrowSchema(root.getSchema())
-//                vectors = root.getFieldVectors().asScala.map { vector =>
-//                  new ArrowColumnVector(vector)
-//                }.toArray[ColumnVector]
-//                read()
-//              case SpecialLengths.TIMING_DATA =>
-//                handleTimingData()
-//                read()
-//              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-//                throw handlePythonException()
-//              case SpecialLengths.END_OF_DATA_SECTION =>
-//                handleEndOfDataSection()
-//                null.asInstanceOf[OUT]
-//              case _ =>
-//                handleEndOfDataSection()
-//                null.asInstanceOf[OUT]
-//            }
-//          }
-//        }
-//      }
-//    }
-//  }
-//}
 
 private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
 
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
index 5066516a8b..3aa12467e4 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
@@ -37,11 +37,11 @@ object WorkerContext {
       envVars: Map[String, String],
       worker: Socket): Unit = {
     synchronized {
-      worker.close()
-//      val key = (pythonExec, envVars)
-//      pythonWorkers.get(key).foreach(workerFactory => {
-//        workerFactory.stopWorker(worker)
-//      })
+//      worker.close()
+      val key = (pythonExec, envVars)
+      pythonWorkers.get(key).foreach(workerFactory => {
+        workerFactory.stopWorker(worker)
+      })
     }
   }
 
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index cdde195413..426ed18429 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -22,7 +22,7 @@ import org.apache.sedona.sql.TestBaseScala
 import org.apache.spark.SparkEnv
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.{col, expr}
+import org.apache.spark.sql.functions.{col, expr, lit}
 import org.apache.spark.sql.udf.ScalarUDF.{geoPandasScalaFunction, nonGeometryVectorizedUDF, sedonaDBGeometryToGeometryFunction}
 import org.locationtech.jts.io.WKTReader
 import org.scalatest.matchers.should.Matchers
@@ -97,8 +97,8 @@ class StrategySuite extends TestBaseScala with Matchers {
       .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings")
       .select("geometry")
 
-    df.cache()
-    df.count()
+//    df.cache()
+//    df.count()
 //      .limit(100)
 
 
@@ -114,7 +114,7 @@ class StrategySuite extends TestBaseScala with Matchers {
 //        col("id"),
 //        col("version"),
 //        col("bbox"),
-        sedonaDBGeometryToGeometryFunction(col("geometry")).alias("geom"),
+        sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom"),
 //        nonGeometryVectorizedUDF(col("id")).alias("id_increased"),
       )
 
