Hi Max, this is a great initiative, and there is a good discussion going on.
We have set up a Flink cluster using Amazon ECS. So it would be good to design this in such a way that we can deploy the autoscaler in a separate Docker image which could observe the JM and jobs, and emit outputs that can be used to trigger ECS to add new or delete existing TMs on request. Thanks, Prasanna. On Fri, 25 Nov 2022, 07:39 Rui Fan, <1996fan...@gmail.com> wrote: > Hi Gyula, Max, John! > > Thanks for the great FLIP, it's very useful for Flink users. > > > Ideally the autoscaler is a separate process (an outside observer) > > Could we eventually use the autoscaler as an outside tool? Or run it as a > separate Java process? If it's complex, can the part that detects > the job and suggests parallelism be a separate Java process? > > Since a large number of Flink jobs are still using Flink on YARN, > this feature is also useful for them. I was wondering if some logs > or advice can be provided if automatic scaling is not working for > Flink on YARN. For example: the parallelism suggested by > vertex_1 is 100, and the parallelism suggested by vertex_2 is 150. > > With this information, the Flink user can manually set > reasonable parallelism. Or some Flink platforms can integrate > this tool and use `pipeline.jobvertex-parallelism-overrides`[1] > to make the autoscaler work on Flink on YARN. > > > By adding it to the operator, the autoscaler can potentially work on > > older Flink versions as well > > As I understand it, `pipeline.jobvertex-parallelism-overrides`[1] > is supported in Flink 1.17, so old Flink versions can only detect, > not auto-scale, right? > > [1] https://issues.apache.org/jira/browse/FLINK-29501 > > Best > Rui Fan > > > On Fri, Nov 25, 2022 at 4:54 AM Gyula Fóra <gyula.f...@gmail.com> wrote: > > > Hi John! > > > > Thank you for the excellent question.
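To make the platform-integration idea above concrete: a minimal sketch, under assumptions, of how a platform could assemble suggested per-vertex parallelisms into a value for `pipeline.jobvertex-parallelism-overrides` (Flink 1.17+). The `vertexId:parallelism,...` map syntax and the helper name `build_parallelism_overrides` are illustrative assumptions, not from the FLIP.

```python
def build_parallelism_overrides(suggestions: dict) -> str:
    """Render {vertex_id: parallelism} as a map-style config value,
    e.g. "vertex_1:100,vertex_2:150" (assumed syntax for a Map option)."""
    return ",".join(f"{vertex}:{p}" for vertex, p in sorted(suggestions.items()))

overrides = build_parallelism_overrides({"vertex_1": 100, "vertex_2": 150})
# The platform could then pass this to the job, e.g. (hypothetical invocation):
#   flink run -Dpipeline.jobvertex-parallelism-overrides=vertex_1:100,vertex_2:150 ...
print(overrides)  # vertex_1:100,vertex_2:150
```

This is the "detect and suggest" split Rui Fan describes: the detection part emits the suggestions, and applying them stays a deployment-specific step.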
> > > > There are a few reasons why we felt that the operator is the right place > for > > this component: > > > > - Ideally the autoscaler is a separate process (an outside observer), > and > > the jobmanager is very much tied to the lifecycle of the job. The > operator > > is a perfect example of such an external process that lives beyond > > individual jobs. > > - Scaling itself might need some external resource management (for > > standalone clusters) that the jobmanager is not capable of, and the logic > > is already in the operator. > > - Adding this to the operator allows us to integrate this fully in the > > lifecycle management of the application. This guarantees that scaling > > decisions do not interfere with upgrades, suspends etc. > > - By adding it to the operator, the autoscaler can potentially work on > > older Flink versions as well. > > - The jobmanager is a component designed to handle individual Flink jobs, > > but the autoscaler component needs to work on a higher abstraction layer > to > > be able to integrate with user job upgrades etc. > > > > These are some of the main things that come to my mind :) > > > > Having it in the operator ties this logic to Kubernetes itself but we > feel > > that an autoscaler is mostly relevant in an elastic cloud environment > > anyway. > > > > Cheers, > > Gyula > > > > On Thu, Nov 24, 2022 at 9:40 PM John Roesler <vvcep...@apache.org> > wrote: > > > > > Hi Max, > > > > > > Thanks for the FLIP! > > > > > > I’ve been curious about one point. I can imagine some good reasons > > for > > > it but wonder what you have in mind. What’s the reason to add auto > > scaling > > > to the Operator instead of to the JobManager? > > > > > > It seems like adding that capability to the JobManager would be a > bigger > > > project, but it also would create some interesting opportunities. > > > > > > This is certainly not a suggestion, just a question. > > > > > > Thanks! 
> > > John > > > > > > On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote: > > > > Thanks for your comments @Dong and @Chen. It is true that not all the > > > > details are contained in the FLIP. The document is meant as a general > > > > design concept. > > > > > > > > As for the rescaling time, this is going to be a configurable setting > > for > > > > now but it is foreseeable that we will provide auto-tuning of this > > > > configuration value by observing the job restart time. Same goes for > > the > > > > scaling decision itself which can learn from previous decisions. But > we > > > > want to keep it simple for the first version. > > > > > > > > For sources that do not support the pendingRecords metric, we are > > > planning > > > > to either give the user the choice to set a manual target rate, or > > scale > > > it > > > > purely based on its utilization as reported via busyTimeMsPerSecond. > In > > > > case of legacy sources, we will skip scaling these branches entirely > > > > because they support neither of these metrics. > > > > > > > > -Max > > > > > > > > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels <m...@apache.org> > > > wrote: > > > > > > > >> >Do we think the scaler could be a plugin or hard coded ? > > > >> > > > >> +1 For pluggable scaling logic. > > > >> > > > >> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin <qinnc...@gmail.com> > wrote: > > > >> > > > >>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra <gyula.f...@gmail.com> > > > wrote: > > > >>> > > > >>> > Hi Chen! > > > >>> > > > > >>> > I think in the long term it makes sense to provide some pluggable > > > >>> > mechanisms but it's not completely trivial where exactly you > would > > > plug > > > >>> in > > > >>> > your custom logic at this point. > > > >>> > > > > >>> sounds good, more specifically would be great if it can accept > input > > > >>> features > > > >>> (including previous scaling decisions) and output decisions. 
> > > >>> Folks might keep their own secret sauce and avoid patching an OSS > fork. > > > >>> > > > >>> > > > > >>> > In any case the problems you mentioned should be solved robustly > by > > > the > > > >>> > algorithm itself without any customization: > > > >>> > - We need to be able to detect ineffective scaling decisions, > > let's > > > >>> say we > > > >>> > scaled up (expecting better throughput with a higher parallelism) > > > but we > > > >>> > did not get better processing capacity (this would be the > > external > > > >>> > service bottleneck) > > > >>> > > > > >>> sounds good, so we would at least try restarting the job once (optimistic > > > path) > > > >>> as a > > > >>> design choice. > > > >>> > > > >>> > - We are evaluating metrics in windows, and we have some > flexible > > > >>> > boundaries to avoid scaling on minor load spikes > > > >>> > > > > >>> yes, it would be great if users can feed in throughput changes over > > > different > > > >>> time buckets (last 10s, 30s, 1 min, 5 mins) as input features > > > >>> > > > >>> > > > > >>> > Regards, > > > >>> > Gyula > > > >>> > > > > >>> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin <qinnc...@gmail.com> > > > wrote: > > > >>> > > > > >>> > > Hi Gyula, > > > >>> > > > > > >>> > > Do we think the scaler could be a plugin or hard-coded? > > > >>> > > We observed some cases the scaler can't address (e.g. async IO > > > dependency > > > >>> > > service degradation, or a small spike that isn't worth > restarting > > > the job) > > > >>> > > > > > >>> > > Thanks, > > > >>> > > Chen > > > >>> > > > > > >>> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra < > gyula.f...@gmail.com > > > > > > >>> wrote: > > > >>> > > > > > >>> > > > Hi Dong! > > > >>> > > > > > > >>> > > > Could you please confirm that your main concerns have been > > > >>> addressed? 
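Gyula's point about evaluating metrics in windows with flexible boundaries can be sketched as follows. The windowed averaging, the 10% tolerance band, and the function name are illustrative assumptions, not the prototype's actual logic:

```python
# Sketch: average the observed busy ratios over an evaluation window and
# only recommend scaling when the averaged utilization leaves a tolerance
# band around the target, so a single short spike is ignored.
def should_rescale(windowed_busy_ratios, target_utilization=0.8, tolerance=0.1):
    avg = sum(windowed_busy_ratios) / len(windowed_busy_ratios)
    # Stay put while the windowed average is within the flexible boundary.
    return abs(avg - target_utilization) > tolerance

# One momentary spike inside an otherwise normal window does not trigger
# a rescale, while sustained overload does:
print(should_rescale([0.78, 0.80, 0.95, 0.79, 0.81]))  # False
print(should_rescale([0.95, 0.97, 0.99, 0.96, 0.98]))  # True
```

This also matches Chen's concern: a spike too small to be worth a job restart stays inside the band and produces no scaling decision.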
> > > >>> > > > > > > >>> > > > Some other minor details that might not have been fully > > > clarified: > > > >>> > > > - The prototype has been validated on some production > > workloads > > > yes > > > >>> > > > - We are only planning to use metrics that are generally > > > available > > > >>> and > > > >>> > > are > > > >>> > > > previously accepted to be standardized connector metrics (not > > > Kafka > > > >>> > > > specific). This is actually specified in the FLIP > > > >>> > > > - Even if some metrics (such as pendingRecords) are not > > > accessible > > > >>> the > > > >>> > > > scaling algorithm works and can be used. For source scaling > > > based on > > > >>> > > > utilization alone we still need some trivial modifications on > > the > > > >>> > > > implementation side. > > > >>> > > > > > > >>> > > > Cheers, > > > >>> > > > Gyula > > > >>> > > > > > > >>> > > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra < > > gyula.f...@gmail.com > > > > > > > >>> > wrote: > > > >>> > > > > > > >>> > > > > Hi Dong! > > > >>> > > > > > > > >>> > > > > This is not an experimental feature proposal. The > > > implementation > > > >>> of > > > >>> > the > > > >>> > > > > prototype is still in an experimental phase but by the time > > the > > > >>> FLIP, > > > >>> > > > > initial prototype and review is done, this should be in a > > good > > > >>> stable > > > >>> > > > first > > > >>> > > > > version. > > > >>> > > > > This proposal is pretty general as autoscalers/tuners get > as > > > far > > > >>> as I > > > >>> > > > > understand and there is no history of any alternative > effort > > > that > > > >>> > even > > > >>> > > > > comes close to the applicability of this solution. > > > >>> > > > > > > > >>> > > > > Any large features that were added to Flink in the past > have > > > gone > > > >>> > > through > > > >>> > > > > several iterations over the years and the APIs have evolved > > as > > > >>> they > > > >>> > > > matured. 
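The utilization-only fallback mentioned above, for sources where pendingRecords is not accessible, could look roughly like this sketch. The proportional rule, the 0.8 default target, and the function name are assumptions for illustration, not the FLIP's final algorithm:

```python
import math

def utilization_based_parallelism(current_parallelism: int,
                                  busy_time_ms_per_second: float,
                                  target_utilization: float = 0.8) -> int:
    """Scale parallelism so the observed busy fraction lands on the target."""
    busy_ratio = busy_time_ms_per_second / 1000.0  # fraction of each second spent busy
    return max(1, math.ceil(current_parallelism * busy_ratio / target_utilization))

# A vertex at parallelism 4 that is busy 950 ms/s against a 0.8 target
# would be scaled up to 5 subtasks:
print(utilization_based_parallelism(4, 950.0))  # 5
```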
> > > >>> > > > > Something like the autoscaler can only be successful if > there > > > is > > > >>> > enough > > > >>> > > > > user exposure and feedback to make it good; putting it in > an > > > >>> external > > > >>> > > > repo > > > >>> > > > > will not get us anywhere. > > > >>> > > > > > > > >>> > > > > We have a prototype implementation ready that works well > and > > > it is > > > >>> > more > > > >>> > > > or > > > >>> > > > > less feature complete. We proposed this FLIP based on > > something > > > >>> that > > > >>> > we > > > >>> > > > see > > > >>> > > > > as a working solution; please do not underestimate the > effort > > > that > > > >>> > went > > > >>> > > > > into this proposal and the validation of the ideas. So in > > this > > > >>> sense > > > >>> > > our > > > >>> > > > > approach here is the same as with the Table Store and > > > Kubernetes > > > >>> > > Operator > > > >>> > > > > and other big components of the past. On the other hand > it's > > > >>> > impossible > > > >>> > > > to > > > >>> > > > > sufficiently explain all the technical depth/implementation > > > >>> details > > > >>> > of > > > >>> > > > such > > > >>> > > > > complex components in FLIPs to 100%. I feel we have a good > > > >>> overview > > > >>> > of > > > >>> > > > the > > > >>> > > > > algorithm in the FLIP and the implementation should cover > all > > > >>> > remaining > > > >>> > > > > questions. We will have an extended code review phase > > following > > > >>> the > > > >>> > > FLIP > > > >>> > > > > vote before this makes it into the project. > > > >>> > > > > > > > >>> > > > > I understand your concern regarding the stability of Flink > > > >>> Kubernetes > > > >>> > > > > Operator config and metric names. 
We have decided not to > > > provide > >>> > > > guarantees > > > >>> > > > > there yet, but if you feel that it's time for the operator > to > > > >>> support > > > >>> > > such > > > >>> > > > > guarantees, please open a separate discussion on that > topic; I > > > >>> don't > > > >>> > > want > > > >>> > > > to > > > >>> > > > > mix the two problems here. > > > >>> > > > > > > > >>> > > > > Regards, > > > >>> > > > > Gyula > > > >>> > > > > > > > >>> > > > > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin < > > lindon...@gmail.com> > > > >>> > wrote: > > > >>> > > > > > > > >>> > > > >> Hi Gyula, > > > >>> > > > >> > > > >>> > > > >> If I understand correctly, this autopilot proposal is an > > > >>> > experimental > > > >>> > > > >> feature and its configs/metrics are not mature enough to > > > provide > > > >>> > > > backward > > > >>> > > > >> compatibility yet. And the proposal provides the high-level > > ideas > > > of > > > >>> the > > > >>> > > > >> algorithm but it is probably too complicated to explain it > > > >>> > end-to-end. > > > >>> > > > >> > > > >>> > > > >> On the one hand, I do agree that having an auto-tuning > > > prototype, > > > >>> > even > > > >>> > > > if > > > >>> > > > >> not mature, is better than nothing for Flink users. On the > > > other > > > >>> > > hand, I > > > >>> > > > >> am > > > >>> > > > >> concerned that this FLIP seems a bit too experimental, and > > > >>> starting > > > >>> > > with > > > >>> > > > >> an > > > >>> > > > >> immature design might make it harder for us to reach a > > > >>> > > production-ready > > > >>> > > > >> and > > > >>> > > > >> generally applicable auto-tuner in the future. And > > introducing > > > >>> too many > > > >>> > > > >> backward > > > >>> > > > >> incompatible changes generally hurts users' trust in the > > Flink > > > >>> > > project. 
> > > >>> > > > >> > > > >>> > > > >> One alternative might be to develop and experiment with > this > > > >>> feature > > > >>> > > in > > > >>> > > > a > > > >>> > > > >> non-Flink repo. You can iterate fast without worrying > about > > > >>> > the > > > >>> > > > >> backward compatibility requirements typically imposed on most > > Flink > > > >>> public > > > >>> > > > >> features. And once the feature is reasonably evaluated and > > > mature > > > >>> > > > enough, > > > >>> > > > >> it will be much easier to explain the design and address > all > > > the > > > >>> > > issues > > > >>> > > > >> mentioned above. For example, Jingsong implemented a Flink > > > Table > > > >>> > Store > > > >>> > > > >> prototype > > > >>> > > > >> < > > > >>> https://github.com/JingsongLi/flink/tree/table_storage/flink-table > > > >>> > > > > > >>> > > > >> before > > > >>> > > > >> proposing FLIP-188 in this thread > > > >>> > > > >> < > > > >>> https://lists.apache.org/thread/dlhspjpms007j2ynymsg44fxcx6fm064>. > > > >>> > > > >> > > > >>> > > > >> I don't intend to block your progress. Just my two cents. It > > > >>> will be > > > >>> > > > great > > > >>> > > > >> to hear more from other developers (e.g. in the voting > > > thread). > > > >>> > > > >> > > > >>> > > > >> Thanks, > > > >>> > > > >> Dong > > > >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra < > > > gyula.f...@gmail.com > > > >>> > > > > >>> > > > wrote: > > > >>> > > > >> > > > >>> > > > >> > Hi Dong, > > > >>> > > > >> > > > > >>> > > > >> > Let me address your comments. > > > >>> > > > >> > > > > >>> > > > >> > Time for scale / backlog processing time derivation: > > > >>> > > > >> > We can add some more details to the FLIP but at this > point > > > the > > > >>> > > > >> > implementation is actually much simpler than the > algorithm > > > that > > > >>> > > describes > > > >>> > > > >> it. 
> > > >>> > > > >> > I would not like to add more equations etc. because it > just > > > >>> > > > >> overcomplicates > > > >>> > > > >> > something relatively simple in practice. > > > >>> > > > >> > > > > >>> > > > >> > In a nutshell: Time to recover == lag / > > > >>> > > > processing-rate-after-scaleup. > > > >>> > > > >> > It's fairly easy to see where this is going, but it is best to > > > see in > > > >>> > > code. > > > >>> > > > >> > > > > >>> > > > >> > Using pendingRecords and alternative mechanisms: > > > >>> > > > >> > It is true that the current algorithm relies on pending > records > > to > > > >>> > > > effectively > > > >>> > > > >> > compute the target source processing rates and therefore > > > scale > > > >>> > > > sources. > > > >>> > > > >> > This is available for Kafka, which is by far the most > > common > > > >>> > > streaming > > > >>> > > > >> > source and is used by the majority of streaming > > applications > > > >>> > > > currently. > > > >>> > > > >> > It would be very easy to add alternative, purely > > utilization-based > > > >>> > > > >> scaling > > > >>> > > > >> > to the sources. We can start with the current proposal > and > > > add > > > >>> > this > > > >>> > > > >> along > > > >>> > > > >> > the way before the first version. > > > >>> > > > >> > > > > >>> > > > >> > Metrics, Configs and Public API: > > > >>> > > > >> > The autoscaler feature is proposed for the Flink > > Kubernetes > > > >>> > Operator, > > > >>> > > > >> which > > > >>> > > > >> > does not have the same API/config maturity and thus does > > not > > > >>> > provide > > > >>> > > > the > > > >>> > > > >> > same guarantees. > > > >>> > > > >> > We currently support backward compatibility for the CRD > > > itself > > > >>> and > > > >>> > > not > > > >>> > > > >> the > > > >>> > > > >> > configs or metrics. 
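The nutshell relation quoted above (time to recover == lag / processing-rate-after-scaleup) can be sketched by rearranging it to derive a target source processing rate, and from that a parallelism, for a configured catch-up duration. Adding the steady-state input rate on top of the lag term and assuming processing capacity scales linearly with parallelism are illustrative assumptions here, not the prototype's actual code:

```python
import math

def target_source_parallelism(lag_records: float,
                              input_rate: float,            # records/s arriving
                              capacity_per_subtask: float,  # records/s one subtask handles
                              catch_up_seconds: float) -> int:
    # To drain the backlog within catch_up_seconds, the job must process
    # the incoming stream plus lag/catch_up_seconds extra records per second.
    target_rate = input_rate + lag_records / catch_up_seconds
    return max(1, math.ceil(target_rate / capacity_per_subtask))

# 3,000,000 records of lag, 10,000 rec/s input, 4,000 rec/s per subtask,
# and a 5-minute (300 s) catch-up target gives a source parallelism of 5:
print(target_source_parallelism(3_000_000, 10_000, 4_000, 300))  # 5
```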
This does not mean that we do not > aim > > to > > > >>> do so > > > >>> > > but > > > >>> > > > >> at > > > >>> > > > >> > this stage we still have to clean up the details of the > > > newly > > > >>> > added > > > >>> > > > >> > components. In practice this means that if we manage to > > get > > > the > > > >>> > > > metrics > > > >>> > > > >> / > > > >>> > > > >> > configs right at the first try we will keep them and > > provide > > > >>> > > > >> compatibility, > > > >>> > > > >> > but if we feel that we missed something or we don't need > > > >>> something > > > >>> > > we > > > >>> > > > >> can > > > >>> > > > >> > still remove it. It's a more pragmatic approach for > such a > > > >>> > component > > > >>> > > > >> that > > > >>> > > > >> > is likely to evolve than setting everything in stone > > > >>> immediately. > > > >>> > > > >> > > > > >>> > > > >> > Cheers, > > > >>> > > > >> > Gyula > > > >>> > > > >> > > > > >>> > > > >> > > > > >>> > > > >> > > > > >>> > > > >> > On Wed, Nov 16, 2022 at 6:07 PM Dong Lin < > > > lindon...@gmail.com> > > > >>> > > wrote: > > > >>> > > > >> > > > > >>> > > > >> > > Thanks for the update! Please see comments inline. > > > >>> > > > >> > > > > > >>> > > > >> > > On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels < > > > >>> > > m...@apache.org > > > >>> > > > > > > > >>> > > > >> > > wrote: > > > >>> > > > >> > > > > > >>> > > > >> > > > Of course! Let me know if your concerns are > addressed. > > > The > > > >>> > wiki > > > >>> > > > page > > > >>> > > > >> > has > > > >>> > > > >> > > > been updated. > > > >>> > > > >> > > > > > > >>> > > > >> > > > >It will be great to add this in the FLIP so that > > > reviewers > > > >>> > can > > > >>> > > > >> > > understand > > > >>> > > > >> > > > how the source parallelisms are computed and how the > > > >>> algorithm > > > >>> > > > works > > > >>> > > > >> > > > end-to-end. 
> > > >>> > > > >> > > > > > > >>> > > > >> > > > I've updated the FLIP page to add more details on > how > > > the > > > >>> > > > >> backlog-based > > > >>> > > > >> > > > scaling works (2). > > > >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > The algorithm description is much more informative now. The > > algorithm > > > >>> > > currently > > > >>> > > > >> uses > > > >>> > > > >> > > "Estimated time for rescale" to derive the new source > > > >>> parallelism. > > > >>> > > Could > > > >>> > > > >> we > > > >>> > > > >> > > also specify in the FLIP how this value is derived? > > > >>> > > > >> > > > > > >>> > > > >> > > The algorithm currently uses pendingRecords to derive > > > source > > > >>> > > > >> parallelism. > > > >>> > > > >> > > It is an optional metric and KafkaSource currently > > reports > > > >>> this > > > >>> > > > >> metric. > > > >>> > > > >> > So > > > >>> > > > >> > > it means that the proposed algorithm currently > only > > > >>> works > > > >>> > > when > > > >>> > > > >> all > > > >>> > > > >> > > sources of the job are KafkaSource, right? > > > >>> > > > >> > > > > > >>> > > > >> > > This issue considerably limits the applicability of > this > > > >>> FLIP. > > > >>> > Do > > > >>> > > > you > > > >>> > > > >> > think > > > >>> > > > >> > > most (if not all) streaming sources will report this > > > metric? > > > >>> > > > >> > Alternatively, > > > >>> > > > >> > > any chance we can have a fallback solution to evaluate > > the > > > >>> > source > > > >>> > > > >> > > parallelism based on e.g. CPU or idle ratio for cases > > > where > > > >>> this > > > >>> > > > >> metric > > > >>> > > > >> > is > > > >>> > > > >> > > not available? > > > >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > >These metrics and configs are public API and need > to > > be > > > >>> > stable > > > >>> > > > >> across > > > >>> > > > >> > > > minor versions, could we document them before > > finalizing > > > >>> the > > > >>> > > FLIP? 
> > > >>> > > > >> > > > > > > >>> > > > >> > > > Metrics and config changes are not strictly part of > > the > > > >>> public > > > >>> > > API > > > >>> > > > >> but > > > >>> > > > >> > > > Gyula has added a section. > > > >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > Hmm... if metrics are not public API, then it might > > happen > > > >>> that > > > >>> > we > > > >>> > > > >> change > > > >>> > > > >> > > the mbean path in a minor release and break users' > > > monitoring > > > >>> > > tool. > > > >>> > > > >> > > Similarly, we might change configs in a minor release > > that > > > >>> break > > > >>> > > > >> user's > > > >>> > > > >> > job > > > >>> > > > >> > > behavior. We probably want to avoid these breaking > > > changes in > > > >>> > > minor > > > >>> > > > >> > > releases. > > > >>> > > > >> > > > > > >>> > > > >> > > It is documented here > > > >>> > > > >> > > < > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals > > > >>> > > > >> > > > > > > >>> > > > >> > > that > > > >>> > > > >> > > "Exposed monitoring information" and "Configuration > > > settings" > > > >>> > are > > > >>> > > > >> public > > > >>> > > > >> > > interfaces of the project. 
> > > >>> > > > >> > > > > > >>> > > > >> > > Maybe we should also specify the metric here so that > > users > > > >>> can > > > >>> > > > safely > > > >>> > > > >> > setup > > > >>> > > > >> > > dashboards and tools to track how the autopilot is > > > working, > > > >>> > > similar > > > >>> > > > to > > > >>> > > > >> > how > > > >>> > > > >> > > metrics are documented in FLIP-33 > > > >>> > > > >> > > < > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics > > > >>> > > > >> > > > > > > >>> > > > >> > > ? > > > >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > -Max > > > >>> > > > >> > > > > > > >>> > > > >> > > > On Tue, Nov 15, 2022 at 3:01 PM Dong Lin < > > > >>> lindon...@gmail.com > > > >>> > > > > > >>> > > > >> wrote: > > > >>> > > > >> > > > > > > >>> > > > >> > > > > Hi Maximilian, > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > It seems that the following comments from the > > previous > > > >>> > > > discussions > > > >>> > > > >> > have > > > >>> > > > >> > > > not > > > >>> > > > >> > > > > been addressed yet. Any chance we can have them > > > addressed > > > >>> > > before > > > >>> > > > >> > > starting > > > >>> > > > >> > > > > the voting thread? > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > Thanks, > > > >>> > > > >> > > > > Dong > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra < > > > >>> > > gyula.f...@gmail.com > > > >>> > > > > > > > >>> > > > >> > > wrote: > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > Hi Dong! 
> > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > Let me try to answer the questions :) > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > 1 : busyTimeMsPerSecond is not specific for CPU, > > it > > > >>> > measures > > > >>> > > > the > > > >>> > > > >> > time > > > >>> > > > >> > > > > > spent in the main record processing loop for an > > > >>> operator > > > >>> > if > > > >>> > > I > > > >>> > > > >> > > > > > understand correctly. This includes IO > operations > > > too. > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > 2: We should add this to the FLIP I agree. It > > would > > > be > > > >>> a > > > >>> > > > >> Duration > > > >>> > > > >> > > > config > > > >>> > > > >> > > > > > with the expected catch up time after rescaling > > > (let's > > > >>> > say 5 > > > >>> > > > >> > > minutes). > > > >>> > > > >> > > > It > > > >>> > > > >> > > > > > could be computed based on the current data rate > > and > > > >>> the > > > >>> > > > >> calculated > > > >>> > > > >> > > max > > > >>> > > > >> > > > > > processing rate after the rescale. > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > It will be great to add this in the FLIP so that > > > >>> reviewers > > > >>> > can > > > >>> > > > >> > > understand > > > >>> > > > >> > > > > how the source parallelisms are computed and how > the > > > >>> > algorithm > > > >>> > > > >> works > > > >>> > > > >> > > > > end-to-end. > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > 3: In the current proposal we don't have per > > > operator > > > >>> > > configs. > > > >>> > > > >> > Target > > > >>> > > > >> > > > > > utilization would apply to all operators > > uniformly. > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > 4: It should be configurable, yes. 
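The catch-up estimate described above, computed from the current data rate and the calculated max processing rate after the rescale, might be sketched like this. The function name and the headroom formulation are assumptions, not the operator's actual code:

```python
def expected_catch_up_seconds(lag_records: float,
                              input_rate: float,
                              max_processing_rate_after_rescale: float) -> float:
    """Time to drain the backlog at the post-rescale processing rate."""
    headroom = max_processing_rate_after_rescale - input_rate
    if headroom <= 0:
        return float("inf")  # this rescale can never drain the backlog
    return lag_records / headroom

# 3,000,000 lagged records, 10,000 rec/s arriving, 20,000 rec/s capacity
# after the rescale -> 300 s, i.e. the 5-minute target mentioned above:
print(expected_catch_up_seconds(3_000_000, 10_000, 20_000))  # 300.0
```

Comparing this estimate against the configured catch-up Duration is one plausible way to decide whether a proposed parallelism is sufficient.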
> > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > Since this config is a public API, could we update > > the > > > >>> FLIP > > > >>> > > > >> > accordingly > > > >>> > > > >> > > > to > > > >>> > > > >> > > > > provide this config? > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > 5,6: The names haven't been finalized but I > think > > > these > > > >>> > are > > > >>> > > > >> minor > > > >>> > > > >> > > > > details. > > > >>> > > > >> > > > > > We could add concrete names to the FLIP :) > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > These metrics and configs are public API and need > to > > > be > > > >>> > stable > > > >>> > > > >> across > > > >>> > > > >> > > > minor > > > >>> > > > >> > > > > versions, could we document them before finalizing > > the > > > >>> FLIP? > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > Cheers, > > > >>> > > > >> > > > > > Gyula > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin < > > > >>> > > lindon...@gmail.com> > > > >>> > > > >> > wrote: > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > >> Hi Max, > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> Thank you for the proposal. The proposal > tackles > > a > > > >>> very > > > >>> > > > >> important > > > >>> > > > >> > > > issue > > > >>> > > > >> > > > > >> for Flink users and the design looks promising > > > >>> overall! > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> I have some questions to better understand the > > > >>> proposed > > > >>> > > > public > > > >>> > > > >> > > > > interfaces > > > >>> > > > >> > > > > >> and the algorithm. 
> > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> 1) The proposal seems to assume that the > > operator's > > > >>> > > > >> > > > busyTimeMsPerSecond > > > >>> > > > >> > > > > >> could reach 1 sec. I believe this is mostly > true > > > for > > > >>> > > > cpu-bound > > > >>> > > > >> > > > > operators. > > > >>> > > > >> > > > > >> Could you confirm that this can also be true > for > > > >>> io-bound > > > >>> > > > >> > operators > > > >>> > > > >> > > > > such as > > > >>> > > > >> > > > > >> sinks? For example, suppose a Kafka Sink > subtask > > > has > > > >>> > > reached > > > >>> > > > >> I/O > > > >>> > > > >> > > > > bottleneck > > > >>> > > > >> > > > > >> when flushing data out to the Kafka clusters, > > will > > > >>> > > > >> > > busyTimeMsPerSecond > > > >>> > > > >> > > > > >> reach 1 sec? > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> 2) It is said that "users can configure a > maximum > > > >>> time to > > > >>> > > > fully > > > >>> > > > >> > > > process > > > >>> > > > >> > > > > >> the backlog". The configuration section does > not > > > seem > > > >>> to > > > >>> > > > >> provide > > > >>> > > > >> > > this > > > >>> > > > >> > > > > >> config. Could you specify this? And any chance > > this > > > >>> > > proposal > > > >>> > > > >> can > > > >>> > > > >> > > > provide > > > >>> > > > >> > > > > >> the formula for calculating the new processing > > > rate? > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> 3) How are users expected to specify the > > > per-operator > > > >>> > > configs > > > >>> > > > >> > (e.g. > > > >>> > > > >> > > > > >> target utilization)? For example, should users > > > >>> specify it > > > >>> > > > >> > > > > programmatically > > > >>> > > > >> > > > > >> in a DataStream/Table/SQL API? 
> > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> 4) How often will the Flink Kubernetes operator > > > query > > > >>> > > metrics > > > >>> > > > >> from > > > >>> > > > >> > > > > >> JobManager? Is this configurable? > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> 5) Could you specify the config name and > default > > > value > > > >>> > for > > > >>> > > > the > > > >>> > > > >> > > > proposed > > > >>> > > > >> > > > > >> configs? > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> 6) Could you add the name/mbean/type for the > > > proposed > > > >>> > > > metrics? > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> Cheers, > > > >>> > > > >> > > > > >> Dong > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > >> > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > > > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > >> > > > > > >