[ https://issues.apache.org/jira/browse/FLINK-34325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexis Sarda-Espinosa updated FLINK-34325:
------------------------------------------
    Summary: Inconsistent state with data loss after OutOfMemoryError  (was: Inconsistent state with data loss after OutOfMemoryError in Job Manager)

> Inconsistent state with data loss after OutOfMemoryError
> ---------------------------------------------------------
>
>                 Key: FLINK-34325
>                 URL: https://issues.apache.org/jira/browse/FLINK-34325
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.17.1
>         Environment: Flink on Kubernetes with HA, RocksDB with incremental checkpoints on Azure
>            Reporter: Alexis Sarda-Espinosa
>            Priority: Major
>         Attachments: jobmanager_log.txt
>
>
> I have a job that uses broadcast state to maintain a cache of required metadata. I am currently evaluating the memory requirements of my specific use case, and I ran into a strange situation that seems worrisome.
> All sources in my job are Kafka sources. I wrote a large number of messages to Kafka to force the broadcast state's cache to grow. At some point, this caused a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the Job Manager. I would have expected the whole Java process of the JM to crash, but the job was simply restarted. What's worrisome is that, after 2 restarts, the job resumed from the latest successful checkpoint and completely ignored all the events I had written to Kafka. I can verify this because I have a custom metric that exposes the approximate size of this cache, and because the job did not end up in a crash loop at this point by reading all the messages from Kafka over and over again.
> I'm attaching an excerpt of the Job Manager's logs. My main concerns are:
> # It seems the memory error in the JM did not prevent the Kafka offsets from being "rolled back", so the Kafka events that should have ended up in the broadcast state's cache were eventually ignored.
> # Is it normal that the state is somehow "materialized" in the JM and is thus affected by the size of the JM's heap? Is this something particular to the use of broadcast state? I found this very surprising.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
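
[Editor's note] For readers unfamiliar with the pattern the report describes, below is a minimal sketch of a broadcast-state metadata cache that exposes an approximate cache size through a custom gauge metric, in the spirit of the setup above. All class, state, and metric names here ({{MetadataCacheFunction}}, {{metadata-cache}}, {{approxCacheSize}}) are hypothetical placeholders, not taken from the reported job.

{code:java}
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Illustrative sketch only: caches metadata records arriving on a broadcast
 * stream and exposes an approximate entry count via a custom gauge metric.
 */
public class MetadataCacheFunction
        extends BroadcastProcessFunction<String, String, String> {

    // Descriptor for the broadcast map state that backs the metadata cache.
    public static final MapStateDescriptor<String, String> CACHE_DESCRIPTOR =
            new MapStateDescriptor<>("metadata-cache", Types.STRING, Types.STRING);

    // Approximate entry count, maintained locally per parallel instance.
    private transient long approxCacheSize;

    @Override
    public void open(Configuration parameters) {
        // Register a custom gauge so the cache size shows up in the metrics system.
        getRuntimeContext()
                .getMetricGroup()
                .gauge("approxCacheSize", (Gauge<Long>) () -> approxCacheSize);
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // Hypothetical "key=value" layout of the metadata records.
        String[] kv = value.split("=", 2);
        String key = kv[0];
        String val = kv.length > 1 ? kv[1] : "";
        if (!ctx.getBroadcastState(CACHE_DESCRIPTOR).contains(key)) {
            approxCacheSize++;
        }
        ctx.getBroadcastState(CACHE_DESCRIPTOR).put(key, val);
    }

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Enrich the event with whatever metadata has been broadcast so far.
        String meta = ctx.getBroadcastState(CACHE_DESCRIPTOR).get(event);
        out.collect(meta != null ? event + "|" + meta : event);
    }
}
{code}

Such a function would typically be wired as {{events.connect(metadata.broadcast(MetadataCacheFunction.CACHE_DESCRIPTOR)).process(new MetadataCacheFunction())}}, with both input streams produced by {{KafkaSource}}; the broadcast state is checkpointed together with the Kafka source offsets, which is the consistency the report expects to hold across restarts.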