Thanks for the reply, Gyula and Max.

Prasanna

On Sat, 26 Nov 2022, 00:24 Maximilian Michels, <m...@apache.org> wrote:

Hi John, hi Prasanna, hi Rui,

Gyula already gave great answers to your questions, just adding to it:

> What's the reason to add auto scaling to the Operator instead of to the JobManager?

As Gyula mentioned, the JobManager is not the ideal place, at least not until Flink supports in-place autoscaling, which is a related but ultimately very different problem because it involves solving job reconfiguration at runtime. I believe the AdaptiveScheduler has moved in this direction and there is nothing preventing us from using it in the future once it has evolved further. For now, going through the job redeployment route seems like the easiest and safest way.

> Could we finally use the autoscaler as an outside tool, or run it as a separate Java process?

I think we could, but I wouldn't make it a requirement for the first version. There is nothing preventing the autoscaler from running as a separate k8s/yarn deployment, which would provide some of the same availability guarantees as the operator or any deployment has on k8s/yarn. However, I think this increases complexity by a fair bit because the operator already has all the configuration and tooling to manage Flink jobs. I'm not at all opposed to coming up with a way to allow the autoscaler to run separately as well as with the k8s operator. I just think it is out of scope for the first version, to keep the complexity and scope under control.

-Max

On Fri, Nov 25, 2022 at 8:16 AM Rui Fan <1996fan...@gmail.com> wrote:

Hi Gyula,

Thanks for the clarification!

Best,
Rui Fan

On Fri, Nov 25, 2022 at 1:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Rui, Prasanna:

I am afraid that creating a completely independent autoscaler process that works with any type of Flink cluster is out of scope right now, due to the following reasons:

If we were to create a new general process, we would have to implement high availability and a pluggable mechanism to durably store metadata etc. The process itself would also have to run somewhere, so we would have to provide integrations.

It would also not be able to scale clusters easily without adding Kubernetes-operator-like functionality to it, and if the user has to do that manually, most of the value is already lost.

Last but not least, this would have the potential of interfering with other actions the user might currently be doing, making the autoscaler itself complex and more unreliable.

These are all prohibitive reasons at this point. We already have a prototype that tackles these smoothly as part of the Kubernetes operator.

Instead of trying to put the autoscaler somewhere else, we might also consider supporting different cluster types within the Kubernetes operator. While that might sound silly at first, it is of similar scope to your suggestions and could help the problem.

As for the new config question, we could collectively decide to backport this feature to enable the autoscaler, as it is a very minor change.

Gyula

On Fri, 25 Nov 2022 at 06:21, John Roesler <vvcep...@apache.org> wrote:

Thanks for this answer, Gyula!
-John

On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:

Hi John!

Thank you for the excellent question.

There are a few reasons why we felt that the operator is the right place for this component:

- Ideally the autoscaler is a separate process (an outside observer), and the jobmanager is very much tied to the lifecycle of the job. The operator is a perfect example of such an external process that lives beyond individual jobs.
- Scaling itself might need some external resource management (for standalone clusters) that the jobmanager is not capable of, and the logic is already in the operator.
- Adding this to the operator allows us to integrate it fully into the lifecycle management of the application. This guarantees that scaling decisions do not interfere with upgrades, suspends etc.
- By adding it to the operator, the autoscaler can potentially work on older Flink versions as well.
- The jobmanager is a component designed to handle individual Flink jobs, but the autoscaler component needs to work on a higher abstraction layer to be able to integrate with user job upgrades etc.

These are some of the main things that come to my mind :)

Having it in the operator ties this logic to Kubernetes itself, but we feel that an autoscaler is mostly relevant in an elastic cloud environment anyway.

Cheers,
Gyula

On Thu, Nov 24, 2022 at 9:40 PM John Roesler <vvcep...@apache.org> wrote:

Hi Max,

Thanks for the FLIP!

I've been curious about one point. I can imagine some good reasons for it but wonder what you have in mind. What's the reason to add auto scaling to the Operator instead of to the JobManager?

It seems like adding that capability to the JobManager would be a bigger project, but it also would create some interesting opportunities.

This is certainly not a suggestion, just a question.

Thanks!
John

On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:

Thanks for your comments @Dong and @Chen. It is true that not all the details are contained in the FLIP. The document is meant as a general design concept.

As for the rescaling time, this is going to be a configurable setting for now, but it is foreseeable that we will provide auto-tuning of this configuration value by observing the job restart time. The same goes for the scaling decision itself, which can learn from previous decisions. But we want to keep it simple for the first version.

For sources that do not support the pendingRecords metric, we are planning to either give the user the choice to set a manual target rate, or scale purely based on utilization as reported via busyTimeMsPerSecond. In case of legacy sources, we will skip scaling these branches entirely because they support neither of these metrics.

-Max
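To make that fallback order concrete, here is a minimal sketch of the selection logic. All names (SourceScalingModeSelector, ScalingMode, etc.) are hypothetical and not taken from the actual prototype:

    // Hypothetical selection of the scaling strategy for a source vertex,
    // following the fallback order described above.
    class SourceScalingModeSelector {

        enum ScalingMode { BACKLOG_BASED, MANUAL_TARGET_RATE, UTILIZATION_BASED, SKIP }

        static ScalingMode choose(
                boolean pendingRecordsAvailable,    // standardized connector metric
                boolean manualTargetRateConfigured, // user-provided target rate
                boolean busyTimeAvailable,          // busyTimeMsPerSecond reported
                boolean isLegacySource) {
            if (isLegacySource) {
                return ScalingMode.SKIP; // legacy sources report neither metric
            }
            if (pendingRecordsAvailable) {
                return ScalingMode.BACKLOG_BASED;
            }
            if (manualTargetRateConfigured) {
                return ScalingMode.MANUAL_TARGET_RATE;
            }
            return busyTimeAvailable ? ScalingMode.UTILIZATION_BASED : ScalingMode.SKIP;
        }
    }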
On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:

> Do we think the scaler could be a plugin or hard coded?

+1 for pluggable scaling logic.

On Mon, Nov 21, 2022 at 3:38 AM Chen Qin <qinnc...@gmail.com> wrote:

On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Chen!
>
> I think in the long term it makes sense to provide some pluggable mechanisms, but it's not completely trivial where exactly you would plug in your custom logic at this point.

Sounds good. More specifically, it would be great if it could accept input features (including previous scaling decisions) and output decisions. Folks might keep their own secret sauce and avoid patching an OSS fork.

> In any case, the problems you mentioned should be solved robustly by the algorithm itself without any customization:
> - We need to be able to detect ineffective scaling decisions. Let's say we scaled up (expecting better throughput with a higher parallelism) but we did not get better processing capacity (this would be the external service bottleneck).

Sounds good, so we would at least try restarting the job once (optimistic path) as a design choice.

> - We are evaluating metrics in windows, and we have some flexible boundaries to avoid scaling on minor load spikes.

Yes, it would be great if the user could feed in throughput changes over different time buckets (last 10s, 30s, 1 min, 5 mins) as input features.

> Regards,
> Gyula

On Sun, Nov 20, 2022 at 12:28 AM Chen Qin <qinnc...@gmail.com> wrote:

Hi Gyula,

Do we think the scaler could be a plugin or hard coded? We observed some cases the scaler can't address (e.g. async I/O dependency service degradation, or a small spike that isn't worth restarting the job for).

Thanks,
Chen
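To illustrate the "features in, decisions out" idea from this exchange, a plugin contract could look roughly like the following. The interface and record names are purely hypothetical; the FLIP does not define a plugin API:

    import java.time.Duration;
    import java.util.List;
    import java.util.Map;

    // Hypothetical pluggable policy: windowed metrics and previous
    // decisions in, scaling decision out.
    interface ScalingPolicy {

        // Input features, e.g. throughput over several time buckets
        // (10s, 30s, 1 min, 5 mins) plus the history of past decisions.
        record Features(
                Map<Duration, Double> throughputPerWindow,
                List<ScalingDecision> previousDecisions) {}

        // Per-vertex target parallelisms; an empty map means "do nothing".
        record ScalingDecision(Map<String, Integer> vertexParallelism) {}

        ScalingDecision evaluate(Features features);
    }

Passing the previous decisions in is what would let such a policy notice the ineffective-scale-up case Gyula mentions: a higher parallelism in the last decision without a matching throughput increase in the current windows.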
On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Dong!

Could you please confirm that your main concerns have been addressed?

Some other minor details that might not have been fully clarified:
- The prototype has been validated on some production workloads, yes.
- We are only planning to use metrics that are generally available and have previously been accepted as standardized connector metrics (not Kafka specific). This is actually specified in the FLIP.
- Even if some metrics (such as pendingRecords) are not accessible, the scaling algorithm works and can be used. For source scaling based on utilization alone we still need some trivial modifications on the implementation side.

Cheers,
Gyula

On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Dong!

This is not an experimental feature proposal. The implementation of the prototype is still in an experimental phase, but by the time the FLIP, initial prototype and review are done, this should be in a good, stable first version.

This proposal is about as general as autoscalers/tuners get, as far as I understand, and there is no history of any alternative effort that even comes close to the applicability of this solution.

Any large features that were added to Flink in the past have gone through several iterations over the years, and the APIs have evolved as they matured. Something like the autoscaler can only be successful if there is enough user exposure and feedback to make it good; putting it in an external repo will not get us anywhere.

We have a prototype implementation ready that works well and is more or less feature complete. We proposed this FLIP based on something that we see as a working solution; please do not underestimate the effort that went into this proposal and the validation of the ideas. So in this sense our approach here is the same as with the Table Store and Kubernetes Operator and other big components of the past.
On the other hand, it's impossible to sufficiently explain all the technical depth/implementation details of such complex components in FLIPs to 100%. I feel we have a good overview of the algorithm in the FLIP, and the implementation should cover all remaining questions. We will have an extended code review phase following the FLIP vote before this makes it into the project.

I understand your concern regarding the stability of Flink Kubernetes Operator config and metric names. We have decided not to provide guarantees there yet, but if you feel that it's time for the operator to support such guarantees, please open a separate discussion on that topic; I don't want to mix the two problems here.

Regards,
Gyula

On Thu, Nov 17, 2022 at 5:07 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Gyula,

If I understand correctly, this autopilot proposal is an experimental feature and its configs/metrics are not mature enough to provide backward compatibility yet. And the proposal provides high-level ideas of the algorithm, but it is probably too complicated to explain it end-to-end.

On the one hand, I do agree that having an auto-tuning prototype, even if not mature, is better than nothing for Flink users. On the other hand, I am concerned that this FLIP seems a bit too experimental, and starting with an immature design might make it harder for us to reach a production-ready and generally applicable auto-tuner in the future. And introducing too many backward-incompatible changes generally hurts users' trust in the Flink project.

One alternative might be to develop and experiment with this feature in a non-Flink repo. You can iterate fast without worrying about the backward compatibility requirements that typically apply to most Flink public features.
And once the feature is reasonably > evaluated > > > and > > > > >> mature > > > > >> >>> > > > enough, > > > > >> >>> > > > >> it will be much easier to explain the design and > > address > > > > all > > > > >> the > > > > >> >>> > > issues > > > > >> >>> > > > >> mentioned above. For example, Jingsong implemented a > > > Flink > > > > >> Table > > > > >> >>> > Store > > > > >> >>> > > > >> prototype > > > > >> >>> > > > >> < > > > > >> >>> > > > https://github.com/JingsongLi/flink/tree/table_storage/flink-table > > > > >> >>> > > > > > > >> >>> > > > >> before > > > > >> >>> > > > >> proposing FLIP-188 in this thread > > > > >> >>> > > > >> < > > > > >> >>> > > https://lists.apache.org/thread/dlhspjpms007j2ynymsg44fxcx6fm064 > > > >. > > > > >> >>> > > > >> > > > > >> >>> > > > >> I don't intend to block your progress. Just my two > > cents. > > > > It > > > > >> >>> will be > > > > >> >>> > > > great > > > > >> >>> > > > >> to hear more from other developers (e.g. in the > voting > > > > >> thread). > > > > >> >>> > > > >> > > > > >> >>> > > > >> Thanks, > > > > >> >>> > > > >> Dong > > > > >> >>> > > > >> > > > > >> >>> > > > >> > > > > >> >>> > > > >> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra < > > > > >> gyula.f...@gmail.com > > > > >> >>> > > > > > >> >>> > > > wrote: > > > > >> >>> > > > >> > > > > >> >>> > > > >> > Hi Dong, > > > > >> >>> > > > >> > > > > > >> >>> > > > >> > Let me address your comments. > > > > >> >>> > > > >> > > > > > >> >>> > > > >> > Time for scale / backlog processing time > derivation: > > > > >> >>> > > > >> > We can add some more details to the Flip but at > this > > > > point > > > > >> the > > > > >> >>> > > > >> > implementation is actually much simpler than the > > > > algorithm > > > > >> to > > > > >> >>> > > describe > > > > >> >>> > > > >> it. > > > > >> >>> > > > >> > I would not like to add more equations etc because > it > > > > just > > > > >> >>> > > > >> overcomplicates > > > > >> >>> > > > >> > something relatively simple in practice. > > > > >> >>> > > > >> > > > > > >> >>> > > > >> > In a nutshell: Time to recover == lag / > > > > >> >>> > > > processing-rate-after-scaleup. > > > > >> >>> > > > >> > It's fairly easy to see where this is going, but > best > > > to > > > > >> see in > > > > >> >>> > > code. > > > > >> >>> > > > >> > > > > > >> >>> > > > >> > Using pendingRecords and alternative mechanisms: > > > > >> >>> > > > >> > True that the current algorithm relies on pending > > > > records to > > > > >> >>> > > > effectively > > > > >> >>> > > > >> > compute the target source processing rates and > > > therefore > > > > >> scale > > > > >> >>> > > > sources. > > > > >> >>> > > > >> > This is available for Kafka which is by far the > most > > > > common > > > > >> >>> > > streaming > > > > >> >>> > > > >> > source and is used by the majority of streaming > > > > applications > > > > >> >>> > > > currently. > > > > >> >>> > > > >> > It would be very easy to add alternative purely > > > > utilization > > > > >> >>> based > > > > >> >>> > > > >> scaling > > > > >> >>> > > > >> > to the sources. We can start with the current > > proposal > > > > and > > > > >> add > > > > >> >>> > this > > > > >> >>> > > > >> along > > > > >> >>> > > > >> > the way before the first version. 
Using pendingRecords and alternative mechanisms:
True that the current algorithm relies on pending records to effectively compute the target source processing rates and therefore scale sources. This is available for Kafka, which is by far the most common streaming source and is used by the majority of streaming applications currently. It would be very easy to add alternative, purely utilization-based scaling to the sources. We can start with the current proposal and add this along the way before the first version.

Metrics, Configs and Public API:
The autoscaler feature is proposed for the Flink Kubernetes Operator, which does not have the same API/config maturity and thus does not provide the same guarantees. We currently support backward compatibility for the CRD itself and not the configs or metrics. This does not mean that we do not aim to do so, but at this stage we still have to clean up the details of the newly added components. In practice this means that if we manage to get the metrics/configs right on the first try, we will keep them and provide compatibility, but if we feel that we missed something or we don't need something, we can still remove it. It's a more pragmatic approach for such a component that is likely to evolve than setting everything in stone immediately.

Cheers,
Gyula

On Wed, Nov 16, 2022 at 6:07 PM Dong Lin <lindon...@gmail.com> wrote:

Thanks for the update! Please see comments inline.

On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels <m...@apache.org> wrote:

> Of course! Let me know if your concerns are addressed. The wiki page has been updated.
>
> > It will be great to add this in the FLIP so that reviewers can understand how the source parallelisms are computed and how the algorithm works end-to-end.
>
> I've updated the FLIP page to add more details on how the backlog-based scaling works (2).
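For readers following along, the backlog-based source scaling discussed here boils down to deriving a target processing rate and dividing by the measured per-subtask capability. A rough sketch with invented names, with no claim to match the FLIP's exact formula:

    // Hypothetical backlog-based derivation of a source's parallelism:
    // the target rate must cover the live input rate plus draining the
    // backlog within the configured catch-up duration.
    class BacklogBasedScaling {
        static int targetSourceParallelism(
                double pendingRecords,       // backlog reported by the source
                double incomingRatePerSec,   // current data rate
                double catchUpSeconds,       // configured max catch-up time
                double perSubtaskRatePerSec, // measured rate of one subtask
                int maxParallelism) {
            double targetRate =
                    incomingRatePerSec + pendingRecords / catchUpSeconds;
            int parallelism =
                    (int) Math.ceil(targetRate / perSubtaskRatePerSec);
            return Math.min(Math.max(1, parallelism), maxParallelism);
        }
    }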
The algorithm is much more informative now. The algorithm currently uses "Estimated time for rescale" to derive the new source parallelism. Could we also specify in the FLIP how this value is derived?

The algorithm currently uses pendingRecords to derive the source parallelism. It is an optional metric, and KafkaSource currently reports this metric. So it means that the proposed algorithm currently only works when all sources of the job are KafkaSource, right?

This issue considerably limits the applicability of this FLIP. Do you think most (if not all) streaming sources will report this metric? Alternatively, any chance we can have a fallback solution to evaluate the source parallelism based on e.g. CPU or idle ratio for cases where this metric is not available?

> > These metrics and configs are public API and need to be stable across minor versions, could we document them before finalizing the FLIP?
>
> Metrics and config changes are not strictly part of the public API but Gyula has added a section.

Hmm... if metrics are not public API, then it might happen that we change the mbean path in a minor release and break users' monitoring tools. Similarly, we might change configs in a minor release in a way that breaks users' job behavior. We probably want to avoid these breaking changes in minor releases.

It is documented here <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals> that "Exposed monitoring information" and "Configuration settings" are public interfaces of the project.
Maybe we should also specify the metrics here so that users can safely set up dashboards and tools to track how the autopilot is working, similar to how metrics are documented in FLIP-33 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics>?

> -Max

On Tue, Nov 15, 2022 at 3:01 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Maximilian,

It seems that the following comments from the previous discussions have not been addressed yet. Any chance we can have them addressed before starting the voting thread?

Thanks,
Dong

On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1: busyTimeMsPerSecond is not specific to CPU; it measures the time spent in the main record processing loop for an operator, if I understand correctly. This includes IO operations too.
>
> 2: We should add this to the FLIP, I agree. It would be a Duration config with the expected catch-up time after rescaling (let's say 5 minutes). It could be computed based on the current data rate and the calculated max processing rate after the rescale.
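As a worked example of answer 2 (all numbers invented for illustration): with a 5-minute catch-up duration, a backlog of 3,000,000 records and a current data rate of 5,000 records/s, the sources would need to sustain roughly 5,000 + 3,000,000 / 300 = 15,000 records/s after the rescale, which is then compared against the calculated max processing rate to pick the new parallelism, as in the earlier sketch.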
It will be great to add this in the FLIP so that reviewers can understand how the source parallelisms are computed and how the algorithm works end-to-end.

> 3: In the current proposal we don't have per-operator configs. Target utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.

Since this config is a public API, could we update the FLIP accordingly to provide this config?

> 5, 6: The names haven't been finalized, but I think these are minor details. We could add concrete names to the FLIP :)

These metrics and configs are public API and need to be stable across minor versions, could we document them before finalizing the FLIP?

> Cheers,
> Gyula
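For intuition on how a uniform target utilization could drive the scaling of non-source operators, a rough sketch of the general idea (invented names, not the FLIP's actual formula):

    // Hypothetical utilization-based rescaling: size the operator so its
    // measured busy time lands near the uniform target utilization.
    class UtilizationBasedScaling {
        static int newParallelism(
                int currentParallelism,
                double busyTimeMsPerSecond, // 0..1000, averaged over a window
                double targetUtilization) { // e.g. 0.8
            double currentUtilization = busyTimeMsPerSecond / 1000.0;
            double scaled =
                    currentParallelism * currentUtilization / targetUtilization;
            return Math.max(1, (int) Math.ceil(scaled));
        }
    }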
On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Max,

Thank you for the proposal. The proposal tackles a very important issue for Flink users and the design looks promising overall!

I have some questions to better understand the proposed public interfaces and the algorithm.

1) The proposal seems to assume that the operator's busyTimeMsPerSecond could reach 1 sec. I believe this is mostly true for cpu-bound operators. Could you confirm that this can also be true for io-bound operators such as sinks? For example, suppose a Kafka sink subtask has reached an I/O bottleneck when flushing data out to the Kafka clusters, will busyTimeMsPerSecond reach 1 sec?

2) It is said that "users can configure a maximum time to fully process the backlog". The configuration section does not seem to provide this config. Could you specify it? And any chance this proposal can provide the formula for calculating the new processing rate?

3) How are users expected to specify the per-operator configs (e.g. target utilization)? For example, should users specify them programmatically in a DataStream/Table/SQL API?

4) How often will the Flink Kubernetes operator query metrics from the JobManager? Is this configurable?

5) Could you specify the config name and default value for the proposed configs?

6) Could you add the name/mbean/type for the proposed metrics?

Cheers,
Dong