weiqingy opened a new issue, #665: URL: https://github.com/apache/flink-agents/issues/665
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

## Summary

`DurableExecutionManager` records per-key sequence numbers under a checkpoint id in `snapshotLastCompletedSequenceNumbers(...)`, and `notifyCheckpointComplete(...)` later removes that entry while pruning durable state.

However, Flink can also *abort* a checkpoint, in which case the framework calls `notifyCheckpointAborted(...)` instead of `notifyCheckpointComplete(...)`. `DurableExecutionManager` does not currently handle `notifyCheckpointAborted`, so the entry recorded for an aborted checkpoint is never removed from `checkpointIdToSeqNums`. When durable execution is enabled, the map therefore grows over time as checkpoints abort (e.g., due to timeouts, alignment failures, or backend pressure).

This is distinct from the null-store case tracked in #645, which is already fixed structurally by the symmetric `actionStateStore == null` guard. The aborted-checkpoint leak only manifests when `actionStateStore != null`.

## Root Cause

```java
// Records the entry on snapshot; works fine for completed checkpoints
void snapshotLastCompletedSequenceNumbers(KeyedStateBackend<?> backend, long checkpointId) {
    if (actionStateStore == null) return;
    ...
    checkpointIdToSeqNums.put(checkpointId, keyToSeqNum);
}

// Removes the entry only on successful completion
void notifyCheckpointComplete(long checkpointId) {
    if (actionStateStore != null) {
        ...
        checkpointIdToSeqNums.remove(checkpointId);
    }
}

// MISSING: no notifyCheckpointAborted handler, so aborted checkpoints leak their entry
```

In `ActionExecutionOperator`, only `notifyCheckpointComplete` is wired through to the manager; there is no corresponding `notifyCheckpointAborted` handler that forwards the call.

## Fix (proposed)

1. Add `notifyCheckpointAborted(long checkpointId)` to `DurableExecutionManager` that removes the corresponding entry from `checkpointIdToSeqNums`. No pruning is needed, because the durable state for an aborted checkpoint was never committed.
2. Override `notifyCheckpointAborted` on `ActionExecutionOperator` and forward to the manager.
3. Add a regression test in `DurableExecutionManagerTest` covering both:
   - An aborted checkpoint removes its entry.
   - Interleaved completed and aborted checkpoints leave only the in-flight ones in the map.

### How to reproduce

This is a structural code bug; no specific input data is needed.

1. Configure an agent job with durable execution **enabled** (set `ActionStateStore`).
2. Enable Flink checkpointing under conditions that cause some checkpoints to abort (e.g., short timeout, backend pressure).
3. Run the job and observe the size of `checkpointIdToSeqNums`.

**Expected**: only entries for in-flight (not yet completed or aborted) checkpoints remain.

**Actual**: entries for aborted checkpoints persist for the lifetime of the job, so the map grows without bound under sustained abort conditions.

### Version and environment

- **Flink Agents**: 0.3-SNAPSHOT
- **Apache Flink**: 2.2.0
- **Java**: 11
- **Deployment mode**: any (bug is in operator logic)

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
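To make the proposed fix concrete, here is a minimal, self-contained sketch of the bookkeeping it describes. It is a simplified model, not the actual `DurableExecutionManager`: the class `CheckpointSeqNumTracker`, its `String` key type, and the helper methods `trackedCheckpointCount` and `isTracked` are hypothetical names introduced for illustration. The `actionStateStore` guard and the pruning step are left out because their details are elided in the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for the checkpoint bookkeeping discussed in this issue.
 * It models only the checkpointIdToSeqNums map, not the real manager.
 */
public class CheckpointSeqNumTracker {

    // Mirrors checkpointIdToSeqNums: checkpoint id -> (key -> last completed sequence number).
    private final Map<Long, Map<String, Long>> checkpointIdToSeqNums = new HashMap<>();

    /** Records the per-key sequence numbers captured for a checkpoint. */
    public void snapshotLastCompletedSequenceNumbers(long checkpointId, Map<String, Long> keyToSeqNum) {
        checkpointIdToSeqNums.put(checkpointId, new HashMap<>(keyToSeqNum));
    }

    /** Completed checkpoint: the real manager also prunes durable state; this model only drops the entry. */
    public void notifyCheckpointComplete(long checkpointId) {
        checkpointIdToSeqNums.remove(checkpointId);
    }

    /** Proposed addition: an aborted checkpoint drops its entry and prunes nothing. */
    public void notifyCheckpointAborted(long checkpointId) {
        checkpointIdToSeqNums.remove(checkpointId);
    }

    /** Number of checkpoints that still have an entry in the map. */
    public int trackedCheckpointCount() {
        return checkpointIdToSeqNums.size();
    }

    /** Whether the given checkpoint still has an entry in the map. */
    public boolean isTracked(long checkpointId) {
        return checkpointIdToSeqNums.containsKey(checkpointId);
    }

    public static void main(String[] args) {
        CheckpointSeqNumTracker tracker = new CheckpointSeqNumTracker();
        tracker.snapshotLastCompletedSequenceNumbers(1L, Map.of("key-a", 10L));
        tracker.snapshotLastCompletedSequenceNumbers(2L, Map.of("key-a", 11L));
        tracker.snapshotLastCompletedSequenceNumbers(3L, Map.of("key-a", 12L));

        tracker.notifyCheckpointComplete(1L); // checkpoint 1 completes
        tracker.notifyCheckpointAborted(2L);  // checkpoint 2 aborts
        // Checkpoint 3 is still in flight, so it is the only remaining entry.
        System.out.println(tracker.trackedCheckpointCount()); // prints 1
    }
}
```

Without the `notifyCheckpointAborted` call, the entry for checkpoint 2 would stay in the map indefinitely, which is the leak described in this issue. The interleaved sequence in `main` follows the second regression-test case proposed above.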
