[ 
https://issues.apache.org/jira/browse/FLINK-39754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39754:
-----------------------------------
    Labels: pull-request-available  (was: )

> DataOutputSerializer.resize() int overflow causes O(n²) growth near 2 GB
> ------------------------------------------------------------------------
>
>                 Key: FLINK-39754
>                 URL: https://issues.apache.org/jira/browse/FLINK-39754
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.20.4, 2.1.2, 2.2.1
>            Reporter: Saulius Valatka
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Summary
> {{DataOutputSerializer.resize(int)}} in {{flink-core}} uses {{int}} 
> arithmetic to compute the new buffer length:
> {code:java}
> int newLen = Math.max(this.buffer.length * 2, this.buffer.length + 
> minCapacityAdd);
> {code}
> Once {{buffer.length > Integer.MAX_VALUE / 2}} (~1.07 GB), {{buffer.length * 
> 2}} overflows to a negative {{int}}. {{Math.max}} then picks {{buffer.length 
> + minCapacityAdd}}, which is typically only a few bytes larger than the 
> current length. From that point on, every {{resize()}} call grows the buffer 
> by {{minCapacityAdd}} bytes instead of doubling, and each call does a full 
> {{System.arraycopy}} of the 1+ GB buffer. The result is an O(n^2^) memcpy 
> loop that looks like a silent hang on large heaps. Eventually 
> {{buffer.length + minCapacityAdd}} also overflows, and the existing {{catch 
> (NegativeArraySizeException)}} translates the failure to an {{IOException}}.
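The wrap-around described in the summary can be checked in isolation. The sketch below only mirrors the quoted {{int}} expression; the class and method names ({{ResizeOverflowDemo}}, {{buggyNewLength}}) are made up for illustration and are not part of Flink.

```java
final class ResizeOverflowDemo {

    /** Mirrors the int expression quoted in the report; hypothetical helper, not the Flink source. */
    static int buggyNewLength(int currentLength, int minCapacityAdd) {
        return Math.max(currentLength * 2, currentLength + minCapacityAdd);
    }

    public static void main(String[] args) {
        int oneGiB = 1 << 30; // smallest power-of-two length above Integer.MAX_VALUE / 2

        // int multiplication wraps silently: 2^30 * 2 becomes Integer.MIN_VALUE.
        System.out.println(oneGiB * 2 == Integer.MIN_VALUE);

        // Math.max therefore falls back to length + minCapacityAdd: 1073741828.
        System.out.println(buggyNewLength(oneGiB, 4));

        // Below the threshold the same expression still doubles: 1073741824.
        System.out.println(buggyNewLength(1 << 29, 4));

        // Close to Integer.MAX_VALUE both operands wrap, so the result is negative.
        System.out.println(buggyNewLength(Integer.MAX_VALUE - 2, 4) < 0);
    }
}
```

The last case corresponds to the point where the array allocation would be attempted with a negative size.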
> h3. File / method
> [{{flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java}}|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java],
>  method {{resize(int minCapacityAdd)}}.
> The current shape of {{resize()}} has been in place for many years and is 
> present in every released Flink version.
> h3. Reproduction
> Allocate a {{DataOutputSerializer(1024)}} and call {{writeInt(0)}} in a loop 
> with {{-Xmx3g}}. Once {{position}} crosses ~1.07 GB, throughput drops to near 
> zero; each subsequent {{writeInt}} triggers a full-buffer {{arraycopy}}. JFR 
> / async-profiler shows time concentrated in {{System.arraycopy}} inside 
> {{resize()}}.
> Encountered in production while checkpointing an {{IcebergEnumeratorState}} 
> containing ~1.87M {{IcebergSourceSplit}} entries. 
> {{IcebergEnumeratorStateSerializer.serializePendingSplits()}} serializes 
> every split into a single {{DataOutputSerializer}} buffer, which crosses the 
> ~1.07 GB threshold and then appears to hang.
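To get a feel for why the {{writeInt}} loop appears to hang, the copy volume can be estimated without allocating anything. The following is a rough model, not a measurement: it assumes every resize copies the whole current buffer, and the names ({{ResizeCostSimulation}} and its methods) and the 1 MiB target are arbitrary choices for illustration.

```java
final class ResizeCostSimulation {

    /** Bytes copied when every resize adds a fixed step (the degraded path past ~1.07 GB). */
    static long bytesCopiedWithFixedStep(long startLength, long targetLength, long step) {
        long copied = 0;
        for (long len = startLength; len < targetLength; len += step) {
            copied += len; // modelling assumption: each resize copies the full current buffer
        }
        return copied;
    }

    /** Bytes copied when every resize doubles the buffer (the intended path). */
    static long bytesCopiedWithDoubling(long startLength, long targetLength) {
        long copied = 0;
        for (long len = startLength; len < targetLength; len *= 2) {
            copied += len;
        }
        return copied;
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        long oneMiB = 1L << 20;
        // Growing 1 KiB -> 1 GiB by doubling versus growing 1 GiB by a further 1 MiB in 4-byte steps.
        System.out.println("doubling:   " + bytesCopiedWithDoubling(1024, oneGiB));
        System.out.println("fixed step: " + bytesCopiedWithFixedStep(oneGiB, oneGiB + oneMiB, 4));
    }
}
```

Under this model, doubling from 1 KiB up to 1 GiB copies just under 1 GiB in total, while writing one more MiB in 4-byte steps past that point copies roughly 2.8 x 10^14 bytes across 262,144 resizes.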
> h3. Expected behavior
> Either grow cleanly up to the JVM max array size ({{Integer.MAX_VALUE - 8}}) 
> and then throw an actionable {{IOException}}, or throw immediately when the 
> requested size would exceed the cap. No silent O(n^2^) hang.
> h3. Proposed fix
> Extract the size computation into a {{@VisibleForTesting}} static helper that 
> uses {{long}} arithmetic, validates against {{Integer.MAX_VALUE - 8}}, and 
> jumps to the cap once doubling would overflow (so serializations that just 
> barely fit under 2 GB still complete). Drop the now-unreachable {{catch 
> (NegativeArraySizeException)}} block. Preserve the existing 
> {{OutOfMemoryError}} retry path (independent of this bug). Add unit tests on 
> the new helper covering: normal doubling, {{minCapacityAdd}}-dominated 
> growth, jump-to-cap on doubling overflow, exact-cap boundary, and 
> {{IOException}} when required size exceeds the cap.
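One possible shape for such a helper is sketched below. It is not the code from the pull request: the class name {{BufferGrowth}}, the method name {{computeNewLength}}, the constant name, and the exception message are all assumptions made for illustration. Only the {{long}} arithmetic, the {{Integer.MAX_VALUE - 8}} cap, and the jump-to-cap behaviour come from the description above.

```java
import java.io.IOException;

final class BufferGrowth {

    /** Cap taken from the report (Integer.MAX_VALUE - 8); the constant name is hypothetical. */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Computes the next buffer length using long arithmetic so that neither
     * doubling nor the addition can wrap around.
     */
    static int computeNewLength(int currentLength, int minCapacityAdd) throws IOException {
        long required = (long) currentLength + minCapacityAdd;
        if (required > MAX_ARRAY_SIZE) {
            // Fail fast with an actionable message instead of creeping towards the limit.
            throw new IOException(
                    "Serialization failed: required buffer size " + required
                            + " exceeds the maximum array size " + MAX_ARRAY_SIZE);
        }
        long doubled = (long) currentLength * 2;
        // Prefer doubling; if doubling would pass the cap, jump straight to the cap.
        return (int) Math.min(Math.max(doubled, required), MAX_ARRAY_SIZE);
    }
}
```

With this shape, {{computeNewLength(1024, 4)}} doubles to 2048, {{computeNewLength(1024, 4096)}} returns 5120 because the requested addition dominates, and {{computeNewLength(1 << 30, 4)}} returns the cap rather than a value 4 bytes larger. A negative {{minCapacityAdd}} is not validated in this sketch.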
> h3. Impact
> Any Flink job whose state serialization assembles a single record or object 
> larger than ~1 GB in a {{DataOutputSerializer}} is affected. This is most 
> commonly hit by source enumerator state with many splits (Iceberg, file 
> source) and by very large keyed state values.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)