viirya commented on code in PR #50301:
URL: https://github.com/apache/spark/pull/50301#discussion_r2001860740


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala:
##########
@@ -83,17 +89,37 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[
           throw writer.exception.get
         }
         try {
-          if (reader != null && batchLoaded) {
+          if (batchLoaded && rowCount > 0 && currentRowIdx < rowCount) {
+            val batchRoot = if (arrowMaxRecordsPerOutputBatch > 0) {
+              val remainingRows = rowCount - currentRowIdx
+              if (remainingRows > arrowMaxRecordsPerOutputBatch) {
+                root.slice(currentRowIdx, arrowMaxRecordsPerOutputBatch)
+              } else {
+                root
+              }
+            } else {
+              root
+            }
+
+            currentRowIdx = currentRowIdx + batchRoot.getRowCount
+
+            vectors = batchRoot.getFieldVectors().asScala.map { vector =>
+              new ArrowColumnVector(vector)
+            }.toArray[ColumnVector]
+
+            val batch = new ColumnarBatch(vectors)
+            batch.setNumRows(batchRoot.getRowCount)
+            deserializeColumnarBatch(batch, schema)

Review Comment:
   I wonder why we should slice it on the Python side. It looks more natural, 
and also easier, to slice the output Arrow batches on the Scala side. For 
example, `PythonArrowOutput` is commonly used by various operations to handle 
Arrow output from the Python worker, so it gives us one single place to handle 
slicing. From a quick look, Arrow output is produced in several different 
places in the Python code, so slicing the batch in Python would require more 
changes to the Python code.
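   
   As a rough sketch of the "one single place" idea, the slice boundaries can 
be computed independently of Arrow. The `BatchSlices` object and `sliceRanges` 
method below are hypothetical names used only for illustration, not Spark 
APIs; treating a non-positive limit as "no limit" is an assumption that mirrors 
the `arrowMaxRecordsPerOutputBatch > 0` check in the diff.

```scala
// Hypothetical helper, not part of Spark: computes (offset, length) windows
// that cover `rowCount` rows in chunks of at most `maxRecords` rows.
object BatchSlices {
  def sliceRanges(rowCount: Int, maxRecords: Int): Seq[(Int, Int)] = {
    if (rowCount <= 0) {
      // Nothing to emit for an empty batch.
      Seq.empty
    } else if (maxRecords <= 0) {
      // Assumption: a non-positive limit means "no limit", so emit the batch whole.
      Seq((0, rowCount))
    } else {
      // Step through the batch; the last window holds only the remaining rows.
      (0 until rowCount by maxRecords).map { offset =>
        (offset, math.min(maxRecords, rowCount - offset))
      }
    }
  }
}
```

   For a 10-row batch with a limit of 4, `BatchSlices.sliceRanges(10, 4)` 
yields the windows `(0, 4)`, `(4, 4)` and `(8, 2)`. Each pair could then be 
passed to `root.slice(offset, length)`, as the diff above already does for the 
non-final slices.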
   
   Another reason is that I feel more confident modifying the Scala code. 
😄 
   
   Btw, since serializing pandas Series to Arrow arrays involves memory 
copying, I'm not sure whether this could cause regressions in some cases. For 
example, would a dictionary be copied for every sliced batch?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


