viirya commented on code in PR #50301: URL: https://github.com/apache/spark/pull/50301#discussion_r2029712154
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala:
##########
@@ -83,17 +89,37 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[
           throw writer.exception.get
         }
         try {
-          if (reader != null && batchLoaded) {
+          if (batchLoaded && rowCount > 0 && currentRowIdx < rowCount) {
+            val batchRoot = if (arrowMaxRecordsPerOutputBatch > 0) {
+              val remainingRows = rowCount - currentRowIdx
+              if (remainingRows > arrowMaxRecordsPerOutputBatch) {
+                root.slice(currentRowIdx, arrowMaxRecordsPerOutputBatch)
+              } else {
+                root
+              }
+            } else {
+              root
+            }
+
+            currentRowIdx = currentRowIdx + batchRoot.getRowCount
+
+            vectors = batchRoot.getFieldVectors().asScala.map { vector =>
+              new ArrowColumnVector(vector)
+            }.toArray[ColumnVector]
+
+            val batch = new ColumnarBatch(vectors)
+            batch.setNumRows(batchRoot.getRowCount)
+            deserializeColumnarBatch(batch, schema)

Review Comment:
   Yes, I agree that slicing by bytes is preferable. Slicing by rows is simple to implement, though.

   I also wonder whether slicing by bytes would introduce a bit more overhead. Although Java Arrow lets us slice arrays without copying, the slicing is index/row based. As I mentioned above, we would need a loop that keeps trying different row counts until the slice reaches a satisfactory byte size, because we cannot ask the API for a slice of a given size in bytes.
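
   To make the overhead concern concrete, here is a minimal sketch of such a probing loop. It is not code from this PR and it does not call Arrow: rows are modeled as a plain array of per-row byte sizes, and `sliceBytes`, `rowsForBudget`, `slices`, and `maxBytes` are hypothetical names introduced only for illustration. It halves the candidate row count until the slice fits the byte budget, so it may undershoot the tightest fit; a binary search would be tighter at the cost of more probes.

```scala
object ByteBudgetSlicing {
  // Hypothetical stand-in for "how many bytes would a slice of `length` rows
  // starting at `start` occupy?". With Arrow this would have to be derived
  // from the sliced vectors' buffers; here it is a sum over modeled row sizes.
  def sliceBytes(rowSizes: Array[Long], start: Int, length: Int): Long = {
    var total = 0L
    var i = start
    while (i < start + length) {
      total += rowSizes(i)
      i += 1
    }
    total
  }

  // Number of rows to take from `start` so that the slice stays within maxBytes.
  // Returns 0 when no rows remain, and at least 1 otherwise, so a single
  // oversized row still makes progress instead of looping forever.
  def rowsForBudget(rowSizes: Array[Long], start: Int, maxBytes: Long): Int = {
    val remaining = rowSizes.length - start
    if (remaining <= 0) {
      0
    } else {
      var candidate = remaining
      // Each iteration is one "try a different row count" probe.
      while (candidate > 1 && sliceBytes(rowSizes, start, candidate) > maxBytes) {
        candidate = candidate / 2
      }
      candidate
    }
  }

  // Splits all rows into (start, length) slices that respect the byte budget.
  def slices(rowSizes: Array[Long], maxBytes: Long): Vector[(Int, Int)] = {
    val out = Vector.newBuilder[(Int, Int)]
    var start = 0
    while (start < rowSizes.length) {
      val n = rowsForBudget(rowSizes, start, maxBytes)
      out += ((start, n))
      start += n
    }
    out.result()
  }
}
```

   Every probe has to recompute the size of a candidate slice, which is the extra work that row-based slicing avoids: with a fixed row limit, one `slice(start, length)` call per output batch is enough.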