This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict. The notification for completed checkpoints is not reliable: it may be late, may not come at all, and may even arrive in a different order than expected.

As such, if you have a simple case of snapshot -> snapshot -> notify -> notify, the sink will always fail with an exception.
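The following is a minimal Java sketch of the strict check. It assumes the sink commits every pending transaction up to and including the notified checkpoint id, and first calls checkState on a non-empty queue. The class and method names are hypothetical and the transactions are plain strings, so this is a simplified model, not Flink's actual code. It replays one problematic ordering: two snapshots, then the notification for the second checkpoint arriving before the one for the first.

```java
import java.util.TreeMap;

// Hypothetical, simplified model of the strict check; NOT Flink's
// TwoPhaseCommitSinkFunction. Transactions are plain strings.
final class StrictTwoPhaseCommitModel {
    // Pending (pre-committed) transactions, keyed by the snapshot's checkpoint id.
    private final TreeMap<Long, String> pending = new TreeMap<>();

    void snapshotState(long checkpointId) {
        pending.put(checkpointId, "txn-" + checkpointId);
    }

    // Fails if nothing is pending, otherwise "commits" (removes) every
    // transaction whose checkpoint id is <= the notified id.
    void notifyCheckpointComplete(long checkpointId) {
        if (pending.isEmpty()) {
            throw new IllegalStateException("checkpoint completed, but no transaction pending");
        }
        pending.headMap(checkpointId, true).clear();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        StrictTwoPhaseCommitModel sink = new StrictTwoPhaseCommitModel();
        sink.snapshotState(1);
        sink.snapshotState(2);
        sink.notifyCheckpointComplete(2); // commits txn-1 and txn-2
        try {
            sink.notifyCheckpointComplete(1); // late notification: queue is empty
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

If the notifications are delivered in order (notify 1, then notify 2), the same model commits one transaction per notification and never throws.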

What it should do, IMO, is either a) not check that there is a pending transaction, or b) track the highest checkpoint ID received and, optionally, not fail if the notification is for an older checkpoint.
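Option b) could look roughly like the sketch below. It uses the same kind of simplified, hypothetical model (plain-string transactions, a made-up class name) and adds a field that tracks the highest checkpoint id notified so far. Stale or repeated notifications are ignored instead of failing, and an empty commit range is not treated as an error.

```java
import java.util.TreeMap;

// Hypothetical sketch of option b); names are illustrative, not Flink API.
final class TolerantTwoPhaseCommitModel {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    // Highest checkpoint id whose completion notification has been handled.
    private long highestNotified = Long.MIN_VALUE;

    void snapshotState(long checkpointId) {
        pending.put(checkpointId, "txn-" + checkpointId);
    }

    // Returns true if the notification was applied, false if it was ignored as stale.
    boolean notifyCheckpointComplete(long checkpointId) {
        if (checkpointId <= highestNotified) {
            // Older or repeated notification: its transactions were already
            // committed by a later notification, so ignore it instead of failing.
            return false;
        }
        highestNotified = checkpointId;
        // Commit everything up to and including this checkpoint; an empty
        // range is simply a no-op.
        pending.headMap(checkpointId, true).clear();
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TolerantTwoPhaseCommitModel sink = new TolerantTwoPhaseCommitModel();
        sink.snapshotState(1);
        sink.snapshotState(2);
        System.out.println(sink.notifyCheckpointComplete(2)); // true: commits txn-1 and txn-2
        System.out.println(sink.notifyCheckpointComplete(1)); // false: stale, ignored
        System.out.println(sink.pendingCount());              // 0
    }
}
```

With this variant, a late notification for a savepoint (e.g. #674 arriving after #675 has already committed its transaction) becomes a no-op. The "optionally" part of b) could be modelled as a flag that throws instead of returning false.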

@piotr WDYT?

On 27/11/2019 08:59, Tony Wei wrote:
Hi,

As a follow-up, it seems that savepoints can't be subsumed, so their notifications could still be sent to each TM.
Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?

Best,
Tony Wei

On Wed, Nov 27, 2019 at 3:43 PM Tony Wei <tony19920...@gmail.com> wrote:

    Hi,

    I want to raise this question again, since I have run into this
    exception in my production job.

    The exception is as follows:

        2019-11-27 14:47:29

        java.lang.RuntimeException: Error while confirming checkpoint
            at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
        Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
            at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
            at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
            at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
            ... 5 more


    And these are the checkpoints / savepoints before the job failed:
    [attachment: checkoint.png]

    It seems that checkpoint #675's notification handled the pending
    transaction holder of savepoint #674, but savepoint #674's own
    notification was neither subsumed nor ignored by the JM.
    Therefore, during checkpoint #676, some tasks received that
    notification before the checkpoint barrier and hit this exception,
    because there was no pending transaction in the queue.

    Does anyone know the details of the subsumed-notification
    mechanism and how the checkpoint coordinator handles this situation?
    Please correct me if I'm wrong. Thanks.

    Best,
    Tony Wei

    On Mon, Oct 8, 2018 at 5:03 PM Stefan Richter
    <s.rich...@data-artisans.com> wrote:

        Hi Pedro,

        unfortunately the interesting parts are all removed from the
        log; we already know about the exception itself. In
        particular, what I would like to see is which checkpoints
        were triggered and completed before the exception happened.

        Best,
        Stefan

        > On 08.10.2018 at 10:23, PedroMrChaves
        <pedro.mr.cha...@gmail.com> wrote:
        >
        > Hello,
        >
        > Find attached the jobmanager.log. I've omitted the log lines from
        > other runs and left only the job manager info and the run with
        > the error.
        >
        > jobmanager.log
        > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>

        >
        >
        >
        > Thanks again for your help.
        >
        > Regards,
        > Pedro.
        >
        >
        >
        > -----
        > Best Regards,
        > Pedro Chaves


Reply via email to