This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 70cd1d6ee1e9369ab714400751d77f1ddef29b1c
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Dec 30 15:44:54 2025 +0100

    add code so far
---
 .../execution/python/SedonaBasePythonRunner.scala  |  17 +-
 .../execution/python/SedonaDBWorkerFactory.scala   |   6 +-
 .../execution/python/SedonaPythonArrowInput.scala  |  44 +--
 .../execution/python/SedonaPythonArrowOutput.scala | 301 ++++++++++++---------
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   1 +
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |   2 +-
 6 files changed, 198 insertions(+), 173 deletions(-)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
index 3144077c40..d7af4fe771 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
@@ -108,12 +108,15 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
     }
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))
+//
+//    val (worker: Socket, pid: Option[Int]) = {
+//      WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
+//    }
 
-    val (worker: Socket, pid: Option[Int]) = {
-      WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
-    }
+    val (worker: Socket, pid: Option[Int]) = env.createPythonWorker(
+      pythonExec, envVars.asScala.toMap)
 
-    println("Sedona worker port: " + worker.getPort())
+//    println("Sedona worker port: " + worker.getPort())
 
     // Whether is the worker released into idle pool or closed. When any codes try to release or
     // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
     // sure there is only one winner that is going to release or close the worker.
@@ -154,6 +157,12 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
 //
 //      }
 
+//      val path = "/Users/pawelkocinski/Desktop/projects/sedonaworker/out_socket_worker_5"
+//      val openedFile = new File(path)
+//      val fileStream = new FileInputStream(openedFile)
+//      val stream = new DataInputStream(new BufferedInputStream(fileStream, bufferSize))
+//
+
       val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
 //      println("worker is closed : " + worker.isClosed)
       // write to a file for debug
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
index 6dd23930c8..db46ff6d8c 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
@@ -89,9 +89,9 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String])
       if (pid < 0) {
         throw new IllegalStateException("Python failed to launch worker with code " + pid)
       }
-//      self.synchronized {
-//        simpleWorkers.put(socket, worker)
-//      }
+      self.synchronized {
+        simpleWorkers.put(socket, worker)
+      }
       return (socket, Some(pid))
     } catch {
       case e: Exception =>
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index 91d7a024e9..c4312c2e2e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -112,53 +112,15 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
   }
 
   protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
-//    val fileOut = new FileOutputStream("/Users/pawelkocinski/Desktop/projects/sedona_java_11/sedona/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/output.dat")
-
-    // 2. Wrap it with DataOutputStream
-//    val dataOut = new DataOutputStream(fileOut)
-
-    val arrowSchema =
-      ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
+    val arrowSchema = ArrowUtils.toArrowSchema(
+      schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
     val allocator = ArrowUtils.rootAllocator.newChildAllocator(
-      s"stdout writer for $pythonExec",
-      0,
-      Long.MaxValue)
+      s"stdout writer for $pythonExec", 0, Long.MaxValue)
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
 
     Utils.tryWithSafeFinally {
       val writer = new ArrowStreamWriter(root, null, dataOut)
       writer.start()
-//      val buffered = inputIterator.buffered
-//      var allValues = 0
-//      while (buffered.hasNext) {
-//        val value = buffered.next()
-//        val itenralRow = value.asInstanceOf[Iterator[InternalRow]]
-//
-//        val bufferedAll = itenralRow.buffered
-//        while (bufferedAll.hasNext) {
-//          val row = bufferedAll.next()
-//          allValues += 1
-//        }
-//      }
-//
-//      println("Total number of values: " + allValues)
-//      println("ssss")
-//
-//      for (i <- 0 until inputIterator.length) {
-//        val value = inputIterator.next()
-//        val itenralRow = value.asInstanceOf[Iterator[InternalRow]]
-//        val firstElement = itenralRow.next()
-//        for (j <- 0 until value.asInstanceOf[Iterator[InternalRow]].length) {
-//          val row = value.asInstanceOf[Iterator[InternalRow]].next()
-//          val vector = root.getVector(i)
-//          println(s"Vector $i: ${vector.getClass.getSimpleName}, name: ${vector.getName}")
-//          println(s"Row $j: ${row}")
-//        }
-//        println("sss")
-////        println(value)
-////        val vector = root.getVector(i)
-////        println(s"Vector $i: ${vector.getClass.getSimpleName}, name: ${vector.getName}")
-//      }
 
       writeIteratorToArrowStream(root, writer, dataOut, inputIterator)

diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index 50ee3cf17a..bf32ab2764 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -37,34 +37,178 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, Columna
  * A trait that can be mixed-in with [[BasePythonRunner]]. It implements the logic from
  * Python (Arrow) to JVM (output type being deserialized from ColumnarBatch).
  */
+//private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
+//
+//  protected def pythonMetrics: Map[String, SQLMetric]
+//
+//  val openedFile = new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow")
+//  val stream2 = new FileInputStream(openedFile)
+//  protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }
+//
+//  protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT
+//  var numberOfReads = 0
+//  var numberOfReadsData = 1
+//
+//  def writeToFile(in: DataInputStream, file: File): Unit = {
+//    val out = new FileOutputStream(file)
+//    try {
+//      val buffer = new Array[Byte](8192)
+//      var bytesRead = 0
+//      while ({
+//        bytesRead = in.read(buffer)
+//        bytesRead != -1
+//      }) {
+//        out.write(buffer, 0, bytesRead)
+//      }
+//    } finally {
+//      out.close()
+//      in.close()
+//    }
+//  }
+//
+//  protected def newReaderIterator(
+//      stream: DataInputStream,
+//      writerThread: WriterThread,
+//      startTime: Long,
+//      env: SparkEnv,
+//      worker: Socket,
+//      pid: Option[Int],
+//      releasedOrClosed: AtomicBoolean,
+//      context: TaskContext): Iterator[OUT] = {
+//
+//    new ReaderIterator(
+//      stream, writerThread, startTime, env, worker, pid, releasedOrClosed, context) {
+//
+//      private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+//        s"stdin reader for $pythonExec", 0, Long.MaxValue)
+//
+//      private var reader: ArrowStreamReader = _
+//      private var root: VectorSchemaRoot = _
+//      private var schema: StructType = _
+//      private var vectors: Array[ColumnVector] = _
+//      private var totalNumberOfRows: Long = 0L
+//
+//      context.addTaskCompletionListener[Unit] { _ =>
+//        if (reader != null) {
+//          reader.close(false)
+//        }
+//        allocator.close()
+//      }
+//
+//      private var batchLoaded = true
+//
+//      protected override def handleEndOfDataSection(): Unit = {
+////        handleMetadataAfterExec(stream)
+////        super.handleEndOfDataSection()
+////        worker.close()
+//        WorkerContext.destroyPythonWorker(pythonExec = pythonExec, envVars = envVars.asScala.toMap, worker = worker)
+//      }
+//
+//      override def hasNext: Boolean = {
+//        val value = numberOfReadsData
+//
+//        numberOfReadsData -= 1
+//        value > 0
+//      }
+//
+//      override def next(): OUT = {
+//        val result = read()
+//        if (result == null) {
+//          throw new NoSuchElementException("End of stream")
+//        }
+//        result
+//      }
+//
+//      protected def read2(): Unit = {
+//        reader = new ArrowStreamReader(stream2, allocator)
+//        root = reader.getVectorSchemaRoot()
+//        schema = ArrowUtils.fromArrowSchema(root.getSchema())
+//        vectors = root.getFieldVectors().asScala.map { vector =>
+//          new ArrowColumnVector(vector)
+//        }.toArray[ColumnVector]
+//
+//        val bytesReadStart = reader.bytesRead()
+//        batchLoaded = reader.loadNextBatch()
+//        val batch = new ColumnarBatch(vectors)
+//        val rowCount = root.getRowCount
+//        // totalNumberOfRows += rowCount
+//        println("Total number of rows: " + totalNumberOfRows)
+//        batch.setNumRows(root.getRowCount)
+//
+//        val out = deserializeColumnarBatch(batch, schema)
+//
+//        // reader.close(false)
+//        // worker.close()
+//        // reader.s
+//        return out
+//      }
+//
+//      protected override def read(): OUT = {
+//        try {
+//          if (reader != null && batchLoaded) {
+//            val bytesReadStart = reader.bytesRead()
+//            batchLoaded = reader.loadNextBatch()
+//            println("ssss")
+//
+//            if (batchLoaded) {
+//              val batch = new ColumnarBatch(vectors)
+//              val rowCount = root.getRowCount
+//              totalNumberOfRows += rowCount
+//              println("Total number of rows: " + totalNumberOfRows)
+//              batch.setNumRows(root.getRowCount)
+//              val bytesReadEnd = reader.bytesRead()
+//              // 1_571_296
+//              // 24_133_432
+//              // 48 264 788
+//              // 48 264 720
+//              // 41076056
+//
+//              pythonMetrics("pythonNumRowsReceived") += rowCount
+//              pythonMetrics("pythonDataReceived") += bytesReadEnd - bytesReadStart
+//              val out = deserializeColumnarBatch(batch, schema)
+//              out
+//            } else {
+//              reader.close(false)
+//              allocator.close()
+//              // Reach end of stream. Call `read()` again to read control data.
+//              read()
+//            }
+//          } else {
+//            stream.readInt() match {
+//              case SpecialLengths.START_ARROW_STREAM =>
+//                reader = new ArrowStreamReader(stream, allocator)
+//                root = reader.getVectorSchemaRoot()
+//                schema = ArrowUtils.fromArrowSchema(root.getSchema())
+//                vectors = root.getFieldVectors().asScala.map { vector =>
+//                  new ArrowColumnVector(vector)
+//                }.toArray[ColumnVector]
+//                read()
+//              case SpecialLengths.TIMING_DATA =>
+//                handleTimingData()
+//                read()
+//              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+//                throw handlePythonException()
+//              case SpecialLengths.END_OF_DATA_SECTION =>
+//                handleEndOfDataSection()
+//                null.asInstanceOf[OUT]
+//              case _ =>
+//                handleEndOfDataSection()
+//                null.asInstanceOf[OUT]
+//            }
+//          }
+//        }
+//      }
+//    }
+//  }
+//}
+
 private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
 
   protected def pythonMetrics: Map[String, SQLMetric]
 
-  val openedFile = new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow")
-  val stream2 = new FileInputStream(openedFile)
   protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }
 
   protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT
-  var numberOfReads = 0
-  var numberOfReadsData = 1
-
-  def writeToFile(in: DataInputStream, file: File): Unit = {
-    val out = new FileOutputStream(file)
-    try {
-      val buffer = new Array[Byte](8192)
-      var bytesRead = 0
-      while ({
-        bytesRead = in.read(buffer)
-        bytesRead != -1
-      }) {
-        out.write(buffer, 0, bytesRead)
-      }
-    } finally {
-      out.close()
-      in.close()
-    }
-  }
 
   protected def newReaderIterator(
       stream: DataInputStream,
@@ -86,7 +230,6 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
       private var root: VectorSchemaRoot = _
       private var schema: StructType = _
       private var vectors: Array[ColumnVector] = _
-      private var totalNumberOfRows: Long = 0L
 
       context.addTaskCompletionListener[Unit] { _ =>
         if (reader != null) {
@@ -98,70 +241,11 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
       private var batchLoaded = true
 
       protected override def handleEndOfDataSection(): Unit = {
-//        handleMetadataAfterExec(stream)
-//        super.handleEndOfDataSection()
-//        worker.close()
-        WorkerContext.destroyPythonWorker(pythonExec = pythonExec, envVars = envVars.asScala.toMap, worker = worker)
-      }
-
-      override def hasNext: Boolean = {
-        val value = numberOfReadsData
-
-        numberOfReadsData -= 1
-        value > 0
-      }
-
-      override def next(): OUT = {
-        val result = read()
-        if (result == null) {
-          throw new NoSuchElementException("End of stream")
-        }
-        result
+        handleMetadataAfterExec(stream)
+        super.handleEndOfDataSection()
       }
 
      protected override def read(): OUT = {
-        println("sssss")
-        reader = new ArrowStreamReader(stream2, allocator)
-        root = reader.getVectorSchemaRoot()
-        schema = ArrowUtils.fromArrowSchema(root.getSchema())
-        vectors = root.getFieldVectors().asScala.map { vector =>
-          new ArrowColumnVector(vector)
-        }.toArray[ColumnVector]
-
-        val bytesReadStart = reader.bytesRead()
-        batchLoaded = reader.loadNextBatch()
-        val batch = new ColumnarBatch(vectors)
-        val rowCount = root.getRowCount
-//        totalNumberOfRows += rowCount
-        println("Total number of rows: " + totalNumberOfRows)
-        batch.setNumRows(root.getRowCount)
-
-        val out = deserializeColumnarBatch(batch, schema)
-
-//        reader.close(false)
-//        worker.close()
-//        reader.s
-        return out
-
-        numberOfReads += 1
-        if (numberOfReads > 5) {
-          handleEndOfDataSection()
-          return null.asInstanceOf[OUT]
-        }
-        println("worker is closed : " + worker.isInputShutdown)
-//        while (writerThread.isAlive) {
-//          Thread.sleep(10)
-//          println("waiting for writer to finish...")
-//        }
-//
-//        while (stream.available() == 0) {
-//          Thread.sleep(10)
-//          println("waiting for data...")
-//        }
-//
-//        println(stream.available())
-//
-//        return null.asInstanceOf[OUT]
        if (writerThread.exception.isDefined) {
          throw writerThread.exception.get
        }
@@ -169,25 +253,14 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
          if (reader != null && batchLoaded) {
            val bytesReadStart = reader.bytesRead()
            batchLoaded = reader.loadNextBatch()
-            println("ssss")
-
            if (batchLoaded) {
              val batch = new ColumnarBatch(vectors)
              val rowCount = root.getRowCount
-              totalNumberOfRows += rowCount
-              println("Total number of rows: " + totalNumberOfRows)
              batch.setNumRows(root.getRowCount)
              val bytesReadEnd = reader.bytesRead()
-              // 1_571_296
-              // 24_133_432
-              // 48 264 788
-              // 48 264 720
-              // 41076056
-
              pythonMetrics("pythonNumRowsReceived") += rowCount
              pythonMetrics("pythonDataReceived") += bytesReadEnd - bytesReadStart
-              val out = deserializeColumnarBatch(batch, schema)
-              out
+              deserializeColumnarBatch(batch, schema)
            } else {
              reader.close(false)
              allocator.close()
@@ -195,47 +268,27 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
              read()
            }
          } else {
-//            if (stream.available() == 0) {
-//              println("ssss")
-//              throw handleException
-//              return null.asInstanceOf[OUT]
-//            }
-            val streamType = try {
-              stream.readInt()
-            } catch {
-              case e: Throwable =>
-                SpecialLengths.END_OF_DATA_SECTION
-            }
-
-            streamType match {
+            stream.readInt() match {
              case SpecialLengths.START_ARROW_STREAM =>
-//                file input stream
-                if (numberOfReads > 2) {
-                  return null.asInstanceOf[OUT]
-                }
-
-                val stream2 = new FileInputStream(new File("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/output_batch_data.arrow"))
-                reader = new ArrowStreamReader(stream2, allocator)
+                reader = new ArrowStreamReader(stream, allocator)
                root = reader.getVectorSchemaRoot()
                schema = ArrowUtils.fromArrowSchema(root.getSchema())
                vectors = root.getFieldVectors().asScala.map { vector =>
                  new ArrowColumnVector(vector)
                }.toArray[ColumnVector]
+
                read()
-//              case SpecialLengths.TIMING_DATA =>
-//                handleTimingData()
-//                read()
-//              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-//                throw handlePythonException()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
              case SpecialLengths.END_OF_DATA_SECTION =>
                handleEndOfDataSection()
                null.asInstanceOf[OUT]
-              case _ =>
-                handleEndOfDataSection()
-                null.asInstanceOf[OUT]
            }
          }
-      }
+        } catch handleException
      }
    }
  }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index e638a05f25..fa2c4ef5b0 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -96,6 +96,7 @@ class StrategySuite extends TestBaseScala with Matchers {
       .format("geoparquet")
       .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_2")
       .select("geometry")
+      .limit(1000)
 
     df.cache()
     df.count()
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 89afa10986..b63237cae6 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -46,7 +46,7 @@ object ScalarUDF {
   }
 
   SparkEnv.get.conf.set(PYTHON_USE_DAEMON, false)
-//  SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "org.apache.sedona.python.SedonaPythonWorker")
+  SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "sedonaworker.work")
  private[spark] lazy val pythonPath = sys.env.getOrElse("PYTHONPATH", "")
 
  protected lazy val sparkHome: String = {
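
Not part of the commit above: for readers following the SedonaPythonArrowOutput change, below is a minimal, self-contained Scala sketch of the Arrow IPC read pattern that the restored reader iterator relies on (an ArrowStreamReader over an input stream, one Spark ColumnarBatch per loaded record batch). The object name ArrowBatchReadSketch, the readBatches signature, and the standalone RootAllocator are illustrative assumptions for this sketch only, not Sedona or Spark APIs.

    // Illustrative sketch only -- not part of the commit.
    import java.io.InputStream

    import scala.collection.JavaConverters._

    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.ipc.ArrowStreamReader
    import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}

    object ArrowBatchReadSketch {

      // Reads every record batch from an Arrow IPC stream and hands each one to
      // `handle` as a Spark ColumnarBatch backed by ArrowColumnVectors.
      def readBatches(in: InputStream)(handle: ColumnarBatch => Unit): Unit = {
        val allocator = new RootAllocator(Long.MaxValue)
        val reader = new ArrowStreamReader(in, allocator)
        try {
          val root = reader.getVectorSchemaRoot
          // loadNextBatch() returns false once the last batch has been consumed.
          while (reader.loadNextBatch()) {
            val vectors = root.getFieldVectors.asScala
              .map(vector => new ArrowColumnVector(vector))
              .toArray[ColumnVector]
            val batch = new ColumnarBatch(vectors)
            batch.setNumRows(root.getRowCount)
            handle(batch)
          }
        } finally {
          reader.close() // also closes the VectorSchemaRoot
          allocator.close()
        }
      }
    }

Each ColumnarBatch wraps the reader's live Arrow buffers, so it is only valid until the next loadNextBatch() call, which is why the code in the diff deserializes each batch before reading further.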
