This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 70cd1d6ee1e9369ab714400751d77f1ddef29b1c
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Dec 30 15:44:54 2025 +0100

    add code so far
---
 .../execution/python/SedonaBasePythonRunner.scala  |  17 +-
 .../execution/python/SedonaDBWorkerFactory.scala   |   6 +-
 .../execution/python/SedonaPythonArrowInput.scala  |  44 +--
 .../execution/python/SedonaPythonArrowOutput.scala | 301 ++++++++++++---------
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   1 +
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |   2 +-
 6 files changed, 198 insertions(+), 173 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
index 3144077c40..d7af4fe771 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
@@ -108,12 +108,15 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
     }
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", 
jobArtifactUUID.getOrElse("default"))
+//
+//    val (worker: Socket, pid: Option[Int]) = {
+//      WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
+//    }
 
-    val (worker: Socket, pid: Option[Int]) = {
-      WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
-    }
+    val (worker: Socket, pid: Option[Int]) = env.createPythonWorker(
+      pythonExec, envVars.asScala.toMap)
 
-    println("Sedona worker port: " + worker.getPort())
+//    println("Sedona worker port: " + worker.getPort())
     // Whether is the worker released into idle pool or closed. When any codes try to release or
     // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
     // sure there is only one winner that is going to release or close the worker.
@@ -154,6 +157,12 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
 //
 //    }
 
+//    val path = "/Users/pawelkocinski/Desktop/projects/sedonaworker/out_socket_worker_5"
+//      val openedFile = new File(path)
+//      val fileStream = new FileInputStream(openedFile)
+//      val stream = new DataInputStream(new BufferedInputStream(fileStream, bufferSize))
+//
+
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
 //    println("worker is closed : " + worker.isClosed)
     // write to a file  for debug
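For context, the hunks above swap the hand-rolled WorkerContext.createPythonWorker call for Spark's own env.createPythonWorker and keep reading the worker's replies through a buffered DataInputStream over the socket. A minimal sketch of that stream-wrapping pattern, assuming a hypothetical helper name and a 64 KiB buffer (neither is Spark or Sedona API):

import java.io.{BufferedInputStream, DataInputStream}
import java.net.Socket

// Illustrative helper only: wrap a Python worker socket the same way the runner
// does, so length-prefixed frames can then be read with readInt()/readFully().
object WorkerStreams {
  def framedInput(worker: Socket, bufferSize: Int = 65536): DataInputStream =
    new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
}

Buffering matters here because the reader issues many small readInt() calls for control codes before and between Arrow batches.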
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
index 6dd23930c8..db46ff6d8c 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
@@ -89,9 +89,9 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String])
         if (pid < 0) {
           throw new IllegalStateException("Python failed to launch worker with code " + pid)
         }
-//        self.synchronized {
-//          simpleWorkers.put(socket, worker)
-//        }
+        self.synchronized {
+          simpleWorkers.put(socket, worker)
+        }
         return (socket, Some(pid))
       } catch {
         case e: Exception =>
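The re-enabled synchronized block above restores the usual worker-factory bookkeeping: remember which launched process backs each socket so it can be torn down later. A standalone sketch of that pattern, with an illustrative class name that is not the Sedona factory:

import java.net.Socket
import scala.collection.mutable

// Illustrative bookkeeping only: track launched worker processes by socket so a
// later cleanup call can destroy the matching process.
class WorkerRegistry { self =>
  private val simpleWorkers = new mutable.WeakHashMap[Socket, Process]()

  def register(socket: Socket, worker: Process): Unit = self.synchronized {
    simpleWorkers.put(socket, worker)
  }

  def stop(socket: Socket): Unit = self.synchronized {
    simpleWorkers.remove(socket).foreach(_.destroy())
  }
}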
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index 91d7a024e9..c4312c2e2e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -112,53 +112,15 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
       }
 
       protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
-//        val fileOut = new FileOutputStream("/Users/pawelkocinski/Desktop/projects/sedona_java_11/sedona/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/output.dat")
-
-        // 2. Wrap it with DataOutputStream
-//        val dataOut = new DataOutputStream(fileOut)
-
-        val arrowSchema =
-          ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
+        val arrowSchema = ArrowUtils.toArrowSchema(
+          schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
         val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-          s"stdout writer for $pythonExec",
-          0,
-          Long.MaxValue)
+          s"stdout writer for $pythonExec", 0, Long.MaxValue)
         val root = VectorSchemaRoot.create(arrowSchema, allocator)
 
         Utils.tryWithSafeFinally {
           val writer = new ArrowStreamWriter(root, null, dataOut)
           writer.start()
-//          val buffered = inputIterator.buffered
-//          var allValues = 0
-//          while (buffered.hasNext) {
-//            val value = buffered.next()
-//            val itenralRow = value.asInstanceOf[Iterator[InternalRow]]
-//
-//            val bufferedAll = itenralRow.buffered
-//            while (bufferedAll.hasNext) {
-//              val row = bufferedAll.next()
-//              allValues += 1
-//            }
-//          }
-//
-//          println("Total number of values: " + allValues)
-//          println("ssss")
-//
-//          for (i <- 0 until inputIterator.length) {
-//            val value = inputIterator.next()
-//            val itenralRow = value.asInstanceOf[Iterator[InternalRow]]
-//            val firstElement = itenralRow.next()
-//            for (j <- 0 until value.asInstanceOf[Iterator[InternalRow]].length) {
-//              val row = value.asInstanceOf[Iterator[InternalRow]].next()
-//              val vector = root.getVector(i)
-//              println(s"Vector $i: ${vector.getClass.getSimpleName}, name: 
${vector.getName}")
-//              println(s"Row $j: ${row}")
-//            }
-//            println("sss")
-////            println(value)
-////            val vector = root.getVector(i)
-////            println(s"Vector $i: ${vector.getClass.getSimpleName}, name: ${vector.getName}")
-//          }
 
           writeIteratorToArrowStream(root, writer, dataOut, inputIterator)
 
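The write path kept by this hunk is the standard Arrow IPC sequence: build the Arrow schema, allocate a VectorSchemaRoot, then start/writeBatch/end an ArrowStreamWriter over the stream that feeds the Python worker. A self-contained sketch of that sequence against an in-memory stream; the single Int column and the ArrowWriteSketch name are assumptions for illustration, not the Sedona code path:

import java.io.ByteArrayOutputStream
import java.util.Collections

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}

object ArrowWriteSketch {
  def main(args: Array[String]): Unit = {
    val schema = new Schema(Collections.singletonList(Field.nullable("id", new ArrowType.Int(32, true))))
    val allocator = new RootAllocator(Long.MaxValue)
    val root = VectorSchemaRoot.create(schema, allocator)
    val out = new ByteArrayOutputStream()
    val writer = new ArrowStreamWriter(root, null, out) // null: no dictionary provider
    try {
      writer.start()                       // schema message
      val ids = root.getVector("id").asInstanceOf[IntVector]
      ids.allocateNew(3)
      (0 until 3).foreach(i => ids.setSafe(i, i))
      root.setRowCount(3)
      writer.writeBatch()                  // one record batch
      writer.end()                         // end-of-stream marker
    } finally {
      writer.close()
      root.close()
      allocator.close()
    }
    println(s"wrote ${out.size()} bytes of Arrow IPC data")
  }
}

On the real path the output stream is the worker socket's DataOutputStream and the batches come from writeIteratorToArrowStream rather than a hand-filled vector.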
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index 50ee3cf17a..bf32ab2764 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -37,34 +37,178 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, Columna
  * A trait that can be mixed-in with [[BasePythonRunner]]. It implements the logic from
  * Python (Arrow) to JVM (output type being deserialized from ColumnarBatch).
  */
+//private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
+//
+//  protected def pythonMetrics: Map[String, SQLMetric]
+//
+//  val openedFile = new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow")
+//  val stream2 = new FileInputStream(openedFile)
+//  protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }
+//
+//  protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT
+//  var numberOfReads = 0
+//  var numberOfReadsData = 1
+//
+//  def writeToFile(in: DataInputStream, file: File): Unit = {
+//    val out = new FileOutputStream(file)
+//    try {
+//      val buffer = new Array[Byte](8192)
+//      var bytesRead = 0
+//      while ({
+//        bytesRead = in.read(buffer)
+//        bytesRead != -1
+//      }) {
+//        out.write(buffer, 0, bytesRead)
+//      }
+//    } finally {
+//      out.close()
+//      in.close()
+//    }
+//  }
+//
+//  protected def newReaderIterator(
+//                                   stream: DataInputStream,
+//                                   writerThread: WriterThread,
+//                                   startTime: Long,
+//                                   env: SparkEnv,
+//                                   worker: Socket,
+//                                   pid: Option[Int],
+//                                   releasedOrClosed: AtomicBoolean,
+//                                   context: TaskContext): Iterator[OUT] = {
+//
+//    new ReaderIterator(
+//      stream, writerThread, startTime, env, worker, pid, releasedOrClosed, 
context) {
+//
+//      private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+//        s"stdin reader for $pythonExec", 0, Long.MaxValue)
+//
+//      private var reader: ArrowStreamReader = _
+//      private var root: VectorSchemaRoot = _
+//      private var schema: StructType = _
+//      private var vectors: Array[ColumnVector] = _
+//      private var totalNumberOfRows: Long = 0L
+//
+//      context.addTaskCompletionListener[Unit] { _ =>
+//        if (reader != null) {
+//          reader.close(false)
+//        }
+//        allocator.close()
+//      }
+//
+//      private var batchLoaded = true
+//
+//      protected override def handleEndOfDataSection(): Unit = {
+////        handleMetadataAfterExec(stream)
+////        super.handleEndOfDataSection()
+////        worker.close()
+//        WorkerContext.destroyPythonWorker(pythonExec = pythonExec, envVars = envVars.asScala.toMap, worker = worker)
+//      }
+//
+//      override def hasNext: Boolean = {
+//        val value = numberOfReadsData
+//
+//        numberOfReadsData -= 1
+//        value > 0
+//      }
+//
+//      override def next(): OUT = {
+//        val result = read()
+//        if (result == null) {
+//          throw new NoSuchElementException("End of stream")
+//        }
+//        result
+//      }
+//
+//      protected def read2(): Unit = {
+//        reader = new ArrowStreamReader(stream2, allocator)
+//        root = reader.getVectorSchemaRoot()
+//        schema = ArrowUtils.fromArrowSchema(root.getSchema())
+//        vectors = root.getFieldVectors().asScala.map { vector =>
+//          new ArrowColumnVector(vector)
+//        }.toArray[ColumnVector]
+//
+//        val bytesReadStart = reader.bytesRead()
+//        batchLoaded = reader.loadNextBatch()
+//        val batch = new ColumnarBatch(vectors)
+//        val rowCount = root.getRowCount
+//        //        totalNumberOfRows += rowCount
+//        println("Total number of rows: " + totalNumberOfRows)
+//        batch.setNumRows(root.getRowCount)
+//
+//        val out = deserializeColumnarBatch(batch, schema)
+//
+//        //        reader.close(false)
+//        //        worker.close()
+//        //        reader.s
+//        return out
+//      }
+//
+//      protected override def read(): OUT = {
+//        try {
+//          if (reader != null && batchLoaded) {
+//            val bytesReadStart = reader.bytesRead()
+//            batchLoaded = reader.loadNextBatch()
+//            println("ssss")
+//
+//            if (batchLoaded) {
+//              val batch = new ColumnarBatch(vectors)
+//              val rowCount = root.getRowCount
+//              totalNumberOfRows += rowCount
+//              println("Total number of rows: " + totalNumberOfRows)
+//              batch.setNumRows(root.getRowCount)
+//              val bytesReadEnd = reader.bytesRead()
+//              // 1_571_296
+//              // 24_133_432
+//              // 48 264 788
+//              // 48 264 720
+//              // 41076056
+//
+//              pythonMetrics("pythonNumRowsReceived") += rowCount
+//              pythonMetrics("pythonDataReceived") += bytesReadEnd - 
bytesReadStart
+//              val out = deserializeColumnarBatch(batch, schema)
+//              out
+//            } else {
+//              reader.close(false)
+//              allocator.close()
+//              // Reach end of stream. Call `read()` again to read control data.
+//              read()
+//            }
+//          } else {
+//            stream.readInt() match {
+//              case SpecialLengths.START_ARROW_STREAM =>
+//                reader = new ArrowStreamReader(stream, allocator)
+//                root = reader.getVectorSchemaRoot()
+//                schema = ArrowUtils.fromArrowSchema(root.getSchema())
+//                vectors = root.getFieldVectors().asScala.map { vector =>
+//                  new ArrowColumnVector(vector)
+//                }.toArray[ColumnVector]
+//                read()
+//              case SpecialLengths.TIMING_DATA =>
+//                handleTimingData()
+//                read()
+//              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+//                throw handlePythonException()
+//              case SpecialLengths.END_OF_DATA_SECTION =>
+//                handleEndOfDataSection()
+//                null.asInstanceOf[OUT]
+//              case _ =>
+//                handleEndOfDataSection()
+//                null.asInstanceOf[OUT]
+//            }
+//          }
+//        }
+//      }
+//    }
+//  }
+//}
+
 private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
 
   protected def pythonMetrics: Map[String, SQLMetric]
 
-  val openedFile = new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow")
-  val stream2 = new FileInputStream(openedFile)
   protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }
 
   protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT
-  var numberOfReads = 0
-  var numberOfReadsData = 1
-
-  def writeToFile(in: DataInputStream, file: File): Unit = {
-    val out = new FileOutputStream(file)
-    try {
-      val buffer = new Array[Byte](8192)
-      var bytesRead = 0
-      while ({
-        bytesRead = in.read(buffer)
-        bytesRead != -1
-      }) {
-        out.write(buffer, 0, bytesRead)
-      }
-    } finally {
-      out.close()
-      in.close()
-    }
-  }
 
   protected def newReaderIterator(
                                    stream: DataInputStream,
@@ -86,7 +230,6 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
       private var root: VectorSchemaRoot = _
       private var schema: StructType = _
       private var vectors: Array[ColumnVector] = _
-      private var totalNumberOfRows: Long = 0L
 
       context.addTaskCompletionListener[Unit] { _ =>
         if (reader != null) {
@@ -98,70 +241,11 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
       private var batchLoaded = true
 
       protected override def handleEndOfDataSection(): Unit = {
-//        handleMetadataAfterExec(stream)
-//        super.handleEndOfDataSection()
-//        worker.close()
-        WorkerContext.destroyPythonWorker(pythonExec = pythonExec, envVars = envVars.asScala.toMap, worker = worker)
-      }
-
-      override def hasNext: Boolean = {
-        val value = numberOfReadsData
-
-        numberOfReadsData -= 1
-        value > 0
-      }
-
-      override def next(): OUT = {
-        val result = read()
-        if (result == null) {
-          throw new NoSuchElementException("End of stream")
-        }
-        result
+        handleMetadataAfterExec(stream)
+        super.handleEndOfDataSection()
       }
 
       protected override def read(): OUT = {
-        println("sssss")
-        reader = new ArrowStreamReader(stream2, allocator)
-        root = reader.getVectorSchemaRoot()
-        schema = ArrowUtils.fromArrowSchema(root.getSchema())
-        vectors = root.getFieldVectors().asScala.map { vector =>
-          new ArrowColumnVector(vector)
-        }.toArray[ColumnVector]
-
-        val bytesReadStart = reader.bytesRead()
-        batchLoaded = reader.loadNextBatch()
-        val batch = new ColumnarBatch(vectors)
-        val rowCount = root.getRowCount
-//        totalNumberOfRows += rowCount
-        println("Total number of rows: " + totalNumberOfRows)
-        batch.setNumRows(root.getRowCount)
-
-        val out = deserializeColumnarBatch(batch, schema)
-
-//        reader.close(false)
-//        worker.close()
-//        reader.s
-        return out
-
-        numberOfReads += 1
-        if (numberOfReads > 5) {
-          handleEndOfDataSection()
-          return null.asInstanceOf[OUT]
-        }
-        println("worker is closed : " + worker.isInputShutdown)
-//        while (writerThread.isAlive) {
-//          Thread.sleep(10)
-//          println("waiting for writer to finish...")
-//        }
-//
-//        while (stream.available() == 0) {
-//          Thread.sleep(10)
-//          println("waiting for data...")
-//        }
-//
-//        println(stream.available())
-//
-//        return null.asInstanceOf[OUT]
         if (writerThread.exception.isDefined) {
           throw writerThread.exception.get
         }
@@ -169,25 +253,14 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
           if (reader != null && batchLoaded) {
             val bytesReadStart = reader.bytesRead()
             batchLoaded = reader.loadNextBatch()
-            println("ssss")
-
             if (batchLoaded) {
               val batch = new ColumnarBatch(vectors)
               val rowCount = root.getRowCount
-              totalNumberOfRows += rowCount
-              println("Total number of rows: " + totalNumberOfRows)
               batch.setNumRows(root.getRowCount)
               val bytesReadEnd = reader.bytesRead()
-              // 1_571_296
-              // 24_133_432
-              // 48 264 788
-              // 48 264 720
-              // 41076056
-
               pythonMetrics("pythonNumRowsReceived") += rowCount
               pythonMetrics("pythonDataReceived") += bytesReadEnd - 
bytesReadStart
-              val out = deserializeColumnarBatch(batch, schema)
-              out
+              deserializeColumnarBatch(batch, schema)
             } else {
               reader.close(false)
               allocator.close()
@@ -195,47 +268,27 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
               read()
             }
           } else {
-//            if (stream.available() == 0) {
-//              println("ssss")
-//              throw handleException
-//              return null.asInstanceOf[OUT]
-//            }
-            val streamType = try {
-              stream.readInt()
-            } catch {
-              case e: Throwable =>
-                SpecialLengths.END_OF_DATA_SECTION
-            }
-
-            streamType match {
+            stream.readInt() match {
               case SpecialLengths.START_ARROW_STREAM =>
-//                file input stream
-                if (numberOfReads > 2) {
-                  return null.asInstanceOf[OUT]
-                }
-
-                val stream2 = new FileInputStream(new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow"))
-                reader = new ArrowStreamReader(stream2, allocator)
+                reader = new ArrowStreamReader(stream, allocator)
                 root = reader.getVectorSchemaRoot()
                 schema = ArrowUtils.fromArrowSchema(root.getSchema())
                 vectors = root.getFieldVectors().asScala.map { vector =>
                   new ArrowColumnVector(vector)
                 }.toArray[ColumnVector]
+
                 read()
-//              case SpecialLengths.TIMING_DATA =>
-//                handleTimingData()
-//                read()
-//              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-//                throw handlePythonException()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
               case SpecialLengths.END_OF_DATA_SECTION =>
                 handleEndOfDataSection()
                 null.asInstanceOf[OUT]
-              case _ =>
-                handleEndOfDataSection()
-                null.asInstanceOf[OUT]
             }
           }
-        }
+        } catch handleException
       }
     }
   }
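The restored read() above returns to the stock protocol: react to the SpecialLengths control ints and, once START_ARROW_STREAM arrives, drain batches from an ArrowStreamReader over the socket instead of the hard-coded debug file. A self-contained sketch of just the batch-draining part (it pairs with the writer sketch earlier; control-code handling and metrics are omitted, and ArrowReadSketch is an illustrative name):

import java.io.ByteArrayInputStream

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowStreamReader

object ArrowReadSketch {
  def readAll(bytes: Array[Byte]): Long = {
    val allocator = new RootAllocator(Long.MaxValue)
    val reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator)
    try {
      val root = reader.getVectorSchemaRoot   // reads the schema message
      var totalRows = 0L
      while (reader.loadNextBatch()) {        // false once the end-of-stream marker is hit
        totalRows += root.getRowCount         // the root's vectors now hold the current batch
      }
      totalRows
    } finally {
      reader.close()                          // also releases the root's buffers
      allocator.close()
    }
  }
}

Each successful loadNextBatch() refills the same VectorSchemaRoot, which is why the trait wraps the root's field vectors in ArrowColumnVector once per stream and only sets the row count on each ColumnarBatch it hands back.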
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index e638a05f25..fa2c4ef5b0 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -96,6 +96,7 @@ class StrategySuite extends TestBaseScala with Matchers {
       .format("geoparquet")
       
.load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_2")
       .select("geometry")
+      .limit(1000)
 
     df.cache()
     df.count()
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 89afa10986..b63237cae6 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -46,7 +46,7 @@ object ScalarUDF {
   }
 
   SparkEnv.get.conf.set(PYTHON_USE_DAEMON, false)
-//  SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "org.apache.sedona.python.SedonaPythonWorker")
+  SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "sedonaworker.work")
 
   private[spark] lazy val pythonPath = sys.env.getOrElse("PYTHONPATH", "")
   protected lazy val sparkHome: String = {
