> Do we think the scaler could be a plugin or hard coded ?

+1 for pluggable scaling logic.
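To make the ask concrete: a pluggable scaler along the lines discussed below (decisions computed from input features, including previous scaling decisions) could be sketched roughly like this. This is purely illustrative; none of these interface or class names exist in the operator:

```java
import java.util.List;
import java.util.Map;

/** Hypothetical SPI for pluggable scaling logic; all names here are made up. */
interface ScalingPolicy {
    /**
     * @param metricWindows throughput/busy-time features bucketed by window
     *                      (e.g. last 10s, 30s, 1 min, 5 min), keyed by metric name
     * @param history       previous scaling decisions, newest first
     * @return desired parallelism per job vertex, or an empty map for "no change"
     */
    Map<String, Integer> evaluate(Map<String, double[]> metricWindows,
                                  List<Map<String, Integer>> history);
}

/** The built-in behavior could stay hard coded while custom logic loads as a plugin. */
class NoOpPolicy implements ScalingPolicy {
    @Override
    public Map<String, Integer> evaluate(Map<String, double[]> metricWindows,
                                         List<Map<String, Integer>> history) {
        return Map.of(); // never requests a rescale
    }
}
```

A shape like this would let users keep their own "secret sauce" behind the interface instead of patching an OSS fork.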
On Mon, Nov 21, 2022 at 3:38 AM Chen Qin <qinnc...@gmail.com> wrote:

> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi Chen!
> >
> > I think in the long term it makes sense to provide some pluggable mechanisms, but it's not completely trivial where exactly you would plug in your custom logic at this point.
>
> Sounds good. More specifically, it would be great if it could accept input features (including previous scaling decisions) and output decisions. Folks might keep their own secret sauce and avoid patching an OSS fork.
>
> > In any case the problems you mentioned should be solved robustly by the algorithm itself without any customization:
> > - We need to be able to detect ineffective scaling decisions, let's say we scaled up (expecting better throughput with a higher parallelism) but we did not get a better processing capacity (this would be the external service bottleneck)
>
> Sounds good, so as a design choice we would at least try restarting the job once (the optimistic path).
>
> > - We are evaluating metrics in windows, and we have some flexible boundaries to avoid scaling on minor load spikes
>
> Yes, it would be great if the user could feed in throughput changes over different time buckets (last 10s, 30s, 1 min, 5 min) as input features.
>
> > Regards,
> > Gyula
> >
> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin <qinnc...@gmail.com> wrote:
> >
> > > Hi Gyula,
> > >
> > > Do we think the scaler could be a plugin or hard coded ?
> > > We observed some cases the scaler can't address (e.g. async IO dependency service degradation, or a small spike that isn't worth restarting the job for).
> > >
> > > Thanks,
> > > Chen
> > >
> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > > > Hi Dong!
> > > >
> > > > Could you please confirm that your main concerns have been addressed?
> > > > Some other minor details that might not have been fully clarified:
> > > > - The prototype has been validated on some production workloads, yes.
> > > > - We are only planning to use metrics that are generally available and were previously accepted as standardized connector metrics (not Kafka specific). This is actually specified in the FLIP.
> > > > - Even if some metrics (such as pendingRecords) are not accessible, the scaling algorithm works and can be used. For source scaling based on utilization alone we still need some trivial modifications on the implementation side.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > >
> > > > > Hi Dong!
> > > > >
> > > > > This is not an experimental feature proposal. The implementation of the prototype is still in an experimental phase, but by the time the FLIP, initial prototype and review are done, this should be in a good, stable first version.
> > > > > This proposal is about as general as autoscalers/tuners get, as far as I understand, and there is no history of any alternative effort that even comes close to the applicability of this solution.
> > > > >
> > > > > Any large features that were added to Flink in the past have gone through several iterations over the years, and the APIs have evolved as they matured.
> > > > > Something like the autoscaler can only be successful if there is enough user exposure and feedback to make it good; putting it in an external repo will not get us anywhere.
> > > > >
> > > > > We have a prototype implementation ready that works well and it is more or less feature complete.
> > > > > We proposed this FLIP based on something that we see as a working solution, so please do not underestimate the effort that went into this proposal and the validation of the ideas. In this sense our approach here is the same as with the Table Store, the Kubernetes Operator and other big components of the past. On the other hand, it's impossible to sufficiently explain 100% of the technical depth/implementation details of such complex components in a FLIP; I feel we have a good overview of the algorithm in the FLIP, and the implementation should cover all remaining questions. We will have an extended code review phase following the FLIP vote before this makes it into the project.
> > > > >
> > > > > I understand your concern regarding the stability of Flink Kubernetes Operator config and metric names. We have decided not to provide guarantees there yet, but if you feel that it's time for the operator to support such guarantees, please open a separate discussion on that topic; I don't want to mix the two problems here.
> > > > >
> > > > > Regards,
> > > > > Gyula
> > > > >
> > > > > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hi Gyula,
> > > > > >
> > > > > > If I understand correctly, this autopilot proposal is an experimental feature and its configs/metrics are not mature enough to provide backward compatibility yet. And the proposal provides high-level ideas of the algorithm, but it is probably too complicated to explain it end-to-end.
> > > > > >
> > > > > > On the one hand, I do agree that having an auto-tuning prototype, even if not mature, is better than nothing for Flink users.
> > > > > > On the other hand, I am concerned that this FLIP seems a bit too experimental, and starting with an immature design might make it harder for us to reach a production-ready and generally applicable auto-tuner in the future. And introducing too many backward incompatible changes generally hurts users' trust in the Flink project.
> > > > > >
> > > > > > One alternative might be to develop and experiment with this feature in a non-Flink repo. You can iterate fast without worrying about the backward compatibility requirements typically imposed on Flink public features. And once the feature is reasonably evaluated and mature enough, it will be much easier to explain the design and address all the issues mentioned above. For example, Jingsong implemented a Flink Table Store prototype <https://github.com/JingsongLi/flink/tree/table_storage/flink-table> before proposing FLIP-188 in this thread <https://lists.apache.org/thread/dlhspjpms007j2ynymsg44fxcx6fm064>.
> > > > > >
> > > > > > I don't intend to block your progress; just my two cents. It will be great to hear more from other developers (e.g. in the voting thread).
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Dong,
> > > > > > >
> > > > > > > Let me address your comments.
> > > > > > >
> > > > > > > Time for scale / backlog processing time derivation:
> > > > > > > We can add some more details to the FLIP, but at this point the implementation is actually much simpler than the algorithm description makes it look.
> > > > > > > I would not like to add more equations etc. because it just overcomplicates something relatively simple in practice.
> > > > > > >
> > > > > > > In a nutshell: time to recover == lag / processing-rate-after-scaleup.
> > > > > > > It's fairly easy to see where this is going, but it is best seen in code.
> > > > > > >
> > > > > > > Using pendingRecords and alternative mechanisms:
> > > > > > > True, the current algorithm relies on pending records to effectively compute the target source processing rates and therefore scale sources. This is available for Kafka, which is by far the most common streaming source and is used by the majority of streaming applications currently.
> > > > > > > It would be very easy to add alternative, purely utilization based scaling to the sources. We can start with the current proposal and add this along the way before the first version.
> > > > > > >
> > > > > > > Metrics, configs and public API:
> > > > > > > The autoscaler feature is proposed for the Flink Kubernetes Operator, which does not have the same API/config maturity and thus does not provide the same guarantees.
> > > > > > > We currently support backward compatibility for the CRD itself, not the configs or metrics. This does not mean that we do not aim to do so, but at this stage we still have to clean up the details of the newly added components. In practice this means that if we manage to get the metrics/configs right on the first try we will keep them and provide compatibility, but if we feel that we missed something, or don't need something, we can still remove it.
> > > > > > > It's a more pragmatic approach for a component that is likely to evolve than setting everything in stone immediately.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Gyula
> > > > > > >
> > > > > > > On Wed, Nov 16, 2022 at 6:07 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks for the update! Please see comments inline.
> > > > > > > >
> > > > > > > > On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels <m...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Of course! Let me know if your concerns are addressed. The wiki page has been updated.
> > > > > > > > >
> > > > > > > > > > It will be great to add this in the FLIP so that reviewers can understand how the source parallelisms are computed and how the algorithm works end-to-end.
> > > > > > > > >
> > > > > > > > > I've updated the FLIP page to add more details on how the backlog-based scaling works (2).
> > > > > > > >
> > > > > > > > The algorithm is much more informative now. It currently uses "Estimated time for rescale" to derive the new source parallelism. Could we also specify in the FLIP how this value is derived?
> > > > > > > >
> > > > > > > > The algorithm currently uses pendingRecords to derive source parallelism. It is an optional metric, and KafkaSource currently reports it. So it means that the proposed algorithm currently only works when all sources of the job are KafkaSource, right?
> > > > > > > >
> > > > > > > > This issue considerably limits the applicability of this FLIP.
> > > > > > > > Do you think most (if not all) streaming sources will report this metric? Alternatively, any chance we can have a fallback solution to evaluate the source parallelism based on e.g. CPU or idle ratio for cases where this metric is not available?
> > > > > > > >
> > > > > > > > > > These metrics and configs are public API and need to be stable across minor versions, could we document them before finalizing the FLIP?
> > > > > > > > >
> > > > > > > > > Metrics and config changes are not strictly part of the public API, but Gyula has added a section.
> > > > > > > >
> > > > > > > > Hmm... if metrics are not public API, then it might happen that we change the mbean path in a minor release and break users' monitoring tools. Similarly, we might change configs in a minor release and break users' job behavior. We probably want to avoid such breaking changes in minor releases.
> > > > > > > >
> > > > > > > > It is documented here <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals> that "Exposed monitoring information" and "Configuration settings" are public interfaces of the project.
> > > > > > > > Maybe we should also specify the metrics here so that users can safely set up dashboards and tools to track how the autopilot is working, similar to how metrics are documented in FLIP-33 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics>?
> > > > > > > >
> > > > > > > > > -Max
> > > > > > > > >
> > > > > > > > > On Tue, Nov 15, 2022 at 3:01 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Maximilian,
> > > > > > > > > >
> > > > > > > > > > It seems that the following comments from the previous discussions have not been addressed yet. Any chance we can have them addressed before starting the voting thread?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Dong!
> > > > > > > > > > >
> > > > > > > > > > > Let me try to answer the questions :)
> > > > > > > > > > >
> > > > > > > > > > > 1: busyTimeMsPerSecond is not specific to CPU; it measures the time spent in the main record processing loop for an operator, if I understand correctly. This includes IO operations too.
> > > > > > > > > > >
> > > > > > > > > > > 2: We should add this to the FLIP, I agree. It would be a Duration config with the expected catch-up time after rescaling (let's say 5 minutes).
> > > > > > > > > > > It could be computed based on the current data rate and the calculated max processing rate after the rescale.
> > > > > > > > > >
> > > > > > > > > > It will be great to add this to the FLIP so that reviewers can understand how the source parallelisms are computed and how the algorithm works end-to-end.
> > > > > > > > > >
> > > > > > > > > > > 3: In the current proposal we don't have per-operator configs. Target utilization would apply to all operators uniformly.
> > > > > > > > > > >
> > > > > > > > > > > 4: It should be configurable, yes.
> > > > > > > > > >
> > > > > > > > > > Since this config is a public API, could we update the FLIP accordingly to provide this config?
> > > > > > > > > >
> > > > > > > > > > > 5, 6: The names haven't been finalized, but I think these are minor details. We could add concrete names to the FLIP :)
> > > > > > > > > >
> > > > > > > > > > These metrics and configs are public API and need to be stable across minor versions; could we document them before finalizing the FLIP?
> > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Gyula
> > > > > > > > > > >
> > > > > > > > > > > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Max,
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you for the proposal.
> > > > > > > > > > > > The proposal tackles a very important issue for Flink users and the design looks promising overall!
> > > > > > > > > > > >
> > > > > > > > > > > > I have some questions to better understand the proposed public interfaces and the algorithm.
> > > > > > > > > > > >
> > > > > > > > > > > > 1) The proposal seems to assume that the operator's busyTimeMsPerSecond could reach 1 sec. I believe this is mostly true for CPU-bound operators. Could you confirm that this can also be true for IO-bound operators such as sinks? For example, suppose a Kafka sink subtask has reached an I/O bottleneck when flushing data out to the Kafka clusters; will busyTimeMsPerSecond reach 1 sec?
> > > > > > > > > > > >
> > > > > > > > > > > > 2) It is said that "users can configure a maximum time to fully process the backlog". The configuration section does not seem to provide this config. Could you specify this? And any chance this proposal can provide the formula for calculating the new processing rate?
> > > > > > > > > > > >
> > > > > > > > > > > > 3) How are users expected to specify the per-operator configs (e.g. target utilization)? For example, should users specify them programmatically in a DataStream/Table/SQL API?
> > > > > > > > > > > >
> > > > > > > > > > > > 4) How often will the Flink Kubernetes operator query metrics from the JobManager? Is this configurable?
> > > > > > > > > > > > 5) Could you specify the config name and default value for the proposed configs?
> > > > > > > > > > > >
> > > > > > > > > > > > 6) Could you add the name/mbean/type for the proposed metrics?
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Dong
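For reference, the backlog arithmetic that comes up several times upthread (Gyula's "time to recover == lag / processing-rate-after-scaleup", and a configured catch-up duration combined with the current data rate) reduces to a few lines. The following is my own sketch of the math as described in the thread, not the FLIP's actual implementation; all names are illustrative:

```java
/** Illustrative backlog math paraphrased from the thread; not the real implementation. */
final class BacklogMath {

    /** time to recover == lag / processing-rate-after-scaleup */
    static double timeToRecoverSec(double lagRecords, double processingRateAfterScaleup) {
        return lagRecords / processingRateAfterScaleup;
    }

    /**
     * Target source processing rate such that the current backlog is drained within
     * catchUpSec while new input keeps arriving at dataRate (records/sec).
     */
    static double targetProcessingRate(double lagRecords, double dataRate, double catchUpSec) {
        return dataRate + lagRecords / catchUpSec;
    }

    /** Parallelism needed to reach the target rate, given per-subtask capacity. */
    static int requiredParallelism(double targetRate, double perSubtaskRate) {
        return (int) Math.ceil(targetRate / perSubtaskRate);
    }
}
```

For example, with a lag of 1,500,000 records, a steady input rate of 1,000 records/s and a 5-minute (300 s) catch-up target, the target rate is 1,000 + 1,500,000 / 300 = 6,000 records/s; if one subtask can process 1,500 records/s, the required parallelism is 4.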