Piotr Nowojski created FLINK-17351:
--------------------------------------

             Summary: CheckpointCoordinator and CheckpointFailureManager ignore checkpoint timeouts
                 Key: FLINK-17351
                 URL: https://issues.apache.org/jira/browse/FLINK-17351
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.10.0, 1.9.2
            Reporter: Piotr Nowojski


As described in point 2: 
https://issues.apache.org/jira/browse/FLINK-17327?focusedCommentId=17090576&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17090576

(copy of the description from the comment linked above):

The logic for how {{CheckpointCoordinator}} handles checkpoint timeouts is broken. In [~qinjunjerry]'s examples, the job should have failed after the first checkpoint failure, but checkpoints were timing out on the {{CheckpointCoordinator}} after 5 seconds, before {{FlinkKafkaProducer}} detected the Kafka failure after 2 minutes. Those timeouts were never checked against the {{setTolerableCheckpointFailureNumber(...)}} limit, so the job kept going with many timed-out checkpoints.
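For context, here is a minimal sketch of the configuration involved, using the standard {{StreamExecutionEnvironment}} / {{CheckpointConfig}} API. The concrete values mirror the scenario above; the job topology and the tolerance value of 0 are illustrative assumptions:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every second.
        env.enableCheckpointing(1_000);
        // Checkpoints that do not complete within 5 seconds are declared
        // failed by the CheckpointCoordinator.
        env.getCheckpointConfig().setCheckpointTimeout(5_000);
        // With 0 tolerable failures the job should fail on the very first
        // checkpoint failure -- but timed-out checkpoints are not counted
        // against this limit, which is the bug reported here.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    }
}
{code}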
Then the interesting part happens: {{FlinkKafkaProducer}} detects the Kafka failure, and the outcome depends on where the failure was detected:

a) while processing a record? No problem: the job fails over immediately once the failure is detected (in this example, after 2 minutes).
b) during a checkpoint? The failure is reported to the {{CheckpointCoordinator}} *and gets ignored, because the {{PendingCheckpoint}} was already discarded 2 minutes earlier*. So in theory the checkpoints can keep failing forever and the job will never restart automatically unless something else fails (see the simplified sketch below).
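To make the gap concrete, here is a heavily simplified, hypothetical sketch of the coordinator-side logic. This is *not* the actual Flink code; the class names and method shapes are made up for illustration only:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified stand-ins for CheckpointCoordinator
// and CheckpointFailureManager, illustrating the gap described above.
class SimplifiedCoordinator {
    private final Map<Long, Object> pendingCheckpoints = new ConcurrentHashMap<>();
    private final SimplifiedFailureManager failureManager = new SimplifiedFailureManager();

    void triggerCheckpoint(long checkpointId) {
        pendingCheckpoints.put(checkpointId, new Object());
    }

    // Invoked when the checkpoint timeout (5 seconds above) fires.
    void abortOnTimeout(long checkpointId) {
        // The pending checkpoint is discarded, but the failure manager is
        // never told, so the timeout does not count against
        // setTolerableCheckpointFailureNumber(...).
        pendingCheckpoints.remove(checkpointId);
    }

    // Invoked when a task (e.g. FlinkKafkaProducer) declines a checkpoint.
    void receiveDecline(long checkpointId, Throwable reason) {
        Object pending = pendingCheckpoints.remove(checkpointId);
        if (pending == null) {
            // The checkpoint already timed out and was discarded: the
            // task's failure is silently dropped instead of being counted
            // or failing the job.
            return;
        }
        failureManager.handleCheckpointFailure(reason);
    }
}

class SimplifiedFailureManager {
    void handleCheckpointFailure(Throwable reason) {
        // Counts the failure against the tolerable limit and fails the
        // job once the limit is exceeded (details omitted).
    }
}
{code}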

Even worse things can happen if we mix FLINK-17350 or b) with an intermittent external system failure. The sink reports an exception, the transaction is lost/aborted, and the sink is in a failed state; but if it coincidentally manages to accept further records, that exception can be lost, and all of the records in those failed checkpoints will be lost forever as well. In the examples that [~qinjunjerry] posted this did not happen: {{FlinkKafkaProducer}} was unable to recover after the initial failure and kept throwing exceptions until the job finally failed (but much later than it should have). That outcome, however, is not guaranteed anywhere.
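A short, equally hypothetical timeline of this scenario, reusing the {{SimplifiedCoordinator}} sketch above (the timestamps match the example in this report):

{code:java}
public class LostFailureTimeline {
    public static void main(String[] args) {
        SimplifiedCoordinator coordinator = new SimplifiedCoordinator();

        // t=0s: checkpoint 1 is triggered.
        coordinator.triggerCheckpoint(1L);

        // t=5s: checkpoint 1 times out and is discarded; the failure
        // manager is never notified.
        coordinator.abortOnTimeout(1L);

        // t=120s: the sink finally detects the external failure while
        // checkpointing; the transaction is lost/aborted and the failure
        // is reported through the checkpoint path, where it is ignored
        // because checkpoint 1 is no longer pending.
        coordinator.receiveDecline(1L, new RuntimeException("transaction aborted"));

        // If the external system has recovered by this point, subsequent
        // writes succeed, the job keeps running, and the records of the
        // aborted transaction are silently lost.
    }
}
{code}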




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
