HyukjinKwon commented on code in PR #50301: URL: https://github.com/apache/spark/pull/50301#discussion_r2009372990
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala:
##########

```diff
@@ -83,17 +89,37 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[
           throw writer.exception.get
         }
         try {
-          if (reader != null && batchLoaded) {
+          if (batchLoaded && rowCount > 0 && currentRowIdx < rowCount) {
+            val batchRoot = if (arrowMaxRecordsPerOutputBatch > 0) {
+              val remainingRows = rowCount - currentRowIdx
+              if (remainingRows > arrowMaxRecordsPerOutputBatch) {
+                root.slice(currentRowIdx, arrowMaxRecordsPerOutputBatch)
+              } else {
+                root
+              }
+            } else {
+              root
+            }
+
+            currentRowIdx = currentRowIdx + batchRoot.getRowCount
+
+            vectors = batchRoot.getFieldVectors().asScala.map { vector =>
+              new ArrowColumnVector(vector)
+            }.toArray[ColumnVector]
+
+            val batch = new ColumnarBatch(vectors)
+            batch.setNumRows(batchRoot.getRowCount)
+            deserializeColumnarBatch(batch, schema)
```

Review Comment:

I actually took a look around this a few times before, but I ended up giving up.

> I wonder why we should slice it at the Python side? It looks more natural and also easier to slice the output Arrow batches at the Scala side.

Because the (sliced) input batches are created in the JVM and sent to Python, I thought the reverse should hold as well: the (sliced) output batches should be created in Python (in `serializers.py`) and sent to the JVM.

So, suppose we set the input size to 2K records and the output size to 1K records. With the current changes, the flow is:

```
JVM (sliced to 2K) -- 2K --> Python -- 2K --> JVM (sliced to 1K)
```

When I set this, e.g., to avoid OOM or to handle the output in a more streaming manner, I would expect the Python process itself to output 1K records:

```
JVM (sliced to 2K) -- 2K --> Python (sliced to 1K) -- 1K --> JVM
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org