Tengfei Huang created SPARK-57345:
-------------------------------------
Summary: Bound ArrowConverters batch size by Arrow buffer accounting to prevent executor OOM
Key: SPARK-57345
URL: https://issues.apache.org/jira/browse/SPARK-57345
Project: Spark
Issue Type: Task
Components: SQL
Affects Versions: 4.1.2
Reporter: Tengfei Huang
`ArrowConverters.ArrowBatchWithSchemaIterator` decides when to cut an Arrow IPC
batch using a per-row byte-size estimate: it sums `UnsafeRow.getSizeInBytes`
for unsafe rows and falls back to `numFields * 16` for generic rows, comparing
the running total against `maxEstimatedBatchSize` (e.g.
`spark.connect.grpc.arrow.maxBatchSize`).
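A minimal Java sketch of the estimate-driven cut described above, for
illustration only. `RowInfo` and `EstimateBatcher` are hypothetical stand-ins,
not Spark classes. The `sizeInBytes` field plays the role of
`UnsafeRow.getSizeInBytes`, and the exact placement of the cut check is an
assumption that may differ from the real iterator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Spark row: an "unsafe" row carries a byte size
// (like UnsafeRow.getSizeInBytes); a generic row exposes only its field count.
final class RowInfo {
    final boolean unsafe;
    final long sizeInBytes;
    final int numFields;

    RowInfo(boolean unsafe, long sizeInBytes, int numFields) {
        this.unsafe = unsafe;
        this.sizeInBytes = sizeInBytes;
        this.numFields = numFields;
    }
}

// Hypothetical batcher applying the per-row estimate described in the issue.
final class EstimateBatcher {
    // Unsafe rows contribute their byte size; generic rows fall back to numFields * 16.
    static long estimate(RowInfo row) {
        return row.unsafe ? row.sizeInBytes : 16L * row.numFields;
    }

    // Returns the row count of each batch, cutting once the running estimate
    // reaches maxEstimatedBatchSize (assumed check; the real iterator may differ).
    static List<Integer> batchSizes(List<RowInfo> rows, long maxEstimatedBatchSize) {
        List<Integer> sizes = new ArrayList<>();
        long running = 0;
        int count = 0;
        for (RowInfo row : rows) {
            running += estimate(row);
            count++;
            if (running >= maxEstimatedBatchSize) {
                sizes.add(count);
                running = 0;
                count = 0;
            }
        }
        if (count > 0) {
            sizes.add(count); // trailing partial batch
        }
        return sizes;
    }
}
```

Nothing in `estimate` looks at the field values, so for a generic row a
multi-kilobyte string field costs the same 16 bytes as an integer field.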
This estimate badly under-counts Arrow's serialized output for variable-length
(string/binary) and nested (struct/array/map) types -- by 10-100x in the worst
cases -- because the in-memory Arrow buffer footprint is not derivable from the
input row's byte count. As a result, a single batch can absorb far more rows
than the budget intends and grow well beyond `maxEstimatedBatchSize`.
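A rough arithmetic illustration of the under-count, using the generic-row
fallback where it is easiest to see. It assumes a batch whose rows hold a
single string field and counts only the three Arrow buffers of a
variable-length column (validity bitmap, 32-bit offsets, value bytes). Padding,
alignment and allocator rounding are ignored, so the Arrow figure is a lower
bound. `UnderCountExample` is a hypothetical name.

```java
// Hypothetical illustration: generic-row estimate vs. a lower bound on Arrow buffers.
final class UnderCountExample {
    // The generic-row fallback from the issue: numFields * 16 bytes per row.
    static long genericEstimate(int numFields, int numRows) {
        return 16L * numFields * numRows;
    }

    // Lower bound for one Arrow utf8 column: 1 validity bit per row (rounded up to
    // whole bytes), numRows + 1 int32 offsets, plus the value bytes themselves.
    static long arrowUtf8LowerBound(int numRows, int bytesPerValue) {
        long validity = (numRows + 7L) / 8;
        long offsets = 4L * (numRows + 1);
        long data = (long) numRows * bytesPerValue;
        return validity + offsets + data;
    }

    public static void main(String[] args) {
        long estimated = genericEstimate(1, 1_000);
        long arrow = arrowUtf8LowerBound(1_000, 1_000);
        System.out.printf("estimate=%d arrow>=%d ratio=%.1fx%n",
                estimated, arrow, (double) arrow / estimated);
    }
}
```

For 1,000 rows of 1,000-byte strings the fallback estimates 16,000 bytes
against at least 1,004,129 bytes of Arrow buffers, roughly 63x. These are
illustrative numbers, not measurements.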
Two effects compound into executor OOM during serialization:
* The size bound trips far too late, so batches grow well past the budget.
* Each oversized batch is serialized into a `java.io.ByteArrayOutputStream`,
whose backing array doubles on `ensureCapacity`. The old and new arrays are
live at the same time during a resize, so memory use peaks at up to roughly 3x
the final size. With many concurrent tasks per executor, the aggregate heap
pressure is multiplied accordingly.
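A small model of the resize cost, assuming the growth rule of OpenJDK's
`ByteArrayOutputStream` (new capacity is the larger of twice the old capacity
and the required size) and that the old and new arrays are both reachable
while `Arrays.copyOf` runs. `BaosGrowthModel` is a hypothetical helper. It
simulates sizes instead of allocating real arrays and ignores the JVM's
maximum array length.

```java
// Hypothetical model of ByteArrayOutputStream growth (assumed policy: when a write
// does not fit, capacity becomes max(2 * oldCapacity, requiredSize)).
final class BaosGrowthModel {
    final long finalCapacity;
    final long peakLiveBytes;

    private BaosGrowthModel(long finalCapacity, long peakLiveBytes) {
        this.finalCapacity = finalCapacity;
        this.peakLiveBytes = peakLiveBytes;
    }

    // Simulates writing totalBytes in writes of chunkBytes, starting from
    // initialCapacity (the no-arg ByteArrayOutputStream starts at 32 bytes).
    static BaosGrowthModel simulate(long totalBytes, int chunkBytes, long initialCapacity) {
        if (chunkBytes <= 0) {
            throw new IllegalArgumentException("chunkBytes must be positive");
        }
        long capacity = initialCapacity;
        long count = 0;
        long peak = capacity;
        while (count < totalBytes) {
            long needed = count + Math.min(chunkBytes, totalBytes - count);
            if (needed > capacity) {
                long newCapacity = Math.max(2 * capacity, needed);
                // Old and new backing arrays coexist while the contents are copied.
                peak = Math.max(peak, capacity + newCapacity);
                capacity = newCapacity;
            }
            count = needed;
        }
        return new BaosGrowthModel(capacity, peak);
    }
}
```

In the worst case, a 524,289-byte batch written into a default-sized stream
ends with a 1,048,576-byte array and peaks at 1,572,864 live bytes, about 3x
the payload. Each concurrently running task can hit its own peak.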
The OOM surfaces inside `ByteArrayOutputStream.ensureCapacity -> Arrays.copyOf`
while serializing the Arrow IPC batch.
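One possible reading of the summary's proposal, sketched under heavy
simplification: track the bytes each Arrow buffer would occupy and cut when
that total, rather than the row estimate, reaches the budget. The sketch covers
a single variable-length column and ignores padding, alignment and allocator
rounding. `VarLenColumnAccount` and `BufferBoundedBatcher` are hypothetical
names, not Spark or Arrow APIs. A real change would more likely ask the vectors
themselves (Arrow Java exposes `ValueVector.getBufferSize()`), which this
stdlib-only sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accounting for one Arrow variable-length (utf8/binary) column:
// validity bitmap (1 bit per slot), int32 offsets (rows + 1 entries), data bytes.
// Padding, alignment and allocator rounding are ignored, so this is a lower bound.
final class VarLenColumnAccount {
    private long rows;
    private long dataBytes;

    void add(int valueLength) {
        rows++;
        dataBytes += valueLength;
    }

    long bufferBytes() {
        if (rows == 0) {
            return 0;
        }
        long validity = (rows + 7) / 8;
        long offsets = 4L * (rows + 1);
        return validity + offsets + dataBytes;
    }

    void reset() {
        rows = 0;
        dataBytes = 0;
    }
}

// Hypothetical batcher that cuts on accounted buffer bytes instead of a row estimate.
final class BufferBoundedBatcher {
    // Each batch holds at least one row, even if that row alone exceeds the budget.
    static List<Integer> batchSizes(int[] valueLengths, long maxBatchBytes) {
        List<Integer> sizes = new ArrayList<>();
        VarLenColumnAccount account = new VarLenColumnAccount();
        int count = 0;
        for (int len : valueLengths) {
            account.add(len);
            count++;
            if (account.bufferBytes() >= maxBatchBytes) {
                sizes.add(count);
                account.reset();
                count = 0;
            }
        }
        if (count > 0) {
            sizes.add(count); // trailing partial batch
        }
        return sizes;
    }
}
```

With ten 1,000-byte values and a 2,500-byte budget, batches come out as 3, 3,
3 and 1 rows, whereas the `numFields * 16` fallback would charge only 16 bytes
for each of those rows.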
--
This message was sent by Atlassian Jira
(v8.20.10#820010)