Hi Max, Thanks for the FLIP!
I’ve been curious about one one point. I can imagine some good reasons for it but wonder what you have in mind. What’s the reason to add auto scaling to the Operator instead of to the JobManager? It seems like adding that capability to the JobManager would be a bigger project, but it also would create some interesting opportunities. This is certainly not a suggestion, just a question. Thanks! John On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote: > Thanks for your comments @Dong and @Chen. It is true that not all the > details are contained in the FLIP. The document is meant as a general > design concept. > > As for the rescaling time, this is going to be a configurable setting for > now but it is foreseeable that we will provide auto-tuning of this > configuration value by observing the job restart time. Same goes for the > scaling decision itself which can learn from previous decisions. But we > want to keep it simple for the first version. > > For sources that do not support the pendingRecords metric, we are planning > to either give the user the choice to set a manual target rate, or scale it > purely based on its utilization as reported via busyTimeMsPerSecond. In > case of legacy sources, we will skip scaling these branches entirely > because they support neither of these metrics. > > -Max > > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels <m...@apache.org> wrote: > >> >Do we think the scaler could be a plugin or hard coded ? >> >> +1 For pluggable scaling logic. >> >> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin <qinnc...@gmail.com> wrote: >> >>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra <gyula.f...@gmail.com> wrote: >>> >>> > Hi Chen! >>> > >>> > I think in the long term it makes sense to provide some pluggable >>> > mechanisms but it's not completely trivial where exactly you would plug >>> in >>> > your custom logic at this point. >>> > >>> sounds good, more specifically would be great if it can accept input >>> features >>> (including previous scaling decisions) and output decisions. >>> Folks might keep their own secret sauce and avoid patching oss fork. >>> >>> > >>> > In any case the problems you mentioned should be solved robustly by the >>> > algorithm itself without any customization: >>> > - We need to be able to detect ineffective scaling decisions, let\s >>> say we >>> > scaled up (expecting better throughput with a higher parallelism) but we >>> > did not get a better processing capacity (this would be the external >>> > service bottleneck) >>> > >>> sounds good, so we would at least try restart job once (optimistic path) >>> as >>> design choice. >>> >>> > - We are evaluating metrics in windows, and we have some flexible >>> > boundaries to avoid scaling on minor load spikes >>> > >>> yes, would be great if user can feed in throughput changes over different >>> time buckets (last 10s, 30s, 1 min,5 mins) as input features >>> >>> > >>> > Regards, >>> > Gyula >>> > >>> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin <qinnc...@gmail.com> wrote: >>> > >>> > > Hi Gyula, >>> > > >>> > > Do we think the scaler could be a plugin or hard coded ? >>> > > We observed some cases scaler can't address (e.g async io dependency >>> > > service degradation or small spike that doesn't worth restarting job) >>> > > >>> > > Thanks, >>> > > Chen >>> > > >>> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra <gyula.f...@gmail.com> >>> wrote: >>> > > >>> > > > Hi Dong! >>> > > > >>> > > > Could you please confirm that your main concerns have been >>> addressed? >>> > > > >>> > > > Some other minor details that might not have been fully clarified: >>> > > > - The prototype has been validated on some production workloads yes >>> > > > - We are only planning to use metrics that are generally available >>> and >>> > > are >>> > > > previously accepted to be standardized connector metrics (not Kafka >>> > > > specific). This is actually specified in the FLIP >>> > > > - Even if some metrics (such as pendingRecords) are not accessible >>> the >>> > > > scaling algorithm works and can be used. For source scaling based on >>> > > > utilization alone we still need some trivial modifications on the >>> > > > implementation side. >>> > > > >>> > > > Cheers, >>> > > > Gyula >>> > > > >>> > > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra <gyula.f...@gmail.com> >>> > wrote: >>> > > > >>> > > > > Hi Dong! >>> > > > > >>> > > > > This is not an experimental feature proposal. The implementation >>> of >>> > the >>> > > > > prototype is still in an experimental phase but by the time the >>> FLIP, >>> > > > > initial prototype and review is done, this should be in a good >>> stable >>> > > > first >>> > > > > version. >>> > > > > This proposal is pretty general as autoscalers/tuners get as far >>> as I >>> > > > > understand and there is no history of any alternative effort that >>> > even >>> > > > > comes close to the applicability of this solution. >>> > > > > >>> > > > > Any large features that were added to Flink in the past have gone >>> > > through >>> > > > > several iterations over the years and the APIs have evolved as >>> they >>> > > > matured. >>> > > > > Something like the autoscaler can only be successful if there is >>> > enough >>> > > > > user exposure and feedback to make it good, putting it in an >>> external >>> > > > repo >>> > > > > will not get us anywhere. >>> > > > > >>> > > > > We have a prototype implementation ready that works well and it is >>> > more >>> > > > or >>> > > > > less feature complete. We proposed this FLIP based on something >>> that >>> > we >>> > > > see >>> > > > > as a working solution, please do not underestimate the effort that >>> > went >>> > > > > into this proposal and the validation of the ideas. So in this >>> sense >>> > > our >>> > > > > approach here is the same as with the Table Store and Kubernetes >>> > > Operator >>> > > > > and other big components of the past. On the other hand it's >>> > impossible >>> > > > to >>> > > > > sufficiently explain all the technical depth/implementation >>> details >>> > of >>> > > > such >>> > > > > complex components in FLIPs to 100%, I feel we have a good >>> overview >>> > of >>> > > > the >>> > > > > algorithm in the FLIP and the implementation should cover all >>> > remaining >>> > > > > questions. We will have an extended code review phase following >>> the >>> > > FLIP >>> > > > > vote before this make it into the project. >>> > > > > >>> > > > > I understand your concern regarding the stability of Flink >>> Kubernetes >>> > > > > Operator config and metric names. We have decided to not provide >>> > > > guarantees >>> > > > > there yet but if you feel that it's time for the operator to >>> support >>> > > such >>> > > > > guarantees please open a separate discussion on that topic, I >>> don't >>> > > want >>> > > > to >>> > > > > mix the two problems here. >>> > > > > >>> > > > > Regards, >>> > > > > Gyula >>> > > > > >>> > > > > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin <lindon...@gmail.com> >>> > wrote: >>> > > > > >>> > > > >> Hi Gyula, >>> > > > >> >>> > > > >> If I understand correctly, this autopilot proposal is an >>> > experimental >>> > > > >> feature and its configs/metrics are not mature enough to provide >>> > > > backward >>> > > > >> compatibility yet. And the proposal provides high-level ideas of >>> the >>> > > > >> algorithm but it is probably too complicated to explain it >>> > end-to-end. >>> > > > >> >>> > > > >> On the one hand, I do agree that having an auto-tuning prototype, >>> > even >>> > > > if >>> > > > >> not mature, is better than nothing for Flink users. On the other >>> > > hand, I >>> > > > >> am >>> > > > >> concerned that this FLIP seems a bit too experimental, and >>> starting >>> > > with >>> > > > >> an >>> > > > >> immature design might make it harder for us to reach a >>> > > production-ready >>> > > > >> and >>> > > > >> generally applicable auto-tuner in the future. And introducing >>> too >>> > > > >> backward >>> > > > >> incompatible changes generally hurts users' trust in the Flink >>> > > project. >>> > > > >> >>> > > > >> One alternative might be to develop and experiment with this >>> feature >>> > > in >>> > > > a >>> > > > >> non-Flink repo. You can iterate fast without worrying about >>> > typically >>> > > > >> backward compatibility requirement as required for most Flink >>> public >>> > > > >> features. And once the feature is reasonably evaluated and mature >>> > > > enough, >>> > > > >> it will be much easier to explain the design and address all the >>> > > issues >>> > > > >> mentioned above. For example, Jingsong implemented a Flink Table >>> > Store >>> > > > >> prototype >>> > > > >> < >>> https://github.com/JingsongLi/flink/tree/table_storage/flink-table >>> > > >>> > > > >> before >>> > > > >> proposing FLIP-188 in this thread >>> > > > >> < >>> https://lists.apache.org/thread/dlhspjpms007j2ynymsg44fxcx6fm064>. >>> > > > >> >>> > > > >> I don't intend to block your progress. Just my two cents. It >>> will be >>> > > > great >>> > > > >> to hear more from other developers (e.g. in the voting thread). >>> > > > >> >>> > > > >> Thanks, >>> > > > >> Dong >>> > > > >> >>> > > > >> >>> > > > >> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra <gyula.f...@gmail.com >>> > >>> > > > wrote: >>> > > > >> >>> > > > >> > Hi Dong, >>> > > > >> > >>> > > > >> > Let me address your comments. >>> > > > >> > >>> > > > >> > Time for scale / backlog processing time derivation: >>> > > > >> > We can add some more details to the Flip but at this point the >>> > > > >> > implementation is actually much simpler than the algorithm to >>> > > describe >>> > > > >> it. >>> > > > >> > I would not like to add more equations etc because it just >>> > > > >> overcomplicates >>> > > > >> > something relatively simple in practice. >>> > > > >> > >>> > > > >> > In a nutshell: Time to recover == lag / >>> > > > processing-rate-after-scaleup. >>> > > > >> > It's fairly easy to see where this is going, but best to see in >>> > > code. >>> > > > >> > >>> > > > >> > Using pendingRecords and alternative mechanisms: >>> > > > >> > True that the current algorithm relies on pending records to >>> > > > effectively >>> > > > >> > compute the target source processing rates and therefore scale >>> > > > sources. >>> > > > >> > This is available for Kafka which is by far the most common >>> > > streaming >>> > > > >> > source and is used by the majority of streaming applications >>> > > > currently. >>> > > > >> > It would be very easy to add alternative purely utilization >>> based >>> > > > >> scaling >>> > > > >> > to the sources. We can start with the current proposal and add >>> > this >>> > > > >> along >>> > > > >> > the way before the first version. >>> > > > >> > >>> > > > >> > Metrics, Configs and Public API: >>> > > > >> > The autoscaler feature is proposed for the Flink Kubernetes >>> > Operator >>> > > > >> which >>> > > > >> > does not have the same API/config maturity and thus does not >>> > provide >>> > > > the >>> > > > >> > same guarantees. >>> > > > >> > We currently support backward compatibilty for the CRD itself >>> and >>> > > not >>> > > > >> the >>> > > > >> > configs or metrics. This does not mean that we do not aim to >>> do so >>> > > but >>> > > > >> at >>> > > > >> > this stage we still have to clean up the details of the newly >>> > added >>> > > > >> > components. In practice this means that if we manage to get the >>> > > > metrics >>> > > > >> / >>> > > > >> > configs right at the first try we will keep them and provide >>> > > > >> compatibility, >>> > > > >> > but if we feel that we missed something or we don't need >>> something >>> > > we >>> > > > >> can >>> > > > >> > still remove it. It's a more pragmatic approach for such a >>> > component >>> > > > >> that >>> > > > >> > is likely to evolve than setting everything in stone >>> immediately. >>> > > > >> > >>> > > > >> > Cheers, >>> > > > >> > Gyula >>> > > > >> > >>> > > > >> > >>> > > > >> > >>> > > > >> > On Wed, Nov 16, 2022 at 6:07 PM Dong Lin <lindon...@gmail.com> >>> > > wrote: >>> > > > >> > >>> > > > >> > > Thanks for the update! Please see comments inline. >>> > > > >> > > >>> > > > >> > > On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels < >>> > > m...@apache.org >>> > > > > >>> > > > >> > > wrote: >>> > > > >> > > >>> > > > >> > > > Of course! Let me know if your concerns are addressed. The >>> > wiki >>> > > > page >>> > > > >> > has >>> > > > >> > > > been updated. >>> > > > >> > > > >>> > > > >> > > > >It will be great to add this in the FLIP so that reviewers >>> > can >>> > > > >> > > understand >>> > > > >> > > > how the source parallelisms are computed and how the >>> algorithm >>> > > > works >>> > > > >> > > > end-to-end. >>> > > > >> > > > >>> > > > >> > > > I've updated the FLIP page to add more details on how the >>> > > > >> backlog-based >>> > > > >> > > > scaling works (2). >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > > The algorithm is much more informative now. The algorithm >>> > > currently >>> > > > >> uses >>> > > > >> > > "Estimated time for rescale" to derive new source >>> parallelism. >>> > > Could >>> > > > >> we >>> > > > >> > > also specify in the FLIP how this value is derived? >>> > > > >> > > >>> > > > >> > > The algorithm currently uses pendingRecords to derive source >>> > > > >> parallelism. >>> > > > >> > > It is an optional metric and KafkaSource currently reports >>> this >>> > > > >> metric. >>> > > > >> > So >>> > > > >> > > it means that only the proposed algorithm currently only >>> works >>> > > when >>> > > > >> all >>> > > > >> > > sources of the job are KafkaSource, right? >>> > > > >> > > >>> > > > >> > > This issue considerably limits the applicability of this >>> FLIP. >>> > Do >>> > > > you >>> > > > >> > think >>> > > > >> > > most (if not all) streaming source will report this metric? >>> > > > >> > Alternatively, >>> > > > >> > > any chance we can have a fallback solution to evaluate the >>> > source >>> > > > >> > > parallelism based on e.g. cpu or idle ratio for cases where >>> this >>> > > > >> metric >>> > > > >> > is >>> > > > >> > > not available? >>> > > > >> > > >>> > > > >> > > >>> > > > >> > > > >These metrics and configs are public API and need to be >>> > stable >>> > > > >> across >>> > > > >> > > > minor versions, could we document them before finalizing >>> the >>> > > FLIP? >>> > > > >> > > > >>> > > > >> > > > Metrics and config changes are not strictly part of the >>> public >>> > > API >>> > > > >> but >>> > > > >> > > > Gyula has added a section. >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > > Hmm... if metrics are not public API, then it might happen >>> that >>> > we >>> > > > >> change >>> > > > >> > > the mbean path in a minor release and break users' monitoring >>> > > tool. >>> > > > >> > > Similarly, we might change configs in a minor release that >>> break >>> > > > >> user's >>> > > > >> > job >>> > > > >> > > behavior. We probably want to avoid these breaking changes in >>> > > minor >>> > > > >> > > releases. >>> > > > >> > > >>> > > > >> > > It is documented here >>> > > > >> > > < >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals >>> > > > >> > > > >>> > > > >> > > that >>> > > > >> > > "Exposed monitoring information" and "Configuration settings" >>> > are >>> > > > >> public >>> > > > >> > > interfaces of the project. >>> > > > >> > > >>> > > > >> > > Maybe we should also specify the metric here so that users >>> can >>> > > > safely >>> > > > >> > setup >>> > > > >> > > dashboards and tools to track how the autopilot is working, >>> > > similar >>> > > > to >>> > > > >> > how >>> > > > >> > > metrics are documented in FLIP-33 >>> > > > >> > > < >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics >>> > > > >> > > > >>> > > > >> > > ? >>> > > > >> > > >>> > > > >> > > >>> > > > >> > > > -Max >>> > > > >> > > > >>> > > > >> > > > On Tue, Nov 15, 2022 at 3:01 PM Dong Lin < >>> lindon...@gmail.com >>> > > >>> > > > >> wrote: >>> > > > >> > > > >>> > > > >> > > > > Hi Maximilian, >>> > > > >> > > > > >>> > > > >> > > > > It seems that the following comments from the previous >>> > > > discussions >>> > > > >> > have >>> > > > >> > > > not >>> > > > >> > > > > been addressed yet. Any chance we can have them addressed >>> > > before >>> > > > >> > > starting >>> > > > >> > > > > the voting thread? >>> > > > >> > > > > >>> > > > >> > > > > Thanks, >>> > > > >> > > > > Dong >>> > > > >> > > > > >>> > > > >> > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra < >>> > > gyula.f...@gmail.com >>> > > > > >>> > > > >> > > wrote: >>> > > > >> > > > > >>> > > > >> > > > > > Hi Dong! >>> > > > >> > > > > > >>> > > > >> > > > > > Let me try to answer the questions :) >>> > > > >> > > > > > >>> > > > >> > > > > > 1 : busyTimeMsPerSecond is not specific for CPU, it >>> > measures >>> > > > the >>> > > > >> > time >>> > > > >> > > > > > spent in the main record processing loop for an >>> operator >>> > if >>> > > I >>> > > > >> > > > > > understand correctly. This includes IO operations too. >>> > > > >> > > > > > >>> > > > >> > > > > > 2: We should add this to the FLIP I agree. It would be >>> a >>> > > > >> Duration >>> > > > >> > > > config >>> > > > >> > > > > > with the expected catch up time after rescaling (let's >>> > say 5 >>> > > > >> > > minutes). >>> > > > >> > > > It >>> > > > >> > > > > > could be computed based on the current data rate and >>> the >>> > > > >> calculated >>> > > > >> > > max >>> > > > >> > > > > > processing rate after the rescale. >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > > It will be great to add this in the FLIP so that >>> reviewers >>> > can >>> > > > >> > > understand >>> > > > >> > > > > how the source parallelisms are computed and how the >>> > algorithm >>> > > > >> works >>> > > > >> > > > > end-to-end. >>> > > > >> > > > > >>> > > > >> > > > > >>> > > > >> > > > > > 3: In the current proposal we don't have per operator >>> > > configs. >>> > > > >> > Target >>> > > > >> > > > > > utilization would apply to all operators uniformly. >>> > > > >> > > > > > >>> > > > >> > > > > > 4: It should be configurable, yes. >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > > Since this config is a public API, could we update the >>> FLIP >>> > > > >> > accordingly >>> > > > >> > > > to >>> > > > >> > > > > provide this config? >>> > > > >> > > > > >>> > > > >> > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > 5,6: The names haven't been finalized but I think these >>> > are >>> > > > >> minor >>> > > > >> > > > > details. >>> > > > >> > > > > > We could add concrete names to the FLIP :) >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > > These metrics and configs are public API and need to be >>> > stable >>> > > > >> across >>> > > > >> > > > minor >>> > > > >> > > > > versions, could we document them before finalizing the >>> FLIP? >>> > > > >> > > > > >>> > > > >> > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > Cheers, >>> > > > >> > > > > > Gyula >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin < >>> > > lindon...@gmail.com> >>> > > > >> > wrote: >>> > > > >> > > > > > >>> > > > >> > > > > >> Hi Max, >>> > > > >> > > > > >> >>> > > > >> > > > > >> Thank you for the proposal. The proposal tackles a >>> very >>> > > > >> important >>> > > > >> > > > issue >>> > > > >> > > > > >> for Flink users and the design looks promising >>> overall! >>> > > > >> > > > > >> >>> > > > >> > > > > >> I have some questions to better understand the >>> proposed >>> > > > public >>> > > > >> > > > > interfaces >>> > > > >> > > > > >> and the algorithm. >>> > > > >> > > > > >> >>> > > > >> > > > > >> 1) The proposal seems to assume that the operator's >>> > > > >> > > > busyTimeMsPerSecond >>> > > > >> > > > > >> could reach 1 sec. I believe this is mostly true for >>> > > > cpu-bound >>> > > > >> > > > > operators. >>> > > > >> > > > > >> Could you confirm that this can also be true for >>> io-bound >>> > > > >> > operators >>> > > > >> > > > > such as >>> > > > >> > > > > >> sinks? For example, suppose a Kafka Sink subtask has >>> > > reached >>> > > > >> I/O >>> > > > >> > > > > bottleneck >>> > > > >> > > > > >> when flushing data out to the Kafka clusters, will >>> > > > >> > > busyTimeMsPerSecond >>> > > > >> > > > > >> reach 1 sec? >>> > > > >> > > > > >> >>> > > > >> > > > > >> 2) It is said that "users can configure a maximum >>> time to >>> > > > fully >>> > > > >> > > > process >>> > > > >> > > > > >> the backlog". The configuration section does not seem >>> to >>> > > > >> provide >>> > > > >> > > this >>> > > > >> > > > > >> config. Could you specify this? And any chance this >>> > > proposal >>> > > > >> can >>> > > > >> > > > provide >>> > > > >> > > > > >> the formula for calculating the new processing rate? >>> > > > >> > > > > >> >>> > > > >> > > > > >> 3) How are users expected to specify the per-operator >>> > > configs >>> > > > >> > (e.g. >>> > > > >> > > > > >> target utilization)? For example, should users >>> specify it >>> > > > >> > > > > programmatically >>> > > > >> > > > > >> in a DataStream/Table/SQL API? >>> > > > >> > > > > >> >>> > > > >> > > > > >> 4) How often will the Flink Kubernetes operator query >>> > > metrics >>> > > > >> from >>> > > > >> > > > > >> JobManager? Is this configurable? >>> > > > >> > > > > >> >>> > > > >> > > > > >> 5) Could you specify the config name and default value >>> > for >>> > > > the >>> > > > >> > > > proposed >>> > > > >> > > > > >> configs? >>> > > > >> > > > > >> >>> > > > >> > > > > >> 6) Could you add the name/mbean/type for the proposed >>> > > > metrics? >>> > > > >> > > > > >> >>> > > > >> > > > > >> >>> > > > >> > > > > >> Cheers, >>> > > > >> > > > > >> Dong >>> > > > >> > > > > >> >>> > > > >> > > > > >> >>> > > > >> > > > > >> >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > > >>> > > > >>> > > >>> > >>> >>