Hi Tony,

Thanks for the explanation. Assuming that’s what’s happening, then I agree, this checkState should be removed. I created a ticket for this issue:
https://issues.apache.org/jira/browse/FLINK-14979

Piotrek

> On 27 Nov 2019, at 16:28, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Piotrek,
>
> The case here was that the first snapshot was a savepoint. I know that if the
> following checkpoint succeeds before the previous one, the previous one will
> be subsumed by the JobManager. However, if that previous one is a savepoint,
> it won't be subsumed. That leads to the case that Chesnay described: the
> following checkpoint succeeded before the previous savepoint and handled the
> pending transactions of both, but the savepoint still succeeded and sent its
> notification to each TaskManager. That led to this exception. Could you
> double check if this is the case? Thank you.
>
> Best,
> Tony Wei
>
> Piotr Nowojski <pi...@ververica.com> wrote on Wed, 27 Nov 2019 at 8:50 PM:
> Hi,
>
> Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was based
> on Pravega’s sink for Flink, which was implemented by Stephan, and it has the
> same logic [1]. If I remember the discussions with Stephan/Till correctly,
> the way Flink uses Akka probably guarantees that messages will always be
> delivered, except in case of some failure, so `notifyCheckpointComplete`
> could probably be missed only if a failure happens between the snapshot and
> the arrival of the notification. Receiving the same notification twice should
> be impossible (based on the knowledge passed to me from Till/Stephan).
>
> However, for one thing, if that’s possible, then the code should be adjusted
> accordingly. On the other hand, maybe there is no harm in relaxing the
> contract? Even if we miss this notification (because of some re-ordering?),
> the next one will subsume the missed one and commit everything.
>
> Piotrek
>
> [1] https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>
>> On 27 Nov 2019, at 13:02, Chesnay Schepler <ches...@apache.org> wrote:
>>
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
>> The notification for complete checkpoints is not reliable; it may be late,
>> not come at all, or possibly even arrive in a different order than expected.
>>
>> As such, if you have a simple case of snapshot -> snapshot -> notify -> notify,
>> the sink will always fail with an exception.
>>
>> What it should do imo is either a) not check that there is a pending
>> transaction or b) track the highest checkpoint id received and optionally
>> not fail if the notification is for an older CP.
>>
>> @piotr WDYT?
>>
>> On 27/11/2019 08:59, Tony Wei wrote:
>>> Hi,
>>>
>>> As a follow-up, it seems that a savepoint can't be subsumed, so its
>>> notification can still be sent to each TM.
>>> Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Tony Wei <tony19920...@gmail.com> wrote on Wed, 27 Nov 2019 at 3:43 PM:
>>> Hi,
>>>
>>> I want to raise this question again, since I have had this exception on my
>>> production job.
>>>
>>> The exception is as follows:
>>>
>>> 2019-11-27 14:47:29
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>>     ... 5 more
>>>
>>> And these are the checkpoints / savepoints before the job failed.
>>> <checkoint.png>
>>>
>>> It seems that checkpoint #675's notification handled savepoint #674's
>>> pending transaction holder, but savepoint #674's notification was neither
>>> subsumed nor ignored by the JM.
>>> Therefore, during checkpoint #676, some tasks got the notification before
>>> getting the checkpoint barrier, which led to this exception, because
>>> there was no pending transaction in the queue.
>>>
>>> Does anyone know the details of the subsumed-notification mechanism and how
>>> the checkpoint coordinator handles this situation? Please correct me if I'm
>>> wrong. Thanks.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Stefan Richter <s.rich...@data-artisans.com> wrote on Mon, 8 Oct 2018 at 5:03 PM:
>>> Hi Pedro,
>>>
>>> Unfortunately, the interesting parts are all removed from the log; we
>>> already know about the exception itself. In particular, what I would like
>>> to see is which checkpoints were triggered and completed before the
>>> exception happened.
>>>
>>> Best,
>>> Stefan
>>>
>>> > On 08.10.2018 at 10:23, PedroMrChaves <pedro.mr.cha...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > Find attached the jobmanager.log. I've omitted the log lines from other
>>> > runs and only left the job manager info and the run with the error.
>>> >
>>> > jobmanager.log
>>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>> >
>>> > Thanks again for your help.
>>> >
>>> > Regards,
>>> > Pedro.
>>> >
>>> > -----
>>> > Best Regards,
>>> > Pedro Chaves
>>> > --
>>> > Sent from:
>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
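
To illustrate option b) that Chesnay suggests in this thread (track the highest checkpoint id received and do not fail when a notification is for an older checkpoint), here is a minimal Java sketch. It is not Flink's TwoPhaseCommitSinkFunction and not necessarily the change made under FLINK-14979. `RelaxedCommitTracker`, its methods, and the `txn-<id>` strings are hypothetical names invented for this example, and real transactions are replaced by plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the sink's bookkeeping; NOT Flink's actual class.
class RelaxedCommitTracker {
    // Pending (pre-committed) transactions keyed by the id of the snapshot that created them.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    // Transactions committed so far, in commit order.
    private final List<String> committed = new ArrayList<>();
    // Highest checkpoint id for which a completion notification was processed.
    private long highestNotified = Long.MIN_VALUE;

    // Called when a checkpoint or savepoint snapshot is taken.
    void snapshot(long checkpointId) {
        pending.put(checkpointId, "txn-" + checkpointId);
    }

    // Commits every pending transaction up to and including checkpointId.
    // Late or duplicate notifications are ignored instead of throwing.
    // Returns the number of transactions committed by this call.
    int notifyCheckpointComplete(long checkpointId) {
        if (checkpointId <= highestNotified) {
            return 0; // a newer notification already covered this id
        }
        highestNotified = checkpointId;
        NavigableMap<Long, String> ready = pending.headMap(checkpointId, true);
        int count = ready.size(); // may be 0; an empty set is not treated as an error
        committed.addAll(ready.values());
        ready.clear(); // headMap is a view, so this removes the entries from pending
        return count;
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        // Replays the order described in the thread: savepoint 674, checkpoint 675,
        // then the notification for 675 arrives before the one for 674.
        RelaxedCommitTracker tracker = new RelaxedCommitTracker();
        tracker.snapshot(674L);
        tracker.snapshot(675L);
        System.out.println("notify 675 committed " + tracker.notifyCheckpointComplete(675L));
        System.out.println("notify 674 committed " + tracker.notifyCheckpointComplete(674L));
        System.out.println("committed so far: " + tracker.committed());
    }
}
```

In this sketch the notification for 675 commits the transactions of both 674 and 675, and the later notification for 674 is a no-op rather than an IllegalStateException. Whether silently ignoring an older notification is acceptable for a given sink is the open contract question Piotr raises above.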