This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a54fff253ca3 [SPARK-55802][SQL] Fix integer overflow when computing Arrow batch bytes
a54fff253ca3 is described below

commit a54fff253ca3aa8da282f3970839a693a7b2bb89
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Mar 4 08:21:38 2026 -0800

    [SPARK-55802][SQL] Fix integer overflow when computing Arrow batch bytes
    
    ### What changes were proposed in this pull request?
    
    `ArrowWriter.sizeInBytes()` and
    `SliceBytesArrowOutputProcessorImpl.getBatchBytes()` both accumulated
    per-column buffer sizes (each an `Int`) into an `Int` accumulator. When the
    total exceeds 2 GB, the sum silently wraps around to a negative value,
    causing the byte-limit checks controlled by
    `spark.sql.execution.arrow.maxBytesPerBatch` and
    `spark.sql.execution.arrow.maxBytesPerOutputBatch` to behave incorrectly
    and potentially allow oversized batches through.
    
    Fix by changing both accumulators and return types to `Long`.
    
    ### Why are the changes needed?
    
    Fix possible overflow when calculating Arrow batch bytes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Sonnet 4.6 <noreply@anthropic.com>
    
    Closes #54584 from viirya/fix-arrow-batch-bytes-overflow.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
    (cherry picked from commit df195ac59de5bd896cd70699cfe96ebf78bf2976)
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 .../main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala | 4 ++--
 .../org/apache/spark/sql/execution/python/PythonArrowOutput.scala     | 4 ++--
 .../execution/python/streaming/BaseStreamingArrowWriterSuite.scala    | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 8d68e74dbf87..b5269da035f3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -124,9 +124,9 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: 
Array[ArrowFieldWriter]) {
     count += 1
   }
 
-  def sizeInBytes(): Int = {
+  def sizeInBytes(): Long = {
     var i = 0
-    var bytes = 0
+    var bytes = 0L
     while (i < fields.size) {
       bytes += fields(i).getSizeInBytes()
       i += 1
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index df458fa37d7f..1e8f4ebfd1fe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -288,8 +288,8 @@ class SliceBytesArrowOutputProcessorImpl(
     }
   }
 
-  private def getBatchBytes(root: VectorSchemaRoot): Int = {
-    var batchBytes = 0
+  private def getBatchBytes(root: VectorSchemaRoot): Long = {
+    var batchBytes = 0L
     root.getFieldVectors.asScala.foreach { vector =>
       batchBytes += vector.getBufferSize
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
index fc10a102b4f5..4b44a923fd14 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
@@ -76,7 +76,7 @@ class BaseStreamingArrowWriterSuite extends SparkFunSuite 
with BeforeAndAfterEac
       ()
     }
 
-    when(arrowWriter.sizeInBytes()).thenAnswer { _ => sizeCounter }
+    when(arrowWriter.sizeInBytes()).thenAnswer { _ => sizeCounter.toLong }
 
     // Set arrowMaxBytesPerBatch to 1
     transformWithStateInPySparkWriter = new BaseStreamingArrowWriter(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to