Thank you all for the investigation, reports and discussion. I have merged an
older PR [1] that fixes this issue. It was previously rejected because we
didn't realise this was a production issue.

With that PR merged, the issue should be fixed in the Flink 1.10, 1.9.2 and
1.8.3 releases.
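
For readers following the thread, the relaxed handling discussed in the quoted
messages below can be sketched roughly as follows. This is a simplified
illustration under stated assumptions, not the code from the merged PR: the
class PendingCommitsSketch, its String "transactions" and the strict flag are
all hypothetical stand-ins, and no Flink classes are used. With strict set to
true it reproduces the IllegalStateException from the stack trace; with false,
a late notification for an already-committed savepoint is simply ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for the sink's bookkeeping; not Flink code.
final class PendingCommitsSketch {
    // Transactions pre-committed at snapshot time, keyed by checkpoint id.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    // Commit order, recorded only so the behaviour can be inspected.
    final List<String> committed = new ArrayList<>();
    // true mimics the old strict check, false the relaxed contract.
    private final boolean strict;

    PendingCommitsSketch(boolean strict) {
        this.strict = strict;
    }

    // Called when a checkpoint or savepoint barrier is processed.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, "txn-" + checkpointId);
    }

    // Commits every pending transaction up to and including checkpointId.
    void notifyCheckpointComplete(long checkpointId) {
        if (pending.isEmpty()) {
            if (strict) {
                throw new IllegalStateException(
                        "checkpoint completed, but no transaction pending");
            }
            return; // relaxed: late or repeated notification, nothing to commit
        }
        Map<Long, String> ready = pending.headMap(checkpointId, true);
        committed.addAll(ready.values());
        ready.clear(); // headMap is a view, so this also removes from pending
    }

    public static void main(String[] args) {
        PendingCommitsSketch sink = new PendingCommitsSketch(false);
        sink.snapshotState(674);            // savepoint
        sink.snapshotState(675);            // checkpoint
        sink.notifyCheckpointComplete(675); // commits 674 and 675
        sink.notifyCheckpointComplete(674); // late savepoint notification, ignored
        System.out.println(sink.committed); // prints [txn-674, txn-675]
    }
}
```

Keying the pending transactions by checkpoint id in a sorted map means a later
notification naturally subsumes earlier ones, which is the property the
relaxed contract relies on.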

Piotrek

[1] https://github.com/apache/flink/pull/6723

> On 28 Nov 2019, at 02:52, Tony Wei <tony19920...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> There was already an issue [1] and a PR for this thread. Should we mark it as
> a duplicate or as a related issue?
> 
> Best,
> Tony Wei
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10377
>
> Piotr Nowojski <pi...@ververica.com> wrote on Thu, 28 Nov 2019 at 12:17 AM:
> Hi Tony,
> 
> Thanks for the explanation. Assuming that’s what’s happening, then I agree,
> this checkState should be removed. I created a ticket for this issue:
> https://issues.apache.org/jira/browse/FLINK-14979
> 
> Piotrek
> 
>> On 27 Nov 2019, at 16:28, Tony Wei <tony19920...@gmail.com> wrote:
>> 
>> Hi Piotrek,
>> 
>> The case here was that the first snapshot is a savepoint. I know that if the
>> following checkpoint succeeds before the previous one, the previous one
>> will be subsumed by the JobManager. However, if that previous one is a
>> savepoint, it won't be subsumed. That leads to the case that Chesnay
>> described. The following checkpoint succeeded before the previous savepoint
>> and handled the pending transactions of both, but the savepoint still
>> succeeded and sent its notification to each TaskManager. That led to this
>> exception. Could you double check whether this is the case? Thank you.
>> 
>> Best,
>> Tony Wei
>> 
>> Piotr Nowojski <pi...@ververica.com> wrote on Wed, 27 Nov 2019 at 8:50 PM:
>> Hi,
>> 
>> Maybe you are right, Chesnay, but I’m not sure. TwoPhaseCommitSinkFunction
>> was based on Pravega’s sink for Flink, which was implemented by Stephan, and
>> it has the same logic [1]. If I remember the discussions with Stephan/Till
>> correctly, the way Flink uses Akka probably guarantees that messages will
>> always be delivered, except in case of a failure, so
>> `notifyCheckpointComplete` could probably be missed only if a failure happens
>> between the snapshot and the arrival of the notification. Receiving the same
>> notification twice should be impossible (based on what Till/Stephan told me).
>> 
>> However, on the one hand, if that’s possible, then the code should be
>> adjusted accordingly. On the other hand, maybe there is no harm in relaxing
>> the contract? Even if we miss this notification (because of some
>> re-ordering?), the next one will subsume the missed one and commit everything.
>> 
>> Piotrek
>> 
>> [1] https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>> 
>>> On 27 Nov 2019, at 13:02, Chesnay Schepler <ches...@apache.org> wrote:
>>> 
>>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
>>> The notification for completed checkpoints is not reliable; it may be late,
>>> may not come at all, and may even arrive in a different order than expected.
>>> 
>>> As such, if you have a simple case of snapshot -> snapshot -> notify ->
>>> notify, the sink will always fail with an exception.
>>> 
>>> What it should do imo is either a) not check that there is a pending
>>> transaction or b) track the highest checkpoint id received and optionally
>>> not fail if the notification is for an older CP.
>>> 
>>> @piotr WDYT?
>>> 
>>> On 27/11/2019 08:59, Tony Wei wrote:
>>>> Hi, 
>>>> 
>>>> As a follow-up, it seems that a savepoint can't be subsumed, so its
>>>> notification can still be sent to each TM.
>>>> Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?
>>>> 
>>>> Best,
>>>> Tony Wei
>>>> 
>>>> Tony Wei <tony19920...@gmail.com> wrote on Wed, 27 Nov 2019 at 3:43 PM:
>>>> Hi, 
>>>> 
>>>> I want to raise this question again, since I have hit this exception in my
>>>> production job.
>>>>
>>>> The exception is as follows:
>>>>  
>>>> 2019-11-27 14:47:29
>>>>  
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
>>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>>>     ... 5 more
>>>> 
>>>> And these are the checkpoints / savepoints before the job failed.
>>>> <checkoint.png>
>>>> 
>>>> It seems that checkpoint #675's notification handled savepoint #674's
>>>> pending transaction holder, but savepoint #674's notification wasn't
>>>> subsumed or ignored by the JM.
>>>> Therefore, during checkpoint #676, some tasks got the notification before
>>>> getting the checkpoint barrier, which led to this exception because
>>>> there was no pending transaction in the queue.
>>>> 
>>>> Does anyone know the details of how notifications are subsumed and how
>>>> the checkpoint coordinator handles this situation? Please correct me if I'm
>>>> wrong. Thanks.
>>>> 
>>>> Best,
>>>> Tony Wei
>>>> 
>>>> Stefan Richter <s.rich...@data-artisans.com> wrote on Mon, 8 Oct 2018 at 5:03 PM:
>>>> Hi Pedro,
>>>> 
>>>> Unfortunately, the interesting parts have all been removed from the log;
>>>> we already know about the exception itself. In particular, what I would like
>>>> to see is which checkpoints were triggered and completed before the
>>>> exception happened.
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>> > On 08.10.2018, at 10:23, PedroMrChaves <pedro.mr.cha...@gmail.com> wrote:
>>>> > 
>>>> > Hello,
>>>> > 
>>>> > Find attached the jobmanager.log. I've omitted the log lines from other
>>>> > runs and left only the job manager info and the run with the error.
>>>> > 
>>>> > jobmanager.log
>>>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>>> >
>>>> > Thanks again for your help.
>>>> > 
>>>> > Regards,
>>>> > Pedro.
>>>> > 
>>>> > 
>>>> > 
>>>> > -----
>>>> > Best Regards,
>>>> > Pedro Chaves
>>>> > --
>>>> > Sent from: 
>>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>> 
>>> 
>> 
> 
