This looks to me like the TwoPhaseCommitSinkFunction is a bit too
strict. The notification for completed checkpoints is not reliable; it
may arrive late, not arrive at all, or possibly even arrive in a
different order than expected.
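As such, if you have a simple case of snapshot -> snapshot -> notify ->
notify, where the first notification (for the second checkpoint)
already commits both transactions, the sink will always fail with an
exception on the second notification.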
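To make the failure mode concrete, here is a minimal, simplified model of the sink's bookkeeping. The class `StrictSinkModel` and its fields are hypothetical, not Flink code. The model assumes, as the stack trace and the rest of the thread suggest, that `notifyCheckpointComplete` commits every pending transaction up to the notified checkpoint ID and fails a `checkState` when nothing is pending. In-order notifications would pass. The failure needs the later notification to arrive first.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the sink's pending-transaction bookkeeping (not Flink's class).
class StrictSinkModel {
    // checkpoint ID -> transaction handle (a plain String stands in for a real transaction)
    final TreeMap<Long, String> pending = new TreeMap<>();
    int commits = 0;

    void snapshotState(long checkpointId) {
        // pre-commit the current transaction and park it until its checkpoint completes
        pending.put(checkpointId, "txn-" + checkpointId);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // mirrors the precondition that fails in the stack trace
        if (pending.isEmpty()) {
            throw new IllegalStateException("checkpoint completed, but no transaction pending");
        }
        // headMap(id, true) is a live view of all entries with key <= checkpointId
        Map<Long, String> toCommit = pending.headMap(checkpointId, true);
        commits += toCommit.size();
        toCommit.clear(); // also removes the committed entries from `pending`
    }

    public static void main(String[] args) {
        StrictSinkModel sink = new StrictSinkModel();
        sink.snapshotState(1);
        sink.snapshotState(2);
        sink.notifyCheckpointComplete(2); // commits txn-1 and txn-2
        try {
            sink.notifyCheckpointComplete(1); // late notification finds an empty queue
        } catch (IllegalStateException e) {
            // prints: failed: checkpoint completed, but no transaction pending
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```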
What it should do, IMO, is either a) not check that there is a pending
transaction, or b) track the highest checkpoint ID received and
optionally not fail if the notification is for an older checkpoint.
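A rough sketch of option b) on the same kind of simplified model follows. `TolerantSinkModel`, `highestNotifiedId` and the counters are hypothetical names used only for illustration. Stale or duplicate notifications become no-ops, while a genuinely new notification with nothing pending still fails.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of option b); names are illustrative, not Flink API.
class TolerantSinkModel {
    final TreeMap<Long, String> pending = new TreeMap<>();
    long highestNotifiedId = Long.MIN_VALUE; // highest checkpoint ID seen in a notification
    int commits = 0;
    int ignored = 0;

    void snapshotState(long checkpointId) {
        pending.put(checkpointId, "txn-" + checkpointId);
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId <= highestNotifiedId) {
            // late or duplicate notification: a newer notification already committed
            // everything up to highestNotifiedId, so ignore it instead of failing
            ignored++;
            return;
        }
        highestNotifiedId = checkpointId;
        if (pending.isEmpty()) {
            // still strict for notifications that are genuinely new
            throw new IllegalStateException("checkpoint completed, but no transaction pending");
        }
        Map<Long, String> toCommit = pending.headMap(checkpointId, true);
        commits += toCommit.size();
        toCommit.clear();
    }

    public static void main(String[] args) {
        TolerantSinkModel sink = new TolerantSinkModel();
        sink.snapshotState(1);
        sink.snapshotState(2);
        sink.notifyCheckpointComplete(2); // commits txn-1 and txn-2
        sink.notifyCheckpointComplete(1); // stale: ignored, no exception
        // prints: 2 committed, 1 ignored
        System.out.println(sink.commits + " committed, " + sink.ignored + " ignored");
    }
}
```

Option a) would amount to dropping the `pending.isEmpty()` check altogether. Option b) keeps the check for notifications that are newer than anything seen so far.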
@piotr WDYT?
On 27/11/2019 08:59, Tony Wei wrote:
Hi,
As a follow-up: it seems that savepoints can't be subsumed, so their
notifications could still be sent to each TM.
Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?
Best,
Tony Wei
On Wed, 27 Nov 2019 at 3:43 PM, Tony Wei <tony19920...@gmail.com> wrote:
Hi,
I want to raise this question again, since I have hit this
exception in my production job.
The exception is as follows:
2019-11-27 14:47:29
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more
And these are the checkpoints / savepoints before the job failed:
[attached screenshot: checkoint.png]
It seems that checkpoint #675's notification handled (committed)
savepoint #674's pending transaction holder as well, but savepoint
#674's own notification was neither subsumed nor ignored by the JM.
Therefore, during checkpoint #676, some tasks received that
notification before receiving the checkpoint barrier, which led to
this exception because there was no pending transaction in the queue.
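The ordering described above can be replayed on a simplified per-task model. The class and method names are hypothetical. The model only assumes that a notification commits all pending transactions up to its ID and fails when the queue is empty. It also hints at why only some tasks failed. A task that had already snapshotted #676 still has a pending transaction, so the late #674 notification skips it instead of tripping the check.

```java
import java.util.TreeMap;

// Hypothetical replay of the #674/#675/#676 ordering on a simplified model (not Flink code).
class NotificationRaceReplay {
    static final class Task {
        final TreeMap<Long, String> pending = new TreeMap<>();

        void snapshot(long id) {
            pending.put(id, "txn-" + id);
        }

        void notifyComplete(long id) {
            if (pending.isEmpty()) {
                throw new IllegalStateException("checkpoint completed, but no transaction pending");
            }
            pending.headMap(id, true).clear(); // "commit" everything up to and including id
        }
    }

    // Replays the sequence for one task; returns true if that task would fail.
    static boolean replay(boolean barrier676BeforeLateNotification) {
        Task t = new Task();
        t.snapshot(674);       // savepoint #674
        t.snapshot(675);       // checkpoint #675
        t.notifyComplete(675); // commits the transactions of #674 and #675
        try {
            if (barrier676BeforeLateNotification) {
                t.snapshot(676);       // barrier for #676 arrives first
                t.notifyComplete(674); // late: #676 is newer, so it stays pending
            } else {
                t.notifyComplete(674); // late notification hits an empty queue
                t.snapshot(676);
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // prints: notification before barrier fails: true
        System.out.println("notification before barrier fails: " + replay(false));
        // prints: barrier before notification fails: false
        System.out.println("barrier before notification fails: " + replay(true));
    }
}
```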
Does anyone know the details of the mechanism for subsuming
notifications, and how the checkpoint coordinator handles this situation?
Please correct me if I'm wrong. Thanks.
Best,
Tony Wei
On Mon, 8 Oct 2018 at 5:03 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
Hi Pedro,
unfortunately, the interesting parts have all been removed from the
log, and we already know about the exception itself. In
particular, I would like to see which checkpoints were
triggered and completed before the exception happened.
Best,
Stefan
> On 08.10.2018 at 10:23, PedroMrChaves <pedro.mr.cha...@gmail.com> wrote:
>
> Hello,
>
> Find attached the jobmanager.log. I've omitted the log lines from other
> runs and left only the job manager info and the run with the error.
>
> jobmanager.log
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>
> Thanks again for your help.
>
> Regards,
> Pedro.
>
> -----
> Best Regards,
> Pedro Chaves