[
https://issues.apache.org/jira/browse/FLINK-39754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39754:
-----------------------------------
Labels: pull-request-available (was: )
> DataOutputSerializer.resize() int overflow causes O(n²) growth near 2 GB
> ------------------------------------------------------------------------
>
> Key: FLINK-39754
> URL: https://issues.apache.org/jira/browse/FLINK-39754
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.20.4, 2.1.2, 2.2.1
> Reporter: Saulius Valatka
> Priority: Major
> Labels: pull-request-available
>
> h3. Summary
> {{DataOutputSerializer.resize(int)}} in {{flink-core}} uses {{int}}
> arithmetic to compute the new buffer length:
> {code:java}
> int newLen = Math.max(this.buffer.length * 2, this.buffer.length +
> minCapacityAdd);
> {code}
> Once {{buffer.length > Integer.MAX_VALUE / 2}} (~1.07 GB), {{buffer.length *
> 2}} overflows to a negative {{int}}. {{Math.max}} then picks {{buffer.length
> + minCapacityAdd}}, which is typically only a few bytes larger than the
> current length. From that point on, every {{resize()}} call grows the buffer
> by only {{minCapacityAdd}} bytes instead of doubling it, and each call does a
> full {{System.arraycopy}} of a buffer that is already larger than 1 GB. The
> result is an O(n^2^) memcpy loop that looks like a silent hang on large
> heaps. Eventually {{buffer.length + minCapacityAdd}} overflows as well, and
> the existing {{catch (NegativeArraySizeException)}} translates the failure
> into an {{IOException}}.
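The overflow described above can be checked in isolation, without Flink on the classpath. The following is a minimal sketch: the class and method names ({{ResizeOverflowDemo}}, {{buggyNewLength}}) are hypothetical, and only the {{Math.max}} expression is taken from the report.

```java
/** Hypothetical demo class; only the Math.max expression comes from the report. */
final class ResizeOverflowDemo {

    /** Mirrors the int arithmetic quoted in the summary. */
    static int buggyNewLength(int currentLength, int minCapacityAdd) {
        return Math.max(currentLength * 2, currentLength + minCapacityAdd);
    }

    public static void main(String[] args) {
        int below = 1 << 29;   // 512 MiB: doubling still fits in an int
        int atLimit = 1 << 30; // 1 GiB: just above Integer.MAX_VALUE / 2

        // Normal case: the buffer doubles.
        System.out.println(buggyNewLength(below, 4));   // 1073741824

        // Doubling wraps around to a negative int ...
        System.out.println(atLimit * 2);                // -2147483648

        // ... so Math.max falls back to "current length + 4".
        System.out.println(buggyNewLength(atLimit, 4)); // 1073741828

        // Close to Integer.MAX_VALUE both operands wrap, and the result is negative.
        System.out.println(buggyNewLength(Integer.MAX_VALUE - 2, 4) < 0); // true
    }
}
```

The last case corresponds to the point where allocating the new array would throw {{NegativeArraySizeException}}.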
> h3. File / method
> [{{flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java}}|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java],
> method {{resize(int minCapacityAdd)}}.
> The current shape of {{resize()}} has been in place for many years and is
> present in every released Flink version.
> h3. Reproduction
> Run a JVM with {{-Xmx3g}}, allocate a {{DataOutputSerializer(1024)}}, and
> call {{writeInt(0)}} in a loop. Once {{position}} crosses ~1.07 GB,
> throughput drops to near zero because each subsequent {{writeInt}} triggers a
> full-buffer {{arraycopy}}. JFR / async-profiler shows the time concentrated
> in {{System.arraycopy}} inside {{resize()}}.
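The cost of the degraded growth can be estimated without allocating gigabytes. The sketch below only simulates the length and position bookkeeping. It assumes that {{writeInt}} requests 4 extra bytes when the buffer is full and that each resize copies {{position}} bytes; the class and method names are hypothetical.

```java
/** Hypothetical simulation of the growth policy; no real buffer is allocated. */
final class ResizeCostSimulation {

    /**
     * Simulates `writes` 4-byte writes into a buffer that starts out full.
     * Returns {number of resize calls, total bytes that would be copied}.
     */
    static long[] simulate(int startLength, int writes) {
        int length = startLength;
        int position = startLength; // assumption: the buffer is already full
        long resizes = 0;
        long copied = 0;
        for (int i = 0; i < writes; i++) {
            if (position >= length - 3) {
                // Same int arithmetic as in the report, with minCapacityAdd = 4.
                length = Math.max(length * 2, length + 4);
                copied += position; // assumption: resize copies `position` bytes
                resizes++;
            }
            position += 4;
        }
        return new long[] {resizes, copied};
    }

    public static void main(String[] args) {
        long[] small = simulate(1 << 20, 1000); // 1 MiB buffer: doubling works
        long[] large = simulate(1 << 30, 1000); // 1 GiB buffer: doubling overflows
        System.out.println("1 MiB start: " + small[0] + " resizes, " + small[1] + " bytes copied");
        System.out.println("1 GiB start: " + large[0] + " resizes, " + large[1] + " bytes copied");
    }
}
```

Under these assumptions, writing 1000 ints (4 KB of payload) from a full 1 GiB buffer triggers 1000 resizes and roughly 1 TB of copying, compared with a single resize from a full 1 MiB buffer.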
> Encountered in production while checkpointing an {{IcebergEnumeratorState}}
> containing ~1.87M {{IcebergSourceSplit}} entries:
> {{IcebergEnumeratorStateSerializer.serializePendingSplits()}} serializes
> every split into a single {{DataOutputSerializer}} buffer, which crosses the
> ~1.07 GB threshold and then appears to hang.
> h3. Expected behavior
> Either grow cleanly up to the practical JVM maximum array size ({{Integer.MAX_VALUE - 8}})
> and then throw an actionable {{IOException}}, or throw immediately when the
> requested size would exceed the cap. No silent O(n^2^) hang.
> h3. Proposed fix
> Extract the size computation into a {{@VisibleForTesting}} static helper that
> uses {{long}} arithmetic, validates against {{Integer.MAX_VALUE - 8}}, and
> jumps to the cap once doubling would overflow (so serializations that just
> barely fit under 2 GB still complete). Drop the now-unreachable {{catch
> (NegativeArraySizeException)}} block. Preserve the existing
> {{OutOfMemoryError}} retry path (independent of this bug). Add unit tests on
> the new helper covering: normal doubling, {{minCapacityAdd}}-dominated
> growth, jump-to-cap on doubling overflow, exact-cap boundary, and
> {{IOException}} when required size exceeds the cap.
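One possible shape for such a helper is sketched below. This is an illustration of the approach described above, not the code from the pull request: the names {{ResizeSketch}}, {{computeNewLength}} and {{MAX_ARRAY_SIZE}} and the exception message are assumptions.

```java
import java.io.IOException;

/** Hypothetical sketch of the proposed size computation; not the actual patch. */
final class ResizeSketch {

    /** Cap named in the report. */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Computes the new buffer length with long arithmetic so that neither
     * the doubling nor the addition can wrap around.
     */
    static int computeNewLength(int currentLength, int minCapacityAdd) throws IOException {
        long required = (long) currentLength + minCapacityAdd;
        if (required > MAX_ARRAY_SIZE) {
            throw new IOException(
                    "Serialization failed: required buffer size " + required
                            + " bytes exceeds the maximum array size " + MAX_ARRAY_SIZE);
        }
        long doubled = (long) currentLength * 2;
        // Prefer doubling, but never exceed the cap; this jumps straight to the
        // cap once doubling would overflow.
        return (int) Math.min(Math.max(doubled, required), MAX_ARRAY_SIZE);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(computeNewLength(1024, 4));     // normal doubling
        System.out.println(computeNewLength(1024, 4096));  // minCapacityAdd dominates
        System.out.println(computeNewLength(1 << 30, 4));  // jump to the cap
    }
}
```

Clamping to the cap instead of failing as soon as doubling overflows is what lets a serialization that fits just under 2 GB complete with one final resize.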
> h3. Impact
> Any Flink job whose state serialization assembles a single record or object
> larger than ~1 GB in a {{DataOutputSerializer}} is affected. Most commonly hit by source
> enumerator state with many splits (Iceberg, file source) and by very large
> keyed state values.