Hi Arvid,

Many thanks for the deep thoughts!
> If this somehow works, we would not need to change much in the checkpoint
> coordinator. He would always inject into sources. We could also ignore the
> race conditions as long as the TM lives. Checkpointing times are also not
> worse as with the live task.
> Clear downside (assuming feasibility) is that we have two code paths that
> would deal with barriers. We would also need to keep more information in
> the TM but again at some point the complete subtask fitted.

I agree that with the slim information the checkpointing would be more unified with the normal process, and that it could simplify the changes to the CheckpointCoordinator. I thought about a rough design in this direction, and I still have some concerns:

1. If we want to ensure we can always trigger the slim finished sources, we must ensure the TaskExecutor keeping this information is not released due to idle timeout. The slim information would thus still pin some resources in some scenarios: if we have mixed jobs with both bounded and unbounded sources, the resources would stay pinned.

2. I still have some concerns about introducing a new RPC communication network between TaskManagers. We would need to initiate the RPC connections together with the Netty channels and close them when the job finishes. This means that on job finish the JM has to notify the TMs to clean up these connections, which requires a reliable RPC message from JM to TM, with timeouts and resends, and the JM might be blocked before finishing until the connections are closed (the second sketch at the end of this mail illustrates the retry loop this implies). It would also complicate failover, since on failover we need to close the original connections and reconnect according to the new deployment. For the RPC channels themselves, the TMs would also need to maintain heartbeats to ensure the channels are still open, which increases the burden on the TM RPC services when there are many TMs.

3. For the slim information, we would need to clear it on failover to avoid wrongly pinning the TMs. This also requires the scheduler to introduce a new process, similar to cancelling a running task, to clear the slim information.

4. Since checkpoints are only available for streaming jobs, we might want to keep the slim information and create the RPC channels between TaskManagers only for streaming jobs. Currently, batch jobs roughly differ only in that they do not have a CheckpointCoordinator to trigger checkpoints; the other components are mostly unified. If we also have to decide whether to keep slim information and create the RPC channels, the scheduler and the network layer would then also need to be aware of the execution mode and have a special code path for streaming mode.

For the JM-based method to retry triggering the following tasks:

> Here I'm just concerned that we would overload JM. Especially if it's
> cascading: A is triggered in A->B->C but finishes,
> JM computes B and resends RPC but at that time B is also finished. Hence I
> was thinking of using TMs instead and only
> fall back to JM if TM has exited.

If the overhead is the main concern, I think we could avoid too many failed re-triggers by making the CheckpointCoordinator wait a short period to accumulate more finished notifications before taking action. The JM would only come under heavy load if a large batch of tasks finished within a short time; with the short wait time the JM could smooth out the load and avoid repeated attempts (see the first sketch below).
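To make the accumulation idea a bit more concrete, here is a minimal sketch. All names (FinishedTaskDebouncer, onTaskFinished, retriggerDownstreamOf) are made up for illustration and are not existing Flink classes; a real version would run on the CheckpointCoordinator's main-thread executor instead of its own thread:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Collects "task finished" notifications for a short window so that a
 * burst of finishes leads to one recomputation of the trigger targets
 * instead of one recomputation (and RPC round) per finished task.
 */
public class FinishedTaskDebouncer {

    // Single thread standing in for the coordinator's main-thread executor.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Set<String> pendingFinished = new HashSet<>();
    private final long waitMillis;
    private boolean flushScheduled;

    public FinishedTaskDebouncer(long waitMillis) {
        this.waitMillis = waitMillis;
    }

    /** Called for every finished-task notification coming from a TM. */
    public void onTaskFinished(String taskId) {
        executor.execute(() -> {
            pendingFinished.add(taskId);
            if (!flushScheduled) {
                flushScheduled = true;
                // Wait the short period before acting on the batch.
                executor.schedule(this::flush, waitMillis, TimeUnit.MILLISECONDS);
            }
        });
    }

    private void flush() {
        Set<String> batch = new HashSet<>(pendingFinished);
        pendingFinished.clear();
        flushScheduled = false;
        // Recompute the trigger frontier once for the whole batch and send
        // the (fewer) re-trigger RPCs from there.
        retriggerDownstreamOf(batch);
    }

    private void retriggerDownstreamOf(Set<String> finishedTasks) {
        System.out.println("Re-triggering downstream of " + finishedTasks);
    }
}

With a wait of, say, 50-100 ms, a cascade of A, B, C finishing almost simultaneously would be handled by one flush instead of three re-trigger rounds.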
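And to illustrate why the connection cleanup in concern 2 worries me, the JM would need roughly the following retry loop per TM, and it cannot declare the job finished before all of these futures complete. The gateway method releaseJobRpcChannels and the timeout/back-off values are assumptions for the sketch, not existing Flink APIs:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RpcChannelCleanup {

    /** Assumed gateway: asks one TM to close its job-scoped RPC channels. */
    interface TmGateway {
        CompletableFuture<Void> releaseJobRpcChannels();
    }

    private static final ScheduledExecutorService RETRY_SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    /** Resends the cleanup RPC until the TM acks or the retries run out. */
    static CompletableFuture<Void> notifyCleanup(TmGateway tm, int retriesLeft) {
        return tm.releaseJobRpcChannels()
                .orTimeout(10, TimeUnit.SECONDS)
                .handle((ack, failure) -> {
                    if (failure == null) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    if (retriesLeft == 0) {
                        // Give up; the JM must decide whether it may still
                        // finish the job with a possibly leaked connection.
                        return CompletableFuture.<Void>failedFuture(failure);
                    }
                    // Back off, then resend the notification.
                    CompletableFuture<Void> retried = new CompletableFuture<>();
                    RETRY_SCHEDULER.schedule(
                            () -> notifyCleanup(tm, retriesLeft - 1)
                                    .whenComplete((v, t) -> {
                                        if (t == null) {
                                            retried.complete(v);
                                        } else {
                                            retried.completeExceptionally(t);
                                        }
                                    }),
                            5, TimeUnit.SECONDS);
                    return retried;
                })
                .thenCompose(f -> f);
    }
}

This is exactly the kind of reliable-delivery machinery (plus the failover and heartbeat handling around it) that I would prefer not to add for the TM-to-TM channels.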
Best,
Yun

------------------ Original Mail ------------------
Sender: Arvid Heise <ar...@ververica.com>
Send Date: Thu Jan 7 00:52:27 2021
Recipients: Aljoscha Krettek <aljos...@apache.org>
CC: dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Okay then at least you guys are in sync ;) (Although I'm also not too far away)

I hope I'm not super derailing but could we reiterate why it's good to get rid of finished tasks (note: I'm also mostly in favor of that):
1. We can free all acquired resources including buffer pools, state backend(?), threads.
2. TM can forget about the subtask entirely.
3. We can subsequently downscale.
4. What more?

I'm assuming it's not needed to execute the application at all: The application at one point had all subtasks running, so it's not a resource issue per se (ignoring rescaling).

My idea is not to let the task live longer (except for final checkpoints where we are all on the same page I guess). I'm just thinking out loud if we can avoid 2. while still doing 1.+3.

So can TM retain some slim information about a finished task to still process RPCs in a potentially different way? Thus, without keeping the costly task thread and operator chains, could we implement some RPC handler that knows this is a finished task and forwards the barrier to the next task/TM? Can we store this slim information in a checkpoint as an operator subtask state? Could we transfer this slim information in case of (dynamic) downscaling?

If this somehow works, we would not need to change much in the checkpoint coordinator. He would always inject into sources. We could also ignore the race conditions as long as the TM lives. Checkpointing times are also not worse as with the live task.

Clear downside (assuming feasibility) is that we have two code paths that would deal with barriers. We would also need to keep more information in the TM but again at some point the complete subtask fitted.

On Wed, Jan 6, 2021 at 4:39 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> On 2021/01/06 16:05, Arvid Heise wrote:
> > thanks for the detailed example. It feels like Aljoscha and you are also
> > not fully aligned yet. For me, it sounded as if Aljoscha would like to
> > avoid sending RPC to non-source subtasks.
>
> No, I think we need the triggering of intermediate operators.
>
> I was just thinking out loud about the potential scenarios where
> intermediate operators will in fact stay online, and how common they
> are.
>
> Also, I sent an explanation that is similar to Yun's. It seems we always
> write our mails in parallel and then send them before checking. :-) So
> you always get two explanations of roughly the same thing.
>
> Best,
> Aljoscha

--
Arvid Heise | Senior Java Developer