[ https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962091#comment-16962091 ]
Biao Liu commented on FLINK-13861:
----------------------------------

Thanks for reporting, [~klion26]! It's definitely a critical issue.

I agree with [~pnowojski] that we should not handle this kind of exception. There might be a resource leak or some other unexpected scenario (like skipping {{triggerQueuedRequests}}) if we tolerate the exception.

A big try/catch wrapping this canceller might be a good choice. No statement of the cancellation should cause an exception here, not only {{failPendingCheckpoint}}.

I'm also interested in the root cause of this issue. I will check the relevant code later to search for clues.

Regarding the potential conflict, I can take care of that :) So please go ahead with the fix if you would like to; I can help review it if you need.

> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-13861
>                 URL: https://issues.apache.org/jira/browse/FLINK-13861
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.7.2, 1.8.1, 1.9.0
>            Reporter: Congxian Qiu(klion26)
>            Priority: Major
>             Fix For: 1.10.0
>
>
> I encountered this problem in our private fork of Flink. After taking a look
> at the current master branch of Apache Flink, I think the problem exists
> there as well.
> Problem Detail:
> 1. A checkpoint is canceled because it expired, so the canceller below is
> called:
> {code:java}
> final Runnable canceller = () -> {
>     synchronized (lock) {
>         // only do the work if the checkpoint is not discarded anyways
>         // note that checkpoint completion discards the pending checkpoint object
>         if (!checkpoint.isDiscarded()) {
>             LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
>             failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
>             pendingCheckpoints.remove(checkpointID);
>             rememberRecentCheckpointId(checkpointID);
>             triggerQueuedRequests();
>         }
>     }
> };{code}
>
> But {{failPendingCheckpoint}} may throw an exception, because it calls
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> -> {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initialize a FailedCheckpointStates, which may throw an exception from
> {{checkArgument}}
> I have not yet found out why the {{checkArgument}} failed (this problem does
> not reproduce frequently); I will create an issue for that if I have more
> findings.
>
> 2. When the next checkpoint is triggered, we first check whether there are
> already too many pending checkpoints, as below:
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>     if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>         triggerRequestQueued = true;
>         if (currentPeriodicTrigger != null) {
>             currentPeriodicTrigger.cancel(false);
>             currentPeriodicTrigger = null;
>         }
>         throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>     }
> }
> {code}
> Because the expired checkpoint was never removed in step 1,
> {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} will always
> be true.
> 3. No checkpoint will ever be triggered from then on.
> Because {{failPendingCheckpoint}} may throw an exception, we could move the
> logic that removes the pending checkpoint into a finally block.
> I'd like to file a PR for this if it really needs fixing.
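
For illustration only, here is a minimal sketch of the finally-block idea from the description, combined with the try/catch around the canceller suggested in the comment. It is not the Flink code and not the actual patch: the class ExpiredCheckpointCancellerSketch and all of its members are hypothetical stand-ins, and the simulated failure merely mimics a {{checkArgument}} that throws. Whether the caught exception should only be logged or should fail the job is left open here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, stripped-down stand-in for the coordinator logic; not Flink code. */
class ExpiredCheckpointCancellerSketch {
    private final Object lock = new Object();
    private final Map<Long, String> pendingCheckpoints = new HashMap<>();
    private final int maxConcurrentCheckpointAttempts;
    int queuedRequestsTriggered = 0;

    ExpiredCheckpointCancellerSketch(int maxConcurrentCheckpointAttempts) {
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    }

    void addPending(long checkpointId) {
        synchronized (lock) {
            pendingCheckpoints.put(checkpointId, "pending");
        }
    }

    // Stand-in for failPendingCheckpoint: always throws, to mimic a failing checkArgument.
    private void failPendingCheckpoint(long checkpointId) {
        throw new IllegalArgumentException("simulated failure for checkpoint " + checkpointId);
    }

    // Stand-in for triggerQueuedRequests: only counts how often it was reached.
    private void triggerQueuedRequests() {
        queuedRequestsTriggered++;
    }

    Runnable newCanceller(long checkpointId) {
        return () -> {
            synchronized (lock) {
                if (!pendingCheckpoints.containsKey(checkpointId)) {
                    return; // already completed or discarded
                }
                try {
                    failPendingCheckpoint(checkpointId);
                } catch (Throwable t) {
                    // Assumption: only report here; a real fix might fail the job instead.
                    System.err.println("Cancelling checkpoint " + checkpointId + " failed: " + t);
                } finally {
                    // Runs even if the failure path throws, so the pending map cannot
                    // stay stuck at the concurrency limit.
                    pendingCheckpoints.remove(checkpointId);
                    triggerQueuedRequests();
                }
            }
        };
    }

    // Mirrors the size check in checkConcurrentCheckpoints, without the exception.
    boolean canTriggerNewCheckpoint() {
        synchronized (lock) {
            return pendingCheckpoints.size() < maxConcurrentCheckpointAttempts;
        }
    }

    public static void main(String[] args) {
        ExpiredCheckpointCancellerSketch sketch = new ExpiredCheckpointCancellerSketch(1);
        sketch.addPending(1L);
        System.out.println("can trigger before expiry: " + sketch.canTriggerNewCheckpoint());
        sketch.newCanceller(1L).run();
        System.out.println("can trigger after expiry: " + sketch.canTriggerNewCheckpoint());
    }
}
```

In this sketch the cleanup sits in the finally block, so a throwing failPendingCheckpoint no longer leaves the expired checkpoint in the pending map, and the queued-request trigger is still reached.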