Hi Gyula,

Do we think the scaler should be a plugin or hard-coded? We have observed some cases the scaler can't address (e.g. degradation of a service that an async I/O operator depends on, or a small spike that isn't worth restarting the job for).
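For illustration only, a minimal sketch of what a pluggable scaling hook could look like (the interface and method names below are invented for this example and are not part of the FLIP):

    import java.util.Map;
    import java.util.Optional;

    // Hypothetical extension point, sketched only to illustrate the "plugin" idea above;
    // the names are assumptions, not anything proposed in the FLIP.
    public interface ScalingDecisionPlugin {
        // Return the desired parallelism for a job vertex based on collected metrics,
        // or Optional.empty() to defer to the built-in algorithm (e.g. for a small spike
        // that is not worth a restart, or a degraded async-I/O dependency).
        Optional<Integer> suggestParallelism(String jobVertexId, Map<String, Double> metrics, int currentParallelism);
    }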
Thanks,
Chen

On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Dong!
>
> Could you please confirm that your main concerns have been addressed?
>
> Some other minor details that might not have been fully clarified:
> - The prototype has been validated on some production workloads, yes.
> - We are only planning to use metrics that are generally available and were previously accepted as standardized connector metrics (not Kafka-specific). This is actually specified in the FLIP.
> - Even if some metrics (such as pendingRecords) are not accessible, the scaling algorithm works and can be used. For source scaling based on utilization alone we still need some trivial modifications on the implementation side.
>
> Cheers,
> Gyula
>
> On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi Dong!
> >
> > This is not an experimental feature proposal. The implementation of the prototype is still in an experimental phase, but by the time the FLIP, initial prototype and review are done, this should be a good, stable first version.
> > This proposal is about as general as autoscalers/tuners get, as far as I understand, and there is no history of any alternative effort that even comes close to the applicability of this solution.
> >
> > Any large features that were added to Flink in the past have gone through several iterations over the years, and the APIs have evolved as they matured. Something like the autoscaler can only be successful if there is enough user exposure and feedback to make it good; putting it in an external repo will not get us anywhere.
> >
> > We have a prototype implementation ready that works well and is more or less feature complete. We proposed this FLIP based on something that we see as a working solution, so please do not underestimate the effort that went into this proposal and the validation of the ideas. In this sense our approach here is the same as with the Table Store, the Kubernetes Operator and other big components of the past. On the other hand, it's impossible to sufficiently explain 100% of the technical depth/implementation details of such complex components in a FLIP; I feel we have a good overview of the algorithm in the FLIP, and the implementation should cover all remaining questions. We will have an extended code review phase following the FLIP vote before this makes it into the project.
> >
> > I understand your concern regarding the stability of Flink Kubernetes Operator config and metric names. We have decided not to provide guarantees there yet, but if you feel that it's time for the operator to support such guarantees, please open a separate discussion on that topic; I don't want to mix the two problems here.
> >
> > Regards,
> > Gyula
> >
> > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Gyula,
> > >
> > > If I understand correctly, this autopilot proposal is an experimental feature and its configs/metrics are not mature enough to provide backward compatibility yet. And the proposal provides high-level ideas of the algorithm, but it is probably too complicated to explain it end-to-end.
> > >
> > > On the one hand, I do agree that having an auto-tuning prototype, even if not mature, is better than nothing for Flink users.
> > > On the other hand, I am concerned that this FLIP seems a bit too experimental, and starting with an immature design might make it harder for us to reach a production-ready and generally applicable auto-tuner in the future. And introducing too many backward-incompatible changes generally hurts users' trust in the Flink project.
> > >
> > > One alternative might be to develop and experiment with this feature in a non-Flink repo. You can iterate fast without worrying about the backward compatibility requirements that apply to most Flink public features. And once the feature is reasonably evaluated and mature enough, it will be much easier to explain the design and address all the issues mentioned above. For example, Jingsong implemented a Flink Table Store prototype <https://github.com/JingsongLi/flink/tree/table_storage/flink-table> before proposing FLIP-188 in this thread <https://lists.apache.org/thread/dlhspjpms007j2ynymsg44fxcx6fm064>.
> > >
> > > I don't intend to block your progress. Just my two cents. It will be great to hear more from other developers (e.g. in the voting thread).
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Let me address your comments.
> > > >
> > > > Time for scale / backlog processing time derivation:
> > > > We can add some more details to the FLIP, but at this point the implementation is actually much simpler than the algorithm needed to describe it. I would not like to add more equations etc. because it just overcomplicates something relatively simple in practice.
> > > >
> > > > In a nutshell: time to recover == lag / processing-rate-after-scaleup. It's fairly easy to see where this is going, but it's best to see it in code.
> > > >
> > > > Using pendingRecords and alternative mechanisms:
> > > > True, the current algorithm relies on pending records to effectively compute the target source processing rates and therefore scale sources. This is available for Kafka, which is by far the most common streaming source and is used by the majority of streaming applications currently. It would be very easy to add alternative, purely utilization-based scaling for the sources. We can start with the current proposal and add this along the way before the first version.
> > > >
> > > > Metrics, Configs and Public API:
> > > > The autoscaler feature is proposed for the Flink Kubernetes Operator, which does not have the same API/config maturity and thus does not provide the same guarantees. We currently support backward compatibility for the CRD itself but not for the configs or metrics. This does not mean that we do not aim to do so, but at this stage we still have to clean up the details of the newly added components. In practice this means that if we manage to get the metrics/configs right on the first try we will keep them and provide compatibility, but if we feel that we missed something or we don't need something, we can still remove it. It's a more pragmatic approach for a component that is likely to evolve than setting everything in stone immediately.
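For illustration, a minimal sketch of the "time to recover" formula quoted above, ignoring records that keep arriving while the job catches up (the class and variable names are assumptions for this example, not code from the FLIP or the prototype):

    import java.time.Duration;

    // Sketch of "time to recover == lag / processing-rate-after-scaleup".
    final class CatchUpEstimate {
        // Example: 6,000,000 pending records at 20,000 records/s after scale-up ~= 300 s (5 minutes).
        static Duration estimatedCatchUpTime(long pendingRecords, double recordsPerSecondAfterScaleUp) {
            return Duration.ofSeconds((long) (pendingRecords / recordsPerSecondAfterScaleUp));
        }
    }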
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Wed, Nov 16, 2022 at 6:07 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Thanks for the update! Please see comments inline.
> > > > >
> > > > > On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels <m...@apache.org> wrote:
> > > > >
> > > > > > Of course! Let me know if your concerns are addressed. The wiki page has been updated.
> > > > > >
> > > > > > > It will be great to add this in the FLIP so that reviewers can understand how the source parallelisms are computed and how the algorithm works end-to-end.
> > > > > >
> > > > > > I've updated the FLIP page to add more details on how the backlog-based scaling works (2).
> > > > >
> > > > > The algorithm is much more informative now. The algorithm currently uses "Estimated time for rescale" to derive the new source parallelism. Could we also specify in the FLIP how this value is derived?
> > > > >
> > > > > The algorithm currently uses pendingRecords to derive source parallelism. It is an optional metric, and KafkaSource currently reports it. So it means that the proposed algorithm currently only works when all sources of the job are KafkaSource, right?
> > > > >
> > > > > This issue considerably limits the applicability of this FLIP. Do you think most (if not all) streaming sources will report this metric? Alternatively, any chance we can have a fallback solution to evaluate the source parallelism based on e.g. CPU or idle ratio for cases where this metric is not available?
> > > > >
> > > > > > > These metrics and configs are public API and need to be stable across minor versions, could we document them before finalizing the FLIP?
> > > > > >
> > > > > > Metrics and config changes are not strictly part of the public API, but Gyula has added a section.
> > > > >
> > > > > Hmm... if metrics are not public API, then it might happen that we change the mbean path in a minor release and break users' monitoring tools. Similarly, we might change configs in a minor release and break users' job behavior. We probably want to avoid these breaking changes in minor releases.
> > > > >
> > > > > It is documented here <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals> that "Exposed monitoring information" and "Configuration settings" are public interfaces of the project.
> > > > >
> > > > > Maybe we should also specify the metrics here so that users can safely set up dashboards and tools to track how the autopilot is working, similar to how metrics are documented in FLIP-33 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics>?
> > > > >
> > > > > > -Max
> > > > > >
> > > > > > On Tue, Nov 15, 2022 at 3:01 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Maximilian,
> > > > > > >
> > > > > > > It seems that the following comments from the previous discussions have not been addressed yet. Any chance we can have them addressed before starting the voting thread?
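For illustration, one way the backlog-based derivation discussed above could be written down when pendingRecords is available (the helper names and the exact formula are assumptions for this sketch, not the FLIP's definition):

    // Sketch of a backlog-based source scaling rule, assuming the optional
    // pendingRecords metric is reported. Names and formula are illustrative only.
    final class BacklogBasedSketch {
        // Rate the sources must sustain to keep up with the input and also drain
        // the backlog within the configured catch-up window.
        static double targetProcessingRate(double inputRecordsPerSecond, long pendingRecords, double catchUpWindowSeconds) {
            return inputRecordsPerSecond + pendingRecords / catchUpWindowSeconds;
        }

        // Choose a source parallelism whose aggregate capacity covers the target rate.
        static int targetSourceParallelism(double targetRate, double measuredRatePerSubtask) {
            return Math.max(1, (int) Math.ceil(targetRate / measuredRatePerSubtask));
        }
    }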
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Dong!
> > > > > > > >
> > > > > > > > Let me try to answer the questions :)
> > > > > > > >
> > > > > > > > 1: busyTimeMsPerSecond is not specific to CPU; it measures the time spent in the main record processing loop for an operator, if I understand correctly. This includes IO operations too.
> > > > > > > >
> > > > > > > > 2: We should add this to the FLIP, I agree. It would be a Duration config with the expected catch-up time after rescaling (let's say 5 minutes). It could be computed based on the current data rate and the calculated max processing rate after the rescale.
> > > > > > >
> > > > > > > It will be great to add this in the FLIP so that reviewers can understand how the source parallelisms are computed and how the algorithm works end-to-end.
> > > > > > >
> > > > > > > > 3: In the current proposal we don't have per-operator configs. Target utilization would apply to all operators uniformly.
> > > > > > > >
> > > > > > > > 4: It should be configurable, yes.
> > > > > > >
> > > > > > > Since this config is a public API, could we update the FLIP accordingly to provide this config?
> > > > > > >
> > > > > > > > 5, 6: The names haven't been finalized, but I think these are minor details. We could add concrete names to the FLIP :)
> > > > > > >
> > > > > > > These metrics and configs are public API and need to be stable across minor versions, could we document them before finalizing the FLIP?
> > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Gyula
> > > > > > > >
> > > > > > > > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Max,
> > > > > > > > >
> > > > > > > > > Thank you for the proposal. The proposal tackles a very important issue for Flink users and the design looks promising overall!
> > > > > > > > >
> > > > > > > > > I have some questions to better understand the proposed public interfaces and the algorithm.
> > > > > > > > >
> > > > > > > > > 1) The proposal seems to assume that the operator's busyTimeMsPerSecond could reach 1 sec. I believe this is mostly true for cpu-bound operators. Could you confirm that this can also be true for io-bound operators such as sinks? For example, suppose a Kafka Sink subtask has reached an I/O bottleneck when flushing data out to the Kafka clusters, will busyTimeMsPerSecond reach 1 sec?
> > > > > > > > >
> > > > > > > > > 2) It is said that "users can configure a maximum time to fully process the backlog". The configuration section does not seem to provide this config. Could you specify this? And any chance this proposal can provide the formula for calculating the new processing rate?
> > > > > > > > > 3) How are users expected to specify the per-operator configs (e.g. target utilization)? For example, should users specify it programmatically in a DataStream/Table/SQL API?
> > > > > > > > >
> > > > > > > > > 4) How often will the Flink Kubernetes operator query metrics from JobManager? Is this configurable?
> > > > > > > > >
> > > > > > > > > 5) Could you specify the config name and default value for the proposed configs?
> > > > > > > > >
> > > > > > > > > 6) Could you add the name/mbean/type for the proposed metrics?
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Dong
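For illustration, a minimal sketch of the purely utilization-based fallback mentioned earlier in the thread for sources that do not report pendingRecords; it scales so that the observed busy ratio lands at the target utilization (the class name, method name and exact formula are assumptions, not taken from the FLIP):

    // Sketch of a utilization-only scaling rule based on the standard
    // busyTimeMsPerSecond task metric (at most 1000 ms of busy time per second).
    final class UtilizationFallbackSketch {
        // e.g. 4 subtasks at 950 ms/s busy with a 0.8 target utilization -> ceil(4 * 0.95 / 0.8) = 5.
        static int utilizationBasedParallelism(int currentParallelism, double busyTimeMsPerSecond, double targetUtilization) {
            double busyRatio = busyTimeMsPerSecond / 1000.0;
            return Math.max(1, (int) Math.ceil(currentParallelism * busyRatio / targetUtilization));
        }
    }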