Hi,

The stack trace shows that the job failed while restoring from a checkpoint/savepoint. If you hit this during failover, it may help to first find the root cause of the failover itself. As for the stack trace: when restoring a `HeapPriorityQueue`, Flink ensures the backing array is large enough via resizeQueueArray [1] (which uses Arrays.copyOf), so the resize itself may be what runs out of memory. Could you take a heap dump when the process exits with the OOM?
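To illustrate the pattern: the queue keeps its elements in a plain object array and, when full, grows it in a single allocation. A stripped-down sketch of that growth pattern (illustrative only, not Flink's actual code - see [1] for the real implementation):

    import java.util.Arrays;

    // Simplified sketch of an array-backed queue that doubles on demand,
    // mirroring the growth pattern of Flink's AbstractHeapPriorityQueue.
    public class GrowableQueue {
        private Object[] queue = new Object[128];
        private int size;

        public void add(Object element) {
            if (size >= queue.length) {
                // Arrays.copyOf allocates a new, larger array while the old
                // one is still referenced, so the resize momentarily needs
                // room for both - which is why a restore running close to
                // the heap limit can OOM here rather than in user code.
                queue = Arrays.copyOf(queue, queue.length * 2);
            }
            queue[size++] = element;
        }
    }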
[1] https://github.com/apache/flink/blob/5e0b7970a9aea74aba4ebffaa75c37e960799b93/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java#L151

Best,
Congxian

On Thu, Aug 27, 2020 at 10:59 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Vishwas,
>
> Your scenario sounds like RocksDB would actually be recommended. I would
> always suggest starting with RocksDB, unless your state is really small
> compared to the available memory, or you need to optimize for
> performance. But maybe your job runs fine with RocksDB
> (performance-wise); then there's no need to go into the details of heap
> memory management with Flink.
>
> On Wed, Aug 26, 2020 at 7:21 PM Vishwas Siravara <vsirav...@gmail.com>
> wrote:
>
>> Thanks Andrey,
>> My question is related to this part of the docs:
>>
>> The FsStateBackend is encouraged for:
>>
>> - Jobs with large state, long windows, large key/value states.
>> - All high-availability setups.
>>
>> How large is "large state", without any overhead added by the
>> framework?
>>
>> Best,
>> Vishwas
>>
>> On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin <azagre...@apache.org>
>> wrote:
>>
>>> Hi Vishwas,
>>>
>>>> is this quantifiable with respect to JVM heap size on a single node
>>>> without the node being used for other tasks?
>>>
>>> I don't quite understand this question. I believe the recommendation
>>> in the docs has the same rationale: use larger state objects so that
>>> the Java object overhead pays off.
>>> RocksDB keeps state in memory and on disk in serialized form, so it
>>> usually has a smaller footprint.
>>> Other jobs in the same task manager can potentially use other state
>>> backends, depending on their state requirements.
>>> All tasks in the same task manager share the JVM heap, since the task
>>> manager runs as one JVM process on the machine it is deployed to.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara <vsirav...@gmail.com>
>>> wrote:
>>>
>>>> Hi Andrey,
>>>> Thanks for getting back to me so quickly. The screenshots are for a
>>>> 1 GB heap; the keys for the state are 20-character strings (20 bytes,
>>>> we don't have multi-byte characters). So the overhead seems to be
>>>> quite large (4x), even in comparison to the checkpoint size (which
>>>> already adds an overhead). This document [1] says to use the FS/heap
>>>> backend for large state; is this quantifiable with respect to JVM
>>>> heap size on a single node, without the node being used for other
>>>> tasks?
>>>> I have attached GC logs for the TM and JM.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>>
>>>> Best,
>>>> Vishwas
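The roughly 4x gap Vishwas observes is close to what Java object layout alone predicts for short String keys. A back-of-envelope sketch; every constant in it is an assumption about a 64-bit Java 8 HotSpot JVM with compressed oops, not a measurement:

    public class StateOverheadEstimate {
        public static void main(String[] args) {
            // Assumed layout: 64-bit Java 8 HotSpot, compressed oops.
            // Treat the output as an order-of-magnitude estimate.
            long keyChars = 20;                  // 20-character ASCII key
            long stringObject = 12 + 4 + 4 + 4;  // header + char[] ref + hash + padding = 24
            long charArray = 16 + keyChars * 2;  // array header + 2 bytes per char = 56
            long onHeapKey = stringObject + charArray;  // ~80 bytes live on the heap
            long serializedKey = 2 + keyChars;          // ~1 byte/ASCII char + small length prefix

            System.out.println("on-heap key:     ~" + onHeapKey + " bytes");
            System.out.println("serialized key:  ~" + serializedKey + " bytes");
            System.out.printf("overhead factor: ~%.1fx, before map entries and the value side%n",
                    (double) onHeapKey / serializedKey);
        }
    }

On top of the key itself come the map entry objects and references the heap backend needs, so the real factor is somewhat higher than this estimate.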
>>>>
>>>> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin <azagre...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Vishwas,
>>>>>
>>>>> I believe the screenshots are from a heap size of 1 GB?
>>>>>
>>>>> There are indeed many internal Flink state objects. They are the
>>>>> overhead required for Flink to organise and track the state on-heap.
>>>>> Depending on the actual size of your state objects, the overhead may
>>>>> be relatively large compared to the actual state size.
>>>>> For example, if you just keep integers in your state, the overhead
>>>>> is probably a couple of times larger than the data itself.
>>>>> It is not easy to estimate the on-heap size exactly without thorough
>>>>> analysis.
>>>>>
>>>>> The checkpoint has little overhead and includes only the actual
>>>>> state data - your serialized state objects, which are probably
>>>>> smaller than their heap representation.
>>>>>
>>>>> So my guess is that the heap representation of the state is much
>>>>> bigger than the checkpoint size.
>>>>>
>>>>> I am also cc'ing other people who might add more thoughts about
>>>>> on-heap state size.
>>>>>
>>>>> You could also provide GC logs, as Xintong suggested.
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara <vsirav...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andrey and Xintong. The 2.5 GB is from the Flink web UI
>>>>>> (checkpoint size). I took a heap dump and could not find any memory
>>>>>> leak in user code. I see similar behaviour with a smaller heap: on
>>>>>> a 1 GB heap, the state size from the checkpoint UI is 180 MB.
>>>>>> Attaching some screenshots of heap profiles in case they help. When
>>>>>> the state grows, GC takes a long time, and sometimes the job
>>>>>> manager removes the TM slot because of a 10000 ms timeout and tries
>>>>>> to restore the task on another task manager; this creates a
>>>>>> cascading effect and affects other jobs running on the cluster. My
>>>>>> tests were run on a single-node cluster with 1 TM and 4 task slots,
>>>>>> with a parallelism of 4.
>>>>>>
>>>>>> Best,
>>>>>> Vishwas
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin
>>>>>> <azagre...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Vishwas,
>>>>>>>
>>>>>>> If you use Flink 1.7, check the older memory model docs [1],
>>>>>>> because your reference 2 points to the new memory model of Flink
>>>>>>> 1.10.
>>>>>>> Could you also share a screenshot of where you get the state size
>>>>>>> of 2.5 GB? Do you mean the Flink WebUI?
>>>>>>> Generally, it is quite hard to estimate the on-heap size of state
>>>>>>> Java objects. I have never heard of such a Flink metric.
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>>>>>>
>>>>>>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song <tonysong...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vishwas,
>>>>>>>>
>>>>>>>> According to the log, the heap space is 13+ GB, which looks fine.
>>>>>>>>
>>>>>>>> Several reasons might lead to the heap space OOM:
>>>>>>>>
>>>>>>>> - Memory leak
>>>>>>>> - Not enough GC threads
>>>>>>>> - Concurrent GC starts too late
>>>>>>>> - ...
>>>>>>>>
>>>>>>>> I would suggest taking a look at the GC logs.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara
>>>>>>>> <vsirav...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>> I use Flink version 1.7.2.
>>>>>>>>> I have a stateful streaming job which uses a keyed process
>>>>>>>>> function with the heap state backend. Although I set the TM heap
>>>>>>>>> size to 16 GB, I get an OOM error when the state size is around
>>>>>>>>> 2.5 GB (I read the state size from the dashboard). I have set
>>>>>>>>> taskmanager.memory.fraction: 0.01 (which I believe is for native
>>>>>>>>> calls off heap) [1]. With an 8 GB TM heap setting, the OOM
>>>>>>>>> errors start showing up when the state size reaches 1 GB. I find
>>>>>>>>> this puzzling, because I would expect to get a lot more space on
>>>>>>>>> the heap for state when I change the size to 16 GB. What
>>>>>>>>> fraction of the heap is used by the framework? [2]
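For context on the trace that follows: the HeapPriorityQueue frames in it are Flink's on-heap timer queue. Every timer a keyed process function registers and that has not yet fired is one entry in that queue, and all of them are re-added during restore. A minimal, hypothetical sketch of the kind of function that fills it (invented names and timeout, not Vishwas's actual job):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical sketch: a process function applied on a keyed stream
    // that registers one processing-time timer per element. Every
    // registered-but-unfired timer is one entry in the heap backend's
    // HeapPriorityQueue, and all of them are rebuilt on restore - the
    // code path visible in the stack trace below.
    public class TimeoutFunction extends ProcessFunction<String, String> {

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            // One timer per element; with many keys the timer queue alone
            // can dominate heap usage.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            out.collect("timer fired at " + timestamp);
        }
    }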
>>>>>>>>> Below is the stack trace for the exception. How can I increase
>>>>>>>>> my state size on the heap?
>>>>>>>>>
>>>>>>>>> 2020-08-21 02:05:54,443 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)]
>>>>>>>>> 2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521
>>>>>>>>> 2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)]
>>>>>>>>> 2020-08-21 02:05:54,444 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
>>>>>>>>> 2020-08-21 02:05:54,446 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED.
>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>     at java.lang.reflect.Array.newInstance(Array.java:75)
>>>>>>>>>     at java.util.Arrays.copyOf(Arrays.java:3212)
>>>>>>>>>     at java.util.Arrays.copyOf(Arrays.java:3181)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source)
>>>>>>>>>     at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Vishwas
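As Robert suggests at the top of the thread, the usual way past this heap ceiling is the RocksDB state backend, which keeps state serialized and spills to disk instead of holding live Java objects on the heap. A minimal sketch of the switch for Flink 1.7 (assuming the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint URI and the pipeline are placeholders):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbStateBackendExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder URI - point this at your durable checkpoint storage.
            // The second argument enables incremental checkpoints, which only
            // upload changed SST files as the state grows.
            env.setStateBackend(
                    new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

            // Placeholder pipeline - state-access code in the process function
            // does not change when the backend changes.
            env.fromElements("a", "b", "c")
               .keyBy(value -> value)
               .process(new TimeoutFunction()) // e.g. the hypothetical function sketched above
               .print();

            env.execute("job with RocksDB state backend");
        }
    }

The job code itself stays the same; only the backend registration differs, so it is cheap to test whether RocksDB's per-record overhead is acceptable for this workload.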