Thanks for starting this discussion (and sorry if these questions are duplicates; I couldn't find them answered in the FLIP or in this thread).

1. Option 1 is said not to be preferable because it wastes resources and adds complexity (a new event). However, the resources would only be wasted for a relatively short time, until the job finishes completely. And compared to the other options, the complexity seems much lower. Or are differences in task completion times really so large and so common?

2. I think it would be helpful to describe how rescaling is handled in Options 2 and 3 (or maybe it's not supported for jobs that are about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true for operator state, but what about channel state (captured by unaligned checkpoints)? I think it still has to be sent downstream, which invalidates this option.

Regards,
Roman

On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise <ar...@ververica.com> wrote:

> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>
> I think we are mixing two different things here that may require different
> solutions:
> 1. Tasks (=sink) that may need to do something with the final checkpoint.
> 2. Tasks that only finish after having finished operations that do not
>    depend on data flow (async I/O, but I could also think of some timer
>    actions in process functions).
>
> Your proposal would help most for the first case.
> The second case can be solved entirely with current methods without being
> especially complicated:
> - EOP is only emitted once Async I/O is done with all background tasks
> - All timers are fired in a process function (I think we rather want to
>   fire immediately on EOP but that's a different discussion)
> The advantage of this approach over your idea is that you don't need to
> wait for a checkpoint to complete to check for finalization.
>
> Now let's look at the first case. I see two alternatives:
> - The new sink interface implicitly incorporates this listener. Since I
>   don't see a use case outside sinks, we could simply add this method to the
>   new sink interface.
> - We implicitly assume that a sink is done after having a successful
>   checkpoint at the end. Then we just need a tag interface
>   `RequiresFinalization`. It also feels like we should add the property
>   `final` to checkpoint options to help the sink detect that this is the last
>   checkpoint to be taken. We could also try to always have the final
>   checkpoint without tag interface on new sinks...
>
> On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> This is somewhat unrelated to the discussion about how to actually do
>> the triggering when sources shut down, I'll write on that separately. I
>> just wanted to get this quick thought out.
>>
>> For letting operators decide whether they actually want to wait for a
>> final checkpoint, which is relevant at least for Async I/O and
>> potentially for sinks.
>>
>> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>>
>> This way we would decouple that logic from things that don't actually
>> need it. What do you think?
>>
>> Best,
>> Aljoscha
>>
>
> --
> Arvid Heise | Senior Java Developer
> <https://www.ververica.com/>
> Follow us @VervericaData
> --
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
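
To make the `isFinalized()` idea quoted above a bit more concrete, here is a minimal Java sketch. It is not Flink code: `FinalizationListener` and `isFinalized()` are the placeholder names from the thread, while `onCheckpointComplete`, `AsyncIoOperator`, `CheckpointCountingSink`, and `checkpointsUntilFinalized` are hypothetical names made up for illustration. The loop only simulates a framework polling each operator after every completed checkpoint.

```java
import java.util.List;

// Hypothetical interface from the thread; the names are placeholders,
// not an existing Flink API.
interface FinalizationListener {
    // Assumed hook: called by the simulated framework after a checkpoint completes.
    default void onCheckpointComplete(long checkpointId) {}

    // True once the operator is ready to shut down completely.
    boolean isFinalized();
}

// Async I/O style operator: finalized once no requests are in flight.
class AsyncIoOperator implements FinalizationListener {
    private int pendingRequests;

    AsyncIoOperator(int pendingRequests) {
        this.pendingRequests = pendingRequests;
    }

    // Simulates one outstanding request completing.
    void completeOneRequest() {
        if (pendingRequests > 0) {
            pendingRequests--;
        }
    }

    @Override
    public boolean isFinalized() {
        return pendingRequests == 0;
    }
}

// Sink that waits for a given number of completed checkpoints.
class CheckpointCountingSink implements FinalizationListener {
    private int remainingCheckpoints;

    CheckpointCountingSink(int checkpointsToWaitFor) {
        this.remainingCheckpoints = checkpointsToWaitFor;
    }

    @Override
    public void onCheckpointComplete(long checkpointId) {
        if (remainingCheckpoints > 0) {
            remainingCheckpoints--;
        }
    }

    @Override
    public boolean isFinalized() {
        return remainingCheckpoints == 0;
    }
}

class FinalizationSketch {
    // Simulates checkpoints until every operator reports isFinalized().
    // Returns the number of checkpoints taken, or -1 if maxCheckpoints
    // was reached first.
    static int checkpointsUntilFinalized(List<FinalizationListener> operators,
                                         int maxCheckpoints) {
        for (int checkpointId = 1; checkpointId <= maxCheckpoints; checkpointId++) {
            for (FinalizationListener op : operators) {
                op.onCheckpointComplete(checkpointId);
            }
            boolean allFinalized = true;
            for (FinalizationListener op : operators) {
                if (!op.isFinalized()) {
                    allFinalized = false;
                }
            }
            if (allFinalized) {
                return checkpointId;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        AsyncIoOperator asyncIo = new AsyncIoOperator(1);
        asyncIo.completeOneRequest(); // all async requests are done
        List<FinalizationListener> operators =
                List.of(asyncIo, new CheckpointCountingSink(2));
        System.out.println(checkpointsUntilFinalized(operators, 10));
    }
}
```

With one already-drained Async I/O operator and a sink that waits for two checkpoints, this prints 2. Because the check only runs after a checkpoint, even an operator that is already finalized is not released before the first checkpoint completes, which is the drawback Arvid points out for the second case.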