I was actually not thinking about concurrent checkpoints (and in fact want
to get rid of them once UC is established, since they address the same
thing).

But your explanation definitely helped me to better understand the race
condition.

However, I have the impression that you think mostly in terms of tasks,
while I mostly think in terms of subtasks. I especially want to have proper
support for bounded sources where one partition is much larger than the
other partitions (possibly in conjunction with unbounded sources, so that
checkpointing is plausible to begin with). In that case, most of the
subtasks finish early, with one straggler remaining, and the barriers now
have to be inserted only in the straggling source subtask and potentially
in any still-running downstream subtask.
As far as I have understood, this would require barriers to be inserted
downstream, leading to similar race conditions.

I'm also concerned about the notion of a final checkpoint. What happens
when this final checkpoint times out (checkpoint timeout > async timeout)
or fails for a different reason? I'm currently more inclined to just let
checkpoints keep working until the whole graph has completed (and thought
this was the initial goal of the whole FLIP to begin with). However, that
would require subtasks to stay alive until they receive the
checkpointCompleted callback (which is currently also not guaranteed)...
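To illustrate that last point, below is a minimal sketch of a sink that
only makes its output visible once the checkpoint-completed callback
arrives (the buffering and the "commit" are placeholders; a real sink
would also snapshot the pending data into state, and the package of
CheckpointListener may differ between Flink versions). If the subtask is
torn down before the callback, the commit simply never happens.

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class CommitOnCheckpointSink extends RichSinkFunction<String>
        implements CheckpointListener {

    // Staged records; a real implementation would also snapshot these into
    // operator state so they survive failover.
    private final List<String> pending = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        pending.add(value); // staged, not yet externally visible
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Placeholder for an external commit (e.g. renaming staged files).
        System.out.println(
                "committing " + pending.size() + " records for checkpoint " + checkpointId);
        pending.clear();
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // Keep the staged data; a later checkpoint will cover it.
    }
}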

On Wed, Jan 6, 2021 at 12:17 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> On 2021/01/06 11:30, Arvid Heise wrote:
> >I'm assuming that this is the normal case. In an A->B graph, as soon as A
> >finishes, B still has a couple of input buffers to process. If you add
> >backpressure or longer pipelines into the mix, it's quite likely that a
> >checkpoint may occur with B being the head.
>
> Ahh, I think I know what you mean. This can happen when the checkpoint
> coordinator issues concurrent checkpoints without waiting for older ones
> to finish. My head is mostly operating under the premise that there is
> at most one concurrent checkpoint.
>
> In the current code base the race conditions that Yun and I are talking
> about cannot occur. Checkpoints can only be triggered at sources and
> they will then travel through the graph. Intermediate operators are
> never directly triggered from the JobManager/CheckpointCoordinator.
>
> When sources start to shut down, the JM has to directly inject/trigger
> checkpoints at the now new "sources" of the graph, which have previously
> been intermediate operators.
>
> I want to repeat that I have a suspicion that maybe this is a degenerate
> case and we never want to allow operators to be doing checkpoints when
> they are not connected to at least one running source, which means that
> we have to find a solution for declined checkpoints and missing sources.
>
> I'll first show an example where I think we will never have intermediate
> operators running without the sources being running:
>
> Source -> Map -> Sink
>
> Here, when the Source does its final checkpoint and then shuts down,
> that same final checkpoint would travel downstream ahead of the EOF,
> which would in turn cause Map and Sink to also shut down. *We can't have
> the case that Map is still running when we want to take a checkpoint and
> Source is not running*.
>
> A similar case is this one:
>
> Source1 --+
>           |-> Map -> Sink
> Source2 --+
>
> Here, if Source1 is finished but Source2 is not, Map is still connected
> to at least one upstream source that is still running. Again, Map would
> never be running and doing checkpoints if neither Source1 nor Source2
> is online.
>
> The cases I see where intermediate operators would keep running despite
> not being connected to any upstream operators are when we purposefully
> keep an operator online despite all inputs having seen EOF. One example
> is async I/O, another is what Yun mentioned where a sink might want to
> wait for another checkpoint to confirm some data. Example:
>
> Source -> Async I/O -> Sink
>
> Here, Async I/O will stay online as long as there are some scheduled
> requests outstanding, even when the Source has shut down. In those
> cases, the checkpoint coordinator would have to trigger new checkpoints
> at Async I/O and not Source, because it has become the new "head" of the
> graph.
>
> For Async I/O at least, we could say that the operator will wait for all
> outstanding requests to finish before it allows the final checkpoint and
> passes the barrier forward.
>
> Best,
> Aljoscha
>
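For concreteness, the Async I/O case above corresponds roughly to a job
like the following minimal sketch (the lookup function, timeout, and
capacity values are placeholders I made up): as long as asyncInvoke
results are outstanding, the Async I/O operator stays online after the
bounded source has finished and would become the new "head" for
checkpoint triggering.

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncIoTailExample {

    // Placeholder async function: completes each request on another thread.
    // While such requests are in flight, the Async I/O operator keeps running
    // even after the upstream Source has shut down.
    public static class LookupFunction extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> input + "-enriched")
                    .thenAccept(result ->
                            resultFuture.complete(Collections.singletonList(result)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);

        DataStream<String> source = env.fromElements("a", "b", "c"); // bounded

        AsyncDataStream
                .unorderedWait(source, new LookupFunction(), 30, TimeUnit.SECONDS, 100)
                .print(); // Source -> Async I/O -> Sink

        env.execute("async i/o tail example");
    }
}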


-- 

Arvid Heise | Senior Java Developer
