Hi Stephan & Xiaogang, It's great to see this discussion active again!
It makes sense to me that doing some private optimization and trial through plugin. I understand that the community could not satisfy every one and every requirement due to limited resources. The pluggable strategy is a good way to compromise. In that way, it might be also helpful for improving the pluggable strategy itself since there might be some reasonable requirements from the plugin. Regarding to the "at-most-once" or "best-effort" semantics, I think it worths going further since we heard these requirements several times. However I think we need more investigations of implementing based on pluggable shuffle service and scheduler (or some more components?). There might be a public discussion when we are ready. I hope it would happen soon. On Wed, Jul 24, 2019 at 9:43 AM SHI Xiaogang <shixiaoga...@gmail.com> wrote: > Hi Stephan, > > I agree with you that the implementation of "at-most-once" or > "best-effort" recovery will benefit from pluggable shuffle service and > pluggable scheduler. Actually we made some attempts in our private > repository and it turns out that it requires quite a lot of work to > implement this with exsiting network stack. We can start the work on this > when pluggable shuffle service and pluggable scheduler are ready. > > The suggestion of external implementation is a very good idea. That way, we > can implement both "at-most-once" and "best-effort" guarantees as different > checkpoint/failover strategies. If so, i think we should focus on the > components that are changed in different strategies. These components may > include a pluggable checkpoint barrier handler and a pluggable failover > strategy. We can list these components and discuss implementation details > then. > > What do you think, Biao Liu and Zhu Zhu? > > Regards, > Xiaogang > > > Stephan Ewen <se...@apache.org> 于2019年7月24日周三 上午1:31写道: > > > Hi all! > > > > This is an interesting discussion for sure. > > > > Concerning user requests for changes modes, I also hear the following > quite > > often: > > - reduce the expensiveness of checkpoint alignment (unaligned > > checkpoints) to make checkpoints fast/stable under high backpressure > > - more fine-grained failover while maintaining exactly-once (even if > > costly) > > > > Having also "at most once" to the mix is quite a long list of big changes > > to the system. > > > > My feeling is that on such a core system, the community can not push all > > these efforts at the same time, especially because they touch overlapping > > areas of the system and need the same committers involved. > > > > On the other hand, the pluggable shuffle service and pluggable scheduler > > could make it possible to have an external implementation of that. > > - of a network stack that supports "reconnects" of failed tasks with > > continuing tasks > > - a scheduling strategy that restarts tasks individually even in > > pipelined regions > > > > I think contributors/committers could implements this separate from the > > Flink core. The feature would be trial-run it through the community > > packages. If it gains a lot of traction, the community could decide to > put > > in the effort to merge this into the core. > > > > Best, > > Stephan > > > > > > On Tue, Jun 11, 2019 at 2:10 PM SHI Xiaogang <shixiaoga...@gmail.com> > > wrote: > > > > > Hi All, > > > > > > It definitely requires a massive effort to allow at-most-once delivery > in > > > Flink. But as the feature is urgently demanded by many Flink users, i > > think > > > every effort we made is worthy. Actually, the inability to support > > > at-most-once delivery has become a major obstacle for Storm users to > turn > > > to Flink. It's undesirable for us to run different stream processing > > > systems for different scenarios. > > > > > > I agree with Zhu Zhu that the guarantee we provide is the very first > > thing > > > to be discussed. Recovering with checkpoints will lead to duplicated > > > records, thus break the guarantee on at-most-once delivery. > > > > > > A method to achieve at-most-once guarantee is to completely disable > > > checkpointing and let sources only read those records posted after they > > > start. The method requires sources to allow the configuration to read > > > latest records, which luckily is supported by many message queues > > including > > > Kafka. As Flink relies sources' ability to rollback to achieve > exact-only > > > and at-least-once delivery, i think it's acceptable for Flink to rely > > > sources' ability to read latest records to achieve at-most once > delivery. > > > This method does not require any modification to existing checkpointing > > > mechanism. Besides, as there is no need to restoring from checkpoints, > > > failed tasks can recover themselves at the fastest speed. > > > > > > Concerning the implementation efforts, i think we can benefit from some > > > ongoing work including shuffle services and fine-grained recovery. For > > > example, currently the exceptions in network connections will lead to > > > failures of downstream and upstream tasks. To achieve at-most-once > > > delivery, we should decouple intermediate results from tasks, reporting > > the > > > exceptions of intermediate results to job master and letting the > failover > > > strategy to determine the actions taken. Some work is already done in > the > > > efforts to achieve fine-grained recovery, which can be extended to > allow > > > at-most-once delivery in Flink. > > > > > > But before starting the discussion on implementation details, as said > at > > > prior, we need to determine the guarantee we provide in the scenarios > > where > > > timely recovery is needed. > > > * What do you think of the at-most-once guarantee achieved by the > > proposed > > > method? > > > * Do we need checkpointing to reduce the amount of lost data? > > > * Do we need deduplication to guarantee at-most-once delivery or just > > > provide best-effort delivery? > > > > > > Regards, > > > Xiaogang Shi > > > > > > > > > Piotr Nowojski <pi...@ververica.com> 于2019年6月11日周二 下午5:31写道: > > > > > > > Hi Xiaogang, > > > > > > > > It sounds interesting and definitely a useful feature, however the > > > > questions for me would be how useful, how much effort would it > require > > > and > > > > is it worth it? We simply can not do all things at once, and > currently > > > > people that could review/drive/mentor this effort are pretty much > > > strained > > > > :( For me one would have to investigate answers to those questions > and > > > > prioritise it compared to other ongoing efforts, before I could vote > +1 > > > for > > > > this. > > > > > > > > Couple of things to consider: > > > > - would it be only a job manager/failure region recovery feature? > > > > - would it require changes in CheckpointBarrierHandler, > > > > CheckpointCoordinator classes? > > > > - with `at-most-once` semantic theoretically speaking we could just > > drop > > > > the current `CheckpointBarrier` handling/injecting code and avoid all > > of > > > > the checkpoint alignment issues - we could just checkpoint all of the > > > tasks > > > > independently of one another. However maybe that could be a follow up > > > > optimisation step? > > > > > > > > Piotrek > > > > > > > > > On 11 Jun 2019, at 10:53, Zili Chen <wander4...@gmail.com> wrote: > > > > > > > > > > Hi Xiaogang, > > > > > > > > > > It is an interesting topic. > > > > > > > > > > Notice that there is some effort to build a mature mllib of flink > > these > > > > > days, it could be also possible for some ml cases trade off > > correctness > > > > for > > > > > timeliness or throughput. Excatly-once delivery excatly makes flink > > > stand > > > > > out but an at-most-once option would adapt flink to more scenarios. > > > > > > > > > > Best, > > > > > tison. > > > > > > > > > > > > > > > SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月11日周二 下午4:33写道: > > > > > > > > > >> Flink offers a fault-tolerance mechanism to guarantee > at-least-once > > > and > > > > >> exactly-once message delivery in case of failures. The mechanism > > works > > > > well > > > > >> in practice and makes Flink stand out among stream processing > > systems. > > > > >> > > > > >> But the guarantee on at-least-once and exactly-once delivery does > > not > > > > come > > > > >> without price. It typically requires to restart multiple tasks and > > > fall > > > > >> back to the place where the last checkpoint is taken. > (Fined-grained > > > > >> recovery can help alleviate the cost, but it still needs certain > > > > efforts to > > > > >> recover jobs.) > > > > >> > > > > >> In some senarios, users perfer quick recovery and will trade > > > correctness > > > > >> off. For example, in some online recommendation systems, > timeliness > > is > > > > far > > > > >> more important than consistency. In such cases, we can restart > only > > > > those > > > > >> failed tasks individually, and do not need to perform any > rollback. > > > > Though > > > > >> some messages delivered to failed tasks may be lost, other tasks > can > > > > >> continuously provide service to users. > > > > >> > > > > >> Many of our users are demanding for at-most-once delivery in > Flink. > > > > What do > > > > >> you think of the proposal? Any feedback is appreciated. > > > > >> > > > > >> Regards, > > > > >> Xiaogang Shi > > > > >> > > > > > > > > > > > > > >