Tengfei Huang created SPARK-57345:
-------------------------------------

             Summary: Bound ArrowConverters batch size by Arrow buffer accounting to prevent executor OOM
                 Key: SPARK-57345
                 URL: https://issues.apache.org/jira/browse/SPARK-57345
             Project: Spark
          Issue Type: Task
          Components: SQL
    Affects Versions: 4.1.2
            Reporter: Tengfei Huang


`ArrowConverters.ArrowBatchWithSchemaIterator` decides when to cut an Arrow IPC 
batch using a per-row byte-size estimate: it sums `UnsafeRow.getSizeInBytes` 
for unsafe rows and falls back to `numFields * 16` for generic rows, comparing 
the running total against `maxEstimatedBatchSize` (e.g. 
`spark.connect.grpc.arrow.maxBatchSize`).
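A minimal Java sketch of the heuristic as described above. It assumes each row can be reduced to either an exact size (standing in for `UnsafeRow.getSizeInBytes`) or a field count (for the `numFields * 16` fallback). `RowEstimateBatching`, `Row` and `cutBatches` are hypothetical names. The exact cut condition (keep adding rows until the running total reaches the budget, so every batch holds at least one row) is an assumption, not a copy of `ArrowBatchWithSchemaIterator`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the row-size heuristic described in this issue; not
 * Spark source. Rows are reduced to the two inputs the heuristic reads.
 */
public class RowEstimateBatching {

    /** An "unsafe" row has an exact byte size; a generic row only a field count. */
    public static final class Row {
        final boolean unsafe;
        final long sizeInBytes;
        final int numFields;

        public Row(boolean unsafe, long sizeInBytes, int numFields) {
            this.unsafe = unsafe;
            this.sizeInBytes = sizeInBytes;
            this.numFields = numFields;
        }

        long estimatedSize() {
            // Stand-in for UnsafeRow.getSizeInBytes, else the numFields * 16 fallback.
            return unsafe ? sizeInBytes : (long) numFields * 16;
        }
    }

    /** Adds rows to a batch until the running estimate reaches the budget (<= 0 means unlimited). */
    public static List<List<Row>> cutBatches(List<Row> rows, long maxEstimatedBatchSize) {
        List<List<Row>> batches = new ArrayList<>();
        List<Row> current = new ArrayList<>();
        long estimated = 0;
        for (Row row : rows) {
            current.add(row);
            estimated += row.estimatedSize();
            if (maxEstimatedBatchSize > 0 && estimated >= maxEstimatedBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                estimated = 0;
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // 1,000 generic rows with 2 fields each are estimated at 32 bytes per row.
        List<Row> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(new Row(false, 0, 2));
        }
        List<List<Row>> batches = cutBatches(rows, 4096);
        System.out.println(batches.size() + " batches; first holds " + batches.get(0).size() + " rows");
    }
}
```

With a 4096-byte budget this yields 8 batches of at most 128 rows. In this model nothing looks at what the fields actually contain, so a 2-field generic row holding a 1 MiB string still counts as 32 bytes.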

This estimate badly under-counts Arrow's serialized output for variable-length
(string/binary) and nested (struct/array/map) types, by 10-100x in the worst
cases, because the in-memory Arrow buffer footprint cannot be derived from the
input row's byte count. As a result, a single batch can absorb far more rows
than the budget intends and grow well beyond `maxEstimatedBatchSize`.
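To make the under-count concrete, the following arithmetic sketch compares the `numFields * 16` fallback with a lower bound on Arrow's in-memory buffer footprint for a string column and a `list<int32>` column. It uses the Arrow columnar layout: a validity bitmap, `n + 1` 32-bit offsets, then the value bytes (for lists, the child vector has its own validity and values). The helper names are hypothetical. Buffer padding and allocator rounding are ignored, so real Arrow buffers would be at least this large.

```java
/**
 * Illustrative arithmetic only (hypothetical helpers, not Spark or Arrow API):
 * minimum Arrow columnar buffer sizes vs. the numFields * 16 generic-row estimate.
 * Padding and power-of-two allocator rounding are ignored.
 */
public class ArrowFootprintEstimate {

    /** Validity bitmap: one bit per value, rounded up to whole bytes. */
    public static long validityBytes(long valueCount) {
        return (valueCount + 7) / 8;
    }

    /** Variable-size utf8/binary column: validity + (n + 1) int32 offsets + value bytes. */
    public static long utf8ColumnBytes(long rowCount, long totalStringBytes) {
        return validityBytes(rowCount) + 4L * (rowCount + 1) + totalStringBytes;
    }

    /** list<int32> column: list validity + (n + 1) int32 offsets + child validity + child values. */
    public static long listOfIntColumnBytes(long rowCount, long totalElements) {
        return validityBytes(rowCount) + 4L * (rowCount + 1)
                + validityBytes(totalElements) + 4L * totalElements;
    }

    /** The generic-row fallback described in the issue: numFields * 16 bytes per row. */
    public static long genericRowEstimate(long rowCount, int numFields) {
        return rowCount * numFields * 16L;
    }

    public static void main(String[] args) {
        long rows = 10_000;
        // Each row has 2 fields: one 1 KiB string and one array of 100 ints.
        long arrow = utf8ColumnBytes(rows, rows * 1024) + listOfIntColumnBytes(rows, rows * 100);
        long estimate = genericRowEstimate(rows, 2);
        System.out.printf("estimate=%d bytes, arrow>=%d bytes, ratio=%.1fx%n",
                estimate, arrow, (double) arrow / estimate);
    }
}
```

For 10,000 such rows the fallback estimates 320,000 bytes while the Arrow buffers need at least 14,447,508 bytes, roughly 45x, which falls inside the 10-100x range quoted above.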

Two effects compound into executor OOM during serialization:
* The size bound does not trip in time, so batches grow far past the budget.
* Each oversized batch is serialized into a `java.io.ByteArrayOutputStream`,
whose backing array roughly doubles in `ensureCapacity`. Because the old array
stays live while `Arrays.copyOf` fills the new one, memory use peaks at 2-3x the
final size during a single resize. With many concurrent tasks per executor, the
aggregate heap pressure is multiplied accordingly.
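A simplified model of the resize behavior. It assumes the growth rule of recent JDKs (new capacity = max(2 * old capacity, required size), starting from the default 32-byte buffer) and 1 MiB writes. `BaosPeakModel` and `simulate` are hypothetical names. During `Arrays.copyOf` the old and new arrays are both reachable, and `toByteArray()` makes one more copy while the backing array is still live.

```java
/**
 * Hypothetical model of java.io.ByteArrayOutputStream growth (not JDK source):
 * when a write does not fit, capacity grows to max(2 * capacity, required) and
 * the old array is copied, so both arrays are live at once. Uses long sizes and
 * ignores the ~2 GiB Java array limit.
 */
public class BaosPeakModel {

    /** Returns {peak bytes during a resize, peak bytes during toByteArray()}. */
    public static long[] simulate(long totalBytes, long chunkBytes) {
        if (chunkBytes <= 0) {
            throw new IllegalArgumentException("chunkBytes must be positive");
        }
        long capacity = 32; // default ByteArrayOutputStream() capacity
        long count = 0;     // bytes written so far
        long peakResize = capacity;
        while (count < totalBytes) {
            long write = Math.min(chunkBytes, totalBytes - count);
            long required = count + write;
            if (required > capacity) {
                long newCapacity = Math.max(2 * capacity, required);
                // Old and new arrays coexist while Arrays.copyOf runs.
                peakResize = Math.max(peakResize, capacity + newCapacity);
                capacity = newCapacity;
            }
            count = required;
        }
        // toByteArray() allocates a count-sized copy while the backing array is live.
        long peakToByteArray = capacity + count;
        return new long[] {peakResize, peakToByteArray};
    }

    public static void main(String[] args) {
        long mib = 1L << 20;
        long[] peaks = simulate(65 * mib, mib);
        System.out.printf("final=65 MiB, resize peak=%d MiB, toByteArray peak=%d MiB%n",
                peaks[0] / mib, peaks[1] / mib);
    }
}
```

For a 65 MiB batch, which ends just past a 64 MiB boundary, the model peaks at 192 MiB during the last resize (about 2.95x the final size) and at 193 MiB during `toByteArray()`. Assuming, for illustration, 8 concurrent tasks hitting that peak at once, the executor would need roughly 1.5 GiB of heap for these buffers alone.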

The OOM surfaces inside `ByteArrayOutputStream.ensureCapacity -> Arrays.copyOf`
while serializing the Arrow IPC batch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
