Hi Arvid,

Thanks a lot for the deep thoughts!

> If this somehow works, we would not need to change much in the checkpoint
> coordinator. It would always inject into sources. We could also ignore the
> race conditions as long as the TM lives. Checkpointing times are also not
> worse than with the live task.
> Clear downside (assuming feasibility) is that we have two code paths that
> would deal with barriers. We would also need to keep more information in
> the TM but again at some point the complete subtask fitted.

I also agree that with the slim information the checkpointing would be more
unified with the normal process, and it could simplify the changes to the
CheckpointCoordinator. I thought about a rough design in this direction, and
I still have some concerns:

1. If we want to ensure we could always trigger the slim finished sources, we
must ensure the TaskExecutor keeping this information is not released due to
idle timeout. Thus the slim information would still pin some resources in some
scenarios: if we have mixed jobs with both bounded and unbounded sources, the
resources would be kept pinned (see the first sketch after this list).

2. I still have some concerns about introducing a new RPC communication
channel between TaskManagers (see the second sketch after this list). We might
need to initiate the RPC connections together with the netty channels and
close them when the job finishes. This means that on job finish the JM needs
to notify the TMs to clean up these connections, which requires a reliable RPC
message from JM to TM (involving timeout & resend), and the JM might be
blocked before finishing in order to close these connections. This would also
complicate failover, since on failover we need to close the original
connections and reconnect according to the new deployment. For the RPC
channels themselves, the TMs also need to maintain heartbeats to ensure the
channels are still open, which would increase the burden on the TM RPC
services when we have a lot of TMs.

3. For the slim information, we would need to clear it on failover to avoid
the TM being pinned wrongly. This would also require the scheduler to
introduce a new process to clear the slim information, similar to cancelling
a running task.

4. Since checkpoints are only available to streaming jobs, we might only want
to keep the slim information and create the RPC channels between TaskManagers
for streaming jobs. Currently batch jobs roughly only differ in that they do
not have a CheckpointCoordinator to trigger checkpoints, and the other
components are mostly unified. If we also need to decide whether to keep the
slim information and create the RPC channels, the scheduler and the network
layer would then also have to be aware of the execution mode and have a
special code path for streaming mode.
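
To make the pinning concern in point 1 concrete, here is a minimal sketch
(all names are invented for this discussion and do not exist in Flink): if the
TaskExecutor retains "slim" records for finished tasks, its idle-timeout
release path would have to treat a slot holding such records as still in use.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SlimInfoRegistry {

        // Number of retained finished-task records per slot.
        private final Map<String, Integer> retainedRecordsPerSlot = new ConcurrentHashMap<>();

        public void retainFinishedTask(String slotId) {
            retainedRecordsPerSlot.merge(slotId, 1, Integer::sum);
        }

        public void clearOnJobTermination(String slotId) {
            retainedRecordsPerSlot.remove(slotId);
        }

        // The idle-timeout check would have to consult this before freeing the
        // slot, which is exactly the pinning effect described in point 1.
        public boolean canReleaseIdleSlot(String slotId) {
            return !retainedRecordsPerSlot.containsKey(slotId);
        }
    }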
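
And for point 2, a rough sketch of the TM-to-TM RPC surface and the lifecycle
we would have to manage; again all names are hypothetical and only meant to
illustrate the concern:

    import java.util.concurrent.CompletableFuture;

    public interface TaskManagerBarrierGateway {

        // Would be opened together with the netty data channels of a job.
        CompletableFuture<Void> connect(String remoteTaskManagerAddress, String jobId);

        // A finished upstream task's "slim" handler would use this to forward
        // the checkpoint barrier downstream.
        CompletableFuture<Void> forwardCheckpointBarrier(String jobId, long checkpointId);

        // Needs its own heartbeat so that dead peers are detected, which adds
        // load on the TM RPC service when there are many TaskManagers.
        void heartbeat(String remoteTaskManagerAddress);

        // Must be closed on job finish and on failover; the JM has to notify
        // the TM reliably (timeout & resend) for this to happen.
        CompletableFuture<Void> closeConnectionsForJob(String jobId);
    }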

For the JM-based method to retry triggering the following tasks:

> Here I'm just concerned that we would overload JM. Especially if it's 
> cascading: A is triggered in A->B->C but finishes, 
> JM computes B and resends RPC but at that time B is also finished. Hence I 
> was thinking of using TMs instead and only
> fall back to JM if TM has exited.

If the overhead is the main concern, I roughly think that we could avoid too
many failed retriggers by making the CheckpointCoordinator wait a short period
to accumulate more finished notifications before taking action. The JM would
only have a heavy burden if a large batch of tasks finished within a short
time; with the short wait the JM would be able to smooth out the load and
avoid repeated retries.
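
A minimal sketch of this "short wait" idea, assuming a hypothetical hook in
the CheckpointCoordinator; the names and the 100 ms delay are made up for
illustration:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class FinishedTaskDebouncer {

        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor();
        private final Set<String> pendingFinishedTasks = ConcurrentHashMap.newKeySet();
        private final AtomicBoolean flushScheduled = new AtomicBoolean(false);
        private final long waitMillis = 100;

        // Called for every "task finished" notification arriving from the TMs.
        public void onTaskFinished(String taskId) {
            pendingFinishedTasks.add(taskId);
            // Schedule only one flush per burst, so a large batch of finish
            // events leads to a single recomputation instead of one retrigger
            // attempt per event.
            if (flushScheduled.compareAndSet(false, true)) {
                timer.schedule(this::flush, waitMillis, TimeUnit.MILLISECONDS);
            }
        }

        private void flush() {
            flushScheduled.set(false);
            Set<String> batch = new HashSet<>(pendingFinishedTasks);
            pendingFinishedTasks.removeAll(batch);
            if (batch.isEmpty()) {
                return;
            }
            // Here the coordinator would recompute the tasks to (re)trigger
            // once for the whole batch and send the trigger RPCs.
            System.out.println("Recomputing trigger tasks for "
                    + batch.size() + " finished tasks");
        }
    }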

Best,
 Yun
------------------ Original Mail ------------------
Sender: Arvid Heise <ar...@ververica.com>
Send Date: Thu Jan 7 00:52:27 2021
Recipients: Aljoscha Krettek <aljos...@apache.org>
CC: dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Okay then at least you guys are in sync ;) (Although I'm also not too far
away)

I hope I'm not super derailing but could we reiterate why it's good to get
rid of finished tasks (note: I'm also mostly in favor of that):
1. We can free all acquired resources including buffer pools, state
backend(?), threads.
2. TM can forget about the subtask entirely.
3. We can subsequently downscale.
4. What more?

I'm assuming it's not needed to execute the application at all: The
application at one point had all subtasks running, so it's not a resource
issue per se (ignoring rescaling).

My idea is not to let the task live longer (except for final checkpoints
where we are all on the same page I guess). I'm just thinking out loud if
we can avoid 2. while still doing 1.+3.

So can TM retain some slim information about a finished task to still
process RPCs in a potentially different way?
Thus, without keeping the costly task thread and operator chains, could we
implement some RPC handler that knows this is a finished task and forward
the barrier to the next task/TM?
Can we store this slim information in a checkpoint as an operator subtask
state?
Could we transfer this slim information in case of (dynamic) downscaling?

If this somehow works, we would not need to change much in the checkpoint
coordinator. It would always inject into sources. We could also ignore the
race conditions as long as the TM lives. Checkpointing times are also not
worse than with the live task.
Clear downside (assuming feasibility) is that we have two code paths that
would deal with barriers. We would also need to keep more information in
the TM but again at some point the complete subtask fitted.

On Wed, Jan 6, 2021 at 4:39 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> On 2021/01/06 16:05, Arvid Heise wrote:
> >thanks for the detailed example. It feels like Aljoscha and you are also
> >not fully aligned yet. For me, it sounded as if Aljoscha would like to
> >avoid sending RPC to non-source subtasks.
>
> No, I think we need the triggering of intermediate operators.
>
> I was just thinking out loud about the potential scenarios where
> intermediate operators will in fact stay online, and how common they
> are.
>
> Also, I sent an explanation that is similar to Yun's. It seems we always
> write out mails in parallel and then send them before checking. :-) So
> you always get two explanations of roughly the same thing.
>
> Best,
> Aljoscha
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
