Hi Tony,

Thanks for the explanation. Assuming that’s what’s happening, then I agree, 
this checkStyle should be removed. I created a ticket for this issue 
https://issues.apache.org/jira/browse/FLINK-14979 
<https://issues.apache.org/jira/browse/FLINK-14979>

Piotrek

> On 27 Nov 2019, at 16:28, Tony Wei <tony19920...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> The case here was that the first snapshot is a savepoint. I know that if the 
> following checkpoint succeeded before the previous one, the previous one will 
> be subsumed by JobManager. However, if that previous one is a savepoint, it 
> won't be subsumed. That leads to the case that Chesney said. The following 
> checkpoint succeeded before the previous savepoint, handling both of their 
> pending transaction, but savepoint still succeeded and sent the notification 
> to each TaskManager. That led to this exception. Could you double check if 
> this is the case? Thank you. 
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski <pi...@ververica.com <mailto:pi...@ververica.com>> 於 
> 2019年11月27日 週三 下午8:50 寫道:
> Hi,
> 
> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was based 
> on Pravega’s sink for Flink, which was implemented by Stephan, and it has the 
> same logic [1]. If I remember the discussions with Stephan/Till, the way how 
> Flink is using Akka probably guarantees that messages will be always 
> delivered, except of some failure, so `notifyCheckpointComplete` could be 
> missed probably only if a failure happens between snapshot and arrival of the 
> notification. Receiving the same notification twice should be impossible 
> (based on the knowledge passed to me from Till/Stephan).
> 
> However, for one thing, if that’s possible, then the code should adjusted 
> accordingly. On the other hand, maybe there is no harm in relaxing the 
> contract? Even if we miss this notification (because of some re-ordering?), 
> next one will subsume the missed one and commit everything. 
> 
> Piotrek
> 
> [1] 
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>  
> <https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567>
> 
>> On 27 Nov 2019, at 13:02, Chesnay Schepler <ches...@apache.org 
>> <mailto:ches...@apache.org>> wrote:
>> 
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict. 
>> The notification for complete checkpoints is not reliable; it may be late, 
>> not come at all, possibly even in different order than expected.
>> 
>> As such, if you a simple case of snapshot -> snapshot -> notify -> notify 
>> the sink will always fail with an exception.
>> 
>> What it should do imo is either a) don't check that there is a pending 
>> transaction or b) track the highest checkpoint id received and optionally 
>> don't fail if the notification is for an older CP.
>> 
>> @piotr WDYT?
>> 
>> On 27/11/2019 08:59, Tony Wei wrote:
>>> Hi, 
>>> 
>>> As the follow up, it seem that savepoint can't be subsumed, so that its 
>>> notification could still be send to each TMs.
>>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>>> 
>>> Best,
>>> Tony Wei
>>> 
>>> Tony Wei <tony19920...@gmail.com <mailto:tony19920...@gmail.com>> 於 
>>> 2019年11月27日 週三 下午3:43寫道:
>>> Hi, 
>>> 
>>> I want to raise this question again, since I have had this exception on my 
>>> production job.
>>> 
>>> The exception is as follows
>>>  
>>> 2019-11-27 14:47:29
>>>  
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>>     at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no 
>>> transaction pending
>>>     at 
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>     at 
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>>     at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>>     ... 5 more
>>> 
>>> And these are the checkpoint / savepoint before the job failed.
>>> <checkoint.png>
>>> 
>>> It seems that checkpoint # 675's notification handled the savepoint # 674's 
>>> pending transaction holder, but savepoint #674's notification didn't be 
>>> subsumed or be ignored by JM.
>>> Therefore, during the checkpoint #676, some tasks got notification before 
>>> getting the checkpoint barrier and led to this exception happened, because 
>>> there was no pending transaction in queue.
>>> 
>>> Does anyone know the details about subsumed notifications mechanism and how 
>>> checkpoint coordinator handle this situation? Please correct me if I'm 
>>> wrong. Thanks.
>>> 
>>> Best,
>>> Tony Wei
>>> 
>>> Stefan Richter <s.rich...@data-artisans.com 
>>> <mailto:s.rich...@data-artisans.com>> 於 2018年10月8日 週一 下午5:03寫道:
>>> Hi Pedro,
>>> 
>>> unfortunately the interesting parts are all removed from the log, we 
>>> already know about the exception itself. In particular, what I would like 
>>> to see is what checkpoints have been triggered and completed before the 
>>> exception happens.
>>> 
>>> Best,
>>> Stefan
>>> 
>>> > Am 08.10.2018 um 10:23 schrieb PedroMrChaves <pedro.mr.cha...@gmail.com 
>>> > <mailto:pedro.mr.cha...@gmail.com>>:
>>> > 
>>> > Hello,
>>> > 
>>> > Find attached the jobmanager.log. I've omitted the log lines from other
>>> > runs, only left the job manager info and the run with the error. 
>>> > 
>>> > jobmanager.log
>>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log
>>> >  
>>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>>
>>> >   
>>> > 
>>> > 
>>> > 
>>> > Thanks again for your help.
>>> > 
>>> > Regards,
>>> > Pedro.
>>> > 
>>> > 
>>> > 
>>> > -----
>>> > Best Regards,
>>> > Pedro Chaves
>>> > --
>>> > Sent from: 
>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>> 
>> 
> 

Reply via email to