Thank you all for the investigation, reporting and discussion. I have merged an older PR [1] that fixes this issue; it was previously rejected because we didn’t realise this was a production issue.
The issue should be fixed in the Flink 1.10, 1.9.2 and 1.8.3 releases.

Piotrek

[1] https://github.com/apache/flink/pull/6723

> On 28 Nov 2019, at 02:52, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Piotrek,
>
> There was already an issue [1] and PR for this thread. Should we mark it as
> a duplicate or a related issue?
>
> Best,
> Tony Wei
>
> [1] https://issues.apache.org/jira/browse/FLINK-10377
>
> Piotr Nowojski <pi...@ververica.com> wrote on Thu, 28 Nov 2019 at 12:17 AM:
> Hi Tony,
>
> Thanks for the explanation. Assuming that’s what’s happening, then I agree,
> this checkState should be removed. I created a ticket for this issue:
> https://issues.apache.org/jira/browse/FLINK-14979
>
> Piotrek
>
>> On 27 Nov 2019, at 16:28, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Piotrek,
>>
>> The case here was that the first snapshot is a savepoint. I know that if the
>> following checkpoint succeeds before the previous one, the previous one
>> will be subsumed by the JobManager. However, if that previous one is a
>> savepoint, it won't be subsumed. That leads to the case that Chesnay described.
>> The following checkpoint succeeded before the previous savepoint, handling
>> both of their pending transactions, but the savepoint still succeeded and sent
>> the notification to each TaskManager. That led to this exception. Could you
>> double check if this is the case? Thank you.
>>
>> Best,
>> Tony Wei
>>
>> Piotr Nowojski <pi...@ververica.com> wrote on Wed, 27 Nov 2019 at 8:50 PM:
>> Hi,
>>
>> Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was based
>> on Pravega’s sink for Flink, which was implemented by Stephan, and it has
>> the same logic [1]. If I remember the discussions with Stephan/Till
>> correctly, the way Flink uses Akka probably guarantees that messages will
>> always be delivered, except in case of some failure, so
>> `notifyCheckpointComplete` could probably be missed only if a failure
>> happens between the snapshot and the arrival of the notification. Receiving
>> the same notification twice should be impossible (based on the knowledge
>> passed to me from Till/Stephan).
>>
>> However, for one thing, if that’s possible, then the code should be adjusted
>> accordingly. On the other hand, maybe there is no harm in relaxing the
>> contract? Even if we miss this notification (because of some re-ordering?),
>> the next one will subsume the missed one and commit everything.
>>
>> Piotrek
>>
>> [1]
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>>
>>> On 27 Nov 2019, at 13:02, Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
>>> The notification for completed checkpoints is not reliable; it may be late,
>>> not come at all, or possibly even arrive in a different order than expected.
>>>
>>> As such, if you have a simple case of snapshot -> snapshot -> notify -> notify,
>>> the sink will always fail with an exception.
>>>
>>> What it should do imo is either a) not check that there is a pending
>>> transaction or b) track the highest checkpoint id received and optionally
>>> not fail if the notification is for an older CP.
>>>
>>> @piotr WDYT?
>>>
>>> On 27/11/2019 08:59, Tony Wei wrote:
>>>> Hi,
>>>>
>>>> As a follow-up, it seems that a savepoint can't be subsumed, so its
>>>> notification could still be sent to each TM.
>>>> Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> Tony Wei <tony19920...@gmail.com> wrote on Wed, 27 Nov 2019 at 3:43 PM:
>>>> Hi,
>>>>
>>>> I want to raise this question again, since I have had this exception on my
>>>> production job.
>>>>
>>>> The exception is as follows:
>>>>
>>>> 2019-11-27 14:47:29
>>>>
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
>>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>>>     ... 5 more
>>>>
>>>> And these are the checkpoints / savepoints before the job failed.
>>>> <checkoint.png>
>>>>
>>>> It seems that checkpoint #675's notification handled savepoint #674's
>>>> pending transaction holder, but savepoint #674's notification wasn't
>>>> subsumed or ignored by the JM.
>>>> Therefore, during checkpoint #676, some tasks got the notification before
>>>> getting the checkpoint barrier, which led to this exception, because
>>>> there was no pending transaction in the queue.
>>>>
>>>> Does anyone know the details of the subsumed-notification mechanism and
>>>> how the checkpoint coordinator handles this situation? Please correct me
>>>> if I'm wrong. Thanks.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> Stefan Richter <s.rich...@data-artisans.com> wrote on Mon, 8 Oct 2018 at 5:03 PM:
>>>> Hi Pedro,
>>>>
>>>> unfortunately the interesting parts are all removed from the log; we
>>>> already know about the exception itself. In particular, what I would like
>>>> to see is which checkpoints were triggered and completed before the
>>>> exception happens.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> > On 08.10.2018 at 10:23, PedroMrChaves <pedro.mr.cha...@gmail.com> wrote:
>>>> >
>>>> > Hello,
>>>> >
>>>> > Find attached the jobmanager.log. I've omitted the log lines from other
>>>> > runs, only left the job manager info and the run with the error.
>>>> >
>>>> > jobmanager.log
>>>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>>> >
>>>> > Thanks again for your help.
>>>> >
>>>> > Regards,
>>>> > Pedro.
>>>> >
>>>> > -----
>>>> > Best Regards,
>>>> > Pedro Chaves
>>>> > --
>>>> > Sent from:
>>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
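
For illustration, the relaxed contract Chesnay proposes in the thread (options a and b) could look roughly like the Java sketch below. This is a minimal sketch under stated assumptions. It is not Flink's TwoPhaseCommitSinkFunction and not the change merged in the PR above. The class LenientTwoPhaseSink and all of its methods are hypothetical names, and transactions are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a two-phase-commit sink; NOT Flink's actual class. */
class LenientTwoPhaseSink {
    // Pre-committed transactions, keyed by the checkpoint id that snapshotted them.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();
    // Option b) from the thread: remember the highest checkpoint id notified so far.
    private long highestNotified = Long.MIN_VALUE;

    /** Pre-commits a transaction under the given checkpoint (or savepoint) id. */
    void snapshot(long checkpointId, String transaction) {
        pending.put(checkpointId, transaction);
    }

    /**
     * Commits every pending transaction with an id at or below checkpointId.
     * Option a) from the thread: a notification that finds nothing pending
     * (late or re-ordered) is a no-op instead of an IllegalStateException.
     * Returns the number of transactions committed by this call.
     */
    int notifyCheckpointComplete(long checkpointId) {
        highestNotified = Math.max(highestNotified, checkpointId);
        // headMap(..., true) is a live view of all entries with key <= checkpointId.
        Iterator<Map.Entry<Long, String>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        int count = 0;
        while (it.hasNext()) {
            committed.add(it.next().getValue());
            it.remove(); // also removes the entry from the backing map
            count++;
        }
        return count;
    }

    List<String> committed() { return committed; }
    int pendingCount() { return pending.size(); }
    long highestNotified() { return highestNotified; }
}
```

With the sequence from the report (snapshot 674, snapshot 675, notify 675, notify 674), the first notification commits both transactions and the late notification for savepoint 674 commits nothing and does not throw.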