Hi Gen,

Thanks for the suggestions!
Regarding how to implement the per-region RestartBackoffTimeStrategy as proposed previously, I think your approach works well. Here are more details:

- Keep the RestartBackoffTimeStrategy interface API unchanged and only change its semantics, such that all strategies (e.g. failure rate, fixed delay, exponential delay) are applied per region.
- Update ExecutionFailureHandler to create one RestartBackoffTimeStrategy instance for each region. ExecutionFailureHandler can get the region information from its SchedulingTopology.
- ExecutionFailureHandler::getFailureHandlingResult() will use the strategy instance of the failed task's region to make the failover decision. A rough sketch of the idea is shown right after this list.
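To make the bookkeeping concrete, below is a minimal, self-contained sketch of the idea. It is not the actual Flink code: the RestartBackoffTimeStrategy interface is reproduced in simplified form, the region key is left generic, the factory stands in for however the configured strategy is created today, and the class name is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified stand-in for Flink's RestartBackoffTimeStrategy, reproduced here
// only to keep the sketch self-contained.
interface RestartBackoffTimeStrategy {
    boolean canRestart();
    long getBackoffTime();
    void notifyFailure(Throwable cause);
}

// Hypothetical helper illustrating the per-region bookkeeping the
// ExecutionFailureHandler could do: one strategy instance per pipelined
// region, created lazily, so failure counting and backoff are isolated
// per region.
class PerRegionRestartStrategies<R> {

    private final Supplier<RestartBackoffTimeStrategy> strategyFactory;
    private final Map<R, RestartBackoffTimeStrategy> strategiesByRegion = new HashMap<>();

    PerRegionRestartStrategies(Supplier<RestartBackoffTimeStrategy> strategyFactory) {
        this.strategyFactory = strategyFactory;
    }

    // Records the failure against the failed task's region only, then asks that
    // region's strategy whether a restart is allowed. In the real handler the
    // region would be looked up from the SchedulingTopology for the failed task.
    boolean handleFailure(R region, Throwable cause) {
        RestartBackoffTimeStrategy strategy =
                strategiesByRegion.computeIfAbsent(region, r -> strategyFactory.get());
        strategy.notifyFailure(cause);
        return strategy.canRestart();
    }

    long getBackoffTime(R region) {
        return strategiesByRegion
                .computeIfAbsent(region, r -> strategyFactory.get())
                .getBackoffTime();
    }
}
```

With this layout, a TaskManager crash that brings down tasks of 3 different regions increments 3 independent counters by 1 each, instead of incrementing one shared counter by 3.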
Please see the other comment inline.

Regards,
Dong

On Fri, Nov 25, 2022 at 7:42 PM Gen Luo <luogen...@gmail.com> wrote:

> Hi all,
>
> Sorry for jumping in late.
>
> To meet Weihua's need, Dong's proposal seems pretty fine, but the modification it requires, I'm afraid, is not really easy. RestartBackoffTimeStrategy is quite a simple interface. The strategy doesn't even know which task is failing, not to mention the division of pipelined regions. To distinguish the failure count of each region, it lacks too much information that is not easy for the strategy to acquire. One approach I can figure out is to create different strategy instances for different regions. In this way we do not need to modify the strategy, but we do need to modify the schedulers or the ExecutionFailureHandler.
>
> On the other hand, I realize another case where the restart strategy may need to be aware of the types and occurrence rate of the exceptions. That is to avoid failing over and instead directly fail the job when certain errors happen. I know that there's an annotation `@ThrowableAnnotation(ThrowableType.NonRecoverableError)` that can fail the job, but I'm afraid there can be scenarios where one can not annotate the exceptions, or catch and wrap them with an annotated exception.

While this is possible, do you have a concrete use-case that can not use and catch the annotated exception? It is probably safer to only add a new strategy (which is a public API) when we are sure we need it :) My expectation is that user code can usually catch such an error and re-throw it wrapped in an annotated exception, roughly as in the sketch below.
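This is only a hypothetical illustration, not code from the Flink codebase; it assumes the annotation classes mentioned above are the ones under org.apache.flink.runtime.throwable in flink-runtime:

```java
import org.apache.flink.runtime.throwable.ThrowableAnnotation;
import org.apache.flink.runtime.throwable.ThrowableType;

// Hypothetical user-defined exception. Annotating it as NonRecoverableError
// should make the failover logic fail the job instead of restarting it.
@ThrowableAnnotation(ThrowableType.NonRecoverableError)
class FatalBusinessException extends RuntimeException {
    FatalBusinessException(String message, Throwable cause) {
        super(message, cause);
    }
}

class UserFunctionSketch {

    void processRecord(String record) {
        try {
            userLogicThatMayFail(record);
        } catch (IllegalStateException e) {
            // Catch the error and re-throw it wrapped in the annotated exception,
            // so the failure is classified as non-recoverable without touching
            // the restart strategy at all.
            throw new FatalBusinessException("Unrecoverable error, do not restart", e);
        }
    }

    private void userLogicThatMayFail(String record) {
        // Placeholder for user logic that may hit an unrecoverable condition.
        if (record == null) {
            throw new IllegalStateException("record must not be null");
        }
    }
}
```

If there are jobs where this kind of wrapping is not possible, that would be useful input for deciding whether a cause-aware strategy is needed.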
> In such cases, handling in the restart strategy can be a good choice. Such a strategy could even be combined with the existing strategies, which handle the failure rate rather than the cause type.
>
> Besides, given that new strategies may be necessary and existing strategies may also need to be enhanced, maybe we should make RestartBackoffTimeStrategy a plugin rather than an enumeration, or introduce a new "custom" strategy type which can load customized implementations. This can not solve the problem immediately, but it makes the choice of restart strategy more flexible.
>
> What do you think about this?
>
> Thanks.
>
> On Mon, Nov 21, 2022 at 17:46, Paul Lam <paullin3...@gmail.com> wrote:
>
> > Dong's proposal LGTM.
> >
> > Best,
> > Paul Lam
> >
> > > On Nov 19, 2022, at 10:50, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > Hey Weihua,
> > >
> > > Thanks for proposing the new strategy!
> > >
> > > If I understand correctly, the main issue is that different failover regions can be restarted independently, but they share the same counter when counting the number of failures in an interval. So the number of failures a single region can tolerate is less than what users expect.
> > >
> > > Given that regions can be restarted independently, it might be more usable and intuitive to count the number of failures for each region when executing the failover strategy. Thus, instead of adding a new failover strategy, how about we update the existing failure-rate strategy, and probably other existing strategies as well, to use the following semantics:
> > >
> > > - For any given region in the job, its number of failures within failure-rate-interval should not exceed max-failures-per-interval. Otherwise, the job will fail without being restarted.
> > >
> > > With these updated semantics, the keyby-connected job will have the same behavior as the existing Flink when we use the failure-rate strategy. For the rescale-connected job, in the case you described above, after the TM fails, each of the 3 regions will increment its failure count from 0 to 1, which is still less than max-failures-per-interval. Thus the rescale-connected job can continue to work.
> > >
> > > This alternative approach can solve the problem without increasing the complexity of the failover strategy choice. And this approach does not require us to check whether two exceptions belong to the same root cause. Do you think it can work?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Nov 4, 2022 at 4:46 PM Weihua Hu <huweihua....@gmail.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to bring up a discussion about restart strategies. Flink supports 3 kinds of restart strategies. These work very well for jobs with specific configs, but for platform users who manage hundreds of jobs, there is no common strategy to use.
> > > >
> > > > Let me explain the reason. We manage a lot of jobs, some are keyby-connected with one region per job, some are rescale-connected with many regions per job, and when using the failure-rate restart strategy, we cannot achieve the same control with the same configuration.
> > > >
> > > > For example, if I want the job to fail when there are 3 exceptions within 5 minutes, the config would look like this:
> > > >
> > > > > restart-strategy.failure-rate.max-failures-per-interval: 3
> > > > > restart-strategy.failure-rate.failure-rate-interval: 5 min
> > > >
> > > > For the keyby-connected job, this config works well.
> > > >
> > > > However, for the rescale-connected job, we need to consider the number of regions and the number of slots per TaskManager. If each TM has 3 slots, and these 3 slots run the tasks of 3 regions, then when one TaskManager crashes, it will trigger 3 regions to fail, and the job will fail because it exceeds the threshold of the restart strategy. To avoid the effect of single TM crashes, I must increase max-failures-per-interval to 9, but after that change, exceptions from user tasks will be tolerated more than I want.
> > > >
> > > > Therefore, I want to introduce a new restart strategy based on time periods. A continuous period of time (e.g., 5 minutes) is divided into segments of a specific length (e.g., 1 minute). If an exception occurs within a segment (no matter how many times), it is marked as a failed segment. Similar to the failure-rate restart strategy, the job will fail when there are 'm' failed segments within the interval 'n'.
> > > >
> > > > In this mode, the keyby-connected and rescale-connected jobs can use unified configurations.
> > > >
> > > > This is a user-relevant change, so if you think this is worth doing, maybe I can create a FLIP to describe it in detail.
> > > >
> > > > Best,
> > > > Weihua