Hi Vishwas,

Since you use Flink 1.7, please check the older memory model docs [1]; your
reference [2] describes the new memory model of Flink 1.10.
Could you also share a screenshot showing where you see the state size of
2.5 GB? Do you mean the Flink Web UI?
Generally, it is quite hard to estimate the on-heap size of state Java
objects. I have never heard of a Flink metric that reports it.
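
A rough back-of-the-envelope sketch in Java illustrates why. It assumes a
64-bit HotSpot JVM with compressed oops (12-byte object headers, 4-byte
references, 8-byte alignment) and uses a plain java.util.HashMap with boxed
Long keys and values as a stand-in. The class and method names are made up
for illustration, and Flink's heap state backend uses its own state table
classes, so real numbers will differ.

```java
// Back-of-the-envelope estimate of the on-heap footprint of ONE Long -> Long entry
// in a java.util.HashMap (a stand-in, NOT Flink's own state table classes).
// Assumptions: 64-bit HotSpot JVM with compressed oops and compressed class pointers,
// i.e. 12-byte object headers, 4-byte references and 8-byte object alignment.
// Values outside the Long cache range (-128..127) are assumed, so every box is a new object.
public class StateEntryFootprint {

    static final long HEADER = 12; // mark word (8 bytes) + compressed class pointer (4 bytes)
    static final long REF = 4;     // compressed object reference
    static final long ALIGN = 8;   // objects are padded to a multiple of 8 bytes

    // Rounds a raw object size up to the next multiple of ALIGN.
    static long align(long rawSize) {
        return (rawSize + ALIGN - 1) / ALIGN * ALIGN;
    }

    // java.lang.Long: header + one 8-byte long field, 12 + 8 = 20, padded to 24.
    static long boxedLong() {
        return align(HEADER + 8);
    }

    // HashMap.Node: header + int hash + key/value/next references, 12 + 4 + 12 = 28, padded to 32.
    static long hashMapNode() {
        return align(HEADER + 4 + 3 * REF);
    }

    // Boxed key + boxed value + node + one slot in the bucket array
    // (the array's spare capacity from the 0.75 load factor is ignored).
    static long entryFootprint() {
        return 2 * boxedLong() + hashMapNode() + REF;
    }

    public static void main(String[] args) {
        long serialized = 2 * 8; // two raw 8-byte longs, no serializer framing
        System.out.println("serialized: ~" + serialized + " bytes, on heap: ~"
                + entryFootprint() + " bytes");
    }
}
```

Under these assumptions an entry that is 16 bytes as two raw longs takes
about 84 bytes on the heap, and that figure shifts with JVM flags, key and
value types, and the data structure used.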

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html

On Mon, Aug 24, 2020 at 4:05 AM Xintong Song <tonysong...@gmail.com> wrote:

> Hi Vishwas,
>
> According to the log, the heap space is 13+ GB, which looks fine.
>
> Several reasons might lead to a heap space OOM:
>
>    - Memory leak
>    - Not enough GC threads
>    - Concurrent GC starts too late
>    - ...
>
> I would suggest taking a look at the GC logs.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara <vsirav...@gmail.com>
> wrote:
>
>> Hi guys,
>> I use Flink version 1.7.2.
>> I have a stateful streaming job which uses a keyed process function. I
>> use the heap state backend. Although I set the TM heap size to 16 GB, I get
>> an OOM error when the state size is around 2.5 GB (I get the state size from
>> the dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe
>> is for native calls off heap) [1]. For an 8 GB TM heap setting, the OOM
>> errors start showing up when the state size reaches 1 GB. I find this
>> puzzling because I would expect a lot more heap space for state when I
>> change the size to 16 GB. What fraction of the heap is used by the
>> framework [2]? Below is the stack trace for the exception. How can I
>> increase the amount of state I can keep on the heap?
>>
>> 2020-08-21 02:05:54,443 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)]
>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521
>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)]
>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
>> 2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED.
>> java.lang.OutOfMemoryError: Java heap space
>>         at java.lang.reflect.Array.newInstance(Array.java:75)
>>         at java.util.Arrays.copyOf(Arrays.java:3212)
>>         at java.util.Arrays.copyOf(Arrays.java:3181)
>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source)
>>         at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
>>
>> Best,
>> Vishwas
>>
>
