Saulius Valatka created FLINK-39754:
---------------------------------------

             Summary: DataOutputSerializer.resize() int overflow causes O(n²) 
growth near 2 GB
                 Key: FLINK-39754
                 URL: https://issues.apache.org/jira/browse/FLINK-39754
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 2.2.1, 2.1.2, 1.20.4
            Reporter: Saulius Valatka


h3. Summary

{{DataOutputSerializer.resize(int)}} in {{flink-core}} uses {{int}} arithmetic 
to compute the new buffer length:

{code:java}
int newLen = Math.max(this.buffer.length * 2, this.buffer.length + 
minCapacityAdd);
{code}

Once {{buffer.length > Integer.MAX_VALUE / 2}} (~1.07 GB), {{buffer.length * 
2}} overflows to a negative {{int}}. {{Math.max}} then picks {{buffer.length + 
minCapacityAdd}}, which is typically only a few bytes larger than the current 
length. From that point on, every {{resize()}} call grows the buffer by 
{{minCapacityAdd}} bytes instead of doubling, and each call does a full 
{{System.arraycopy}} of the 1+ GB buffer. The result is an O(n^2^) memcpy 
loop that on large heaps looks like a silent hang. Only once {{buffer.length 
+ minCapacityAdd}} also overflows does the existing {{catch 
(NegativeArraySizeException)}} translate the failure into an {{IOException}}.
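
The arithmetic can be checked in isolation. The following is a minimal 
sketch, not Flink code: the class {{ResizeOverflowDemo}} and the method 
{{currentNewLen}} are hypothetical names that only mirror the expression 
quoted above.

```java
final class ResizeOverflowDemo {

    /** Mirrors the quoted expression; int arithmetic wraps around silently. */
    static int currentNewLen(int bufferLength, int minCapacityAdd) {
        return Math.max(bufferLength * 2, bufferLength + minCapacityAdd);
    }

    public static void main(String[] args) {
        int oneGiB = 1 << 30; // first length at which doubling overflows

        // Small buffer: doubling wins, as intended.
        System.out.println(currentNewLen(1024, 4)); // 2048

        // Doubling 1 GiB wraps to Integer.MIN_VALUE.
        System.out.println(oneGiB * 2); // -2147483648

        // Math.max falls back to length + minCapacityAdd: growth of 4 bytes.
        System.out.println(currentNewLen(oneGiB, 4)); // 1073741828

        // Near 2 GB both operands wrap, so the result is negative and
        // "new byte[newLen]" would throw NegativeArraySizeException.
        System.out.println(currentNewLen(Integer.MAX_VALUE - 2, 4) < 0); // true
    }
}
```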

h3. File / method
[{{flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java}}|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java],
 method {{resize(int minCapacityAdd)}}.

The current shape of {{resize()}} has been in place for many years and is 
present in every released Flink version.

h3. Reproduction

Run with {{-Xmx3g}}, allocate a {{DataOutputSerializer(1024)}}, and call 
{{writeInt(0)}} in a loop. Once {{position}} crosses ~1.07 GB, throughput 
drops to near zero; each subsequent {{writeInt}} triggers a full-buffer 
{{arraycopy}}. JFR / async-profiler shows time concentrated in 
{{System.arraycopy}} inside {{resize()}}.
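
The cost of this loop can be estimated without allocating any memory. The 
sketch below is a simulation, not Flink code: {{ResizeCostSimulation}} and 
{{simulate}} are hypothetical names, and it assumes that a resize with 
{{minCapacityAdd = 4}} happens whenever fewer than 4 bytes remain and that 
each resize copies {{position}} bytes.

```java
final class ResizeCostSimulation {

    /**
     * Simulates `writes` 4-byte writes into a buffer that starts at 1024 bytes
     * and grows with the int-based policy described in this issue.
     * Returns {number of resizes, total bytes copied by those resizes}.
     */
    static long[] simulate(long writes) {
        int length = 1024;
        int position = 0;
        long resizes = 0;
        long bytesCopied = 0;
        for (long i = 0; i < writes; i++) {
            if (length - position < 4) {
                // Same int arithmetic as the expression quoted in the summary.
                int newLen = Math.max(length * 2, length + 4);
                if (newLen < 0) {
                    throw new IllegalStateException("new length overflowed int");
                }
                bytesCopied += position; // each resize copies everything written so far
                length = newLen;
                resizes++;
            }
            position += 4;
        }
        return new long[] {resizes, bytesCopied};
    }

    public static void main(String[] args) {
        long fillOneGiB = 1L << 28; // 2^28 four-byte writes fill exactly 1 GiB

        long[] before = simulate(fillOneGiB);
        long[] after = simulate(fillOneGiB + 1000);

        System.out.println("up to 1 GiB: resizes=" + before[0] + " copied=" + before[1]);
        System.out.println("1000 writes later: resizes=" + after[0] + " copied=" + after[1]);
    }
}
```

Under these assumptions, filling the first 1 GiB takes 20 resizes and copies 
just under 1 GiB in total, while each of the next 1000 {{writeInt}} calls 
triggers its own resize and copies more than 1 GiB.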

Encountered in production while checkpointing an {{IcebergEnumeratorState}} 
containing ~1.87M {{IcebergSourceSplit}} entries: 
{{IcebergEnumeratorStateSerializer.serializePendingSplits()}} serializes every 
split into a single {{DataOutputSerializer}} buffer, which crosses the 
~1.07 GB threshold and hangs.

h3. Expected behavior

Either grow cleanly up to the conventional safe maximum array size 
({{Integer.MAX_VALUE - 8}}) and then throw an actionable {{IOException}}, or 
throw immediately when the requested size would exceed the cap. No silent 
O(n^2^) hang.

h3. Proposed fix

Extract the size computation into a {{@VisibleForTesting}} static helper that 
uses {{long}} arithmetic, validates against {{Integer.MAX_VALUE - 8}}, and 
jumps to the cap once doubling would overflow (so serializations that just 
barely fit under 2 GB still complete). Drop the now-unreachable {{catch 
(NegativeArraySizeException)}} block. Preserve the existing 
{{OutOfMemoryError}} retry path (independent of this bug). Add unit tests on 
the new helper covering: normal doubling, {{minCapacityAdd}}-dominated growth, 
jump-to-cap on doubling overflow, exact-cap boundary, and {{IOException}} when 
required size exceeds the cap.
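
A minimal sketch of what such a helper could look like follows. It is not 
the actual patch: the class name {{BufferGrowth}}, the method name 
{{computeNewLength}}, the constant name {{MAX_ARRAY_SIZE}}, and the exception 
message are all assumptions made for illustration, and {{minCapacityAdd}} is 
assumed to be non-negative.

```java
import java.io.IOException;

final class BufferGrowth {

    /** Cap named in this issue; the constant name itself is hypothetical. */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Computes the next buffer length using long arithmetic so that neither
     * the doubling nor the addition can wrap around.
     */
    static int computeNewLength(int currentLength, int minCapacityAdd) throws IOException {
        long required = (long) currentLength + minCapacityAdd;
        if (required > MAX_ARRAY_SIZE) {
            throw new IOException(
                    "Serialization failed: required buffer size of " + required
                            + " bytes exceeds the maximum array size of "
                            + MAX_ARRAY_SIZE + " bytes.");
        }
        long doubled = 2L * currentLength;
        if (doubled > MAX_ARRAY_SIZE) {
            // Doubling no longer fits: jump straight to the cap so that records
            // just under 2 GB still complete with a single final resize.
            return MAX_ARRAY_SIZE;
        }
        return (int) Math.max(doubled, required);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(computeNewLength(1024, 4));    // normal doubling
        System.out.println(computeNewLength(1024, 5000)); // minCapacityAdd dominates
        System.out.println(computeNewLength(1 << 30, 4)); // jump to cap
    }
}
```

With this shape, the five test cases listed above map directly onto calls to 
the helper, and no array allocation is needed to exercise the boundary 
conditions.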

h3. Impact

Any Flink job whose state serialization assembles a single record / object > ~1 
GB in a {{DataOutputSerializer}} is affected. Most commonly hit by source 
enumerator state with many splits (Iceberg, file source) and by very large 
keyed state values.



