Hi Gyula,

Do we think the scaler could be a plugin or hard coded ?
We observed some cases scaler can't address (e.g async io dependency
service degradation or small spike that doesn't worth restarting job)

Thanks,
Chen

On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Dong!
>
> Could you please confirm that your main concerns have been addressed?
>
> Some other minor details that might not have been fully clarified:
>  - The prototype has been validated on some production workloads yes
>  - We are only planning to use metrics that are generally available and are
> previously accepted to be standardized connector metrics (not Kafka
> specific). This is actually specified in the FLIP
>  - Even if some metrics (such as pendingRecords) are not accessible the
> scaling algorithm works and can be used. For source scaling based on
> utilization alone we still need some trivial modifications on the
> implementation side.
>
> Cheers,
> Gyula
>
> On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi Dong!
> >
> > This is not an experimental feature proposal. The implementation of the
> > prototype is still in an experimental phase but by the time the FLIP,
> > initial prototype and review is done, this should be in a good stable
> first
> > version.
> > This proposal is pretty general as autoscalers/tuners get as far as I
> > understand and there is no history of any alternative effort that even
> > comes close to the applicability of this solution.
> >
> > Any large features that were added to Flink in the past have gone through
> > several iterations over the years and the APIs have evolved as they
> matured.
> > Something like the autoscaler can only be successful if there is enough
> > user exposure and feedback to make it good, putting it in an external
> repo
> > will not get us anywhere.
> >
> > We have a prototype implementation ready that works well and it is more
> or
> > less feature complete. We proposed this FLIP based on something that we
> see
> > as a working solution, please do not underestimate the effort that went
> > into this proposal and the validation of the ideas. So in this sense our
> > approach here is the same as with the Table Store and Kubernetes Operator
> > and other big components of the past. On the other hand it's impossible
> to
> > sufficiently explain all the technical depth/implementation details of
> such
> > complex components in FLIPs to 100%, I feel we have a good overview of
> the
> > algorithm in the FLIP and the implementation should cover all remaining
> > questions. We will have an extended code review phase following the FLIP
> > vote before this make it into the project.
> >
> > I understand your concern regarding the stability of Flink Kubernetes
> > Operator config and metric names. We have decided to not provide
> guarantees
> > there yet but if you feel that it's time for the operator to support such
> > guarantees please open a separate discussion on that topic, I don't want
> to
> > mix the two problems here.
> >
> > Regards,
> > Gyula
> >
> > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> >> Hi Gyula,
> >>
> >> If I understand correctly, this autopilot proposal is an experimental
> >> feature and its configs/metrics are not mature enough to provide
> backward
> >> compatibility yet. And the proposal provides high-level ideas of the
> >> algorithm but it is probably too complicated to explain it end-to-end.
> >>
> >> On the one hand, I do agree that having an auto-tuning prototype, even
> if
> >> not mature, is better than nothing for Flink users. On the other hand, I
> >> am
> >> concerned that this FLIP seems a bit too experimental, and starting with
> >> an
> >> immature design might make it harder for us to reach a production-ready
> >> and
> >> generally applicable auto-tuner in the future. And introducing too
> >> backward
> >> incompatible changes generally hurts users' trust in the Flink project.
> >>
> >> One alternative might be to develop and experiment with this feature in
> a
> >> non-Flink repo. You can iterate fast without worrying about typically
> >> backward compatibility requirement as required for most Flink public
> >> features. And once the feature is reasonably evaluated and mature
> enough,
> >> it will be much easier to explain the design and address all the issues
> >> mentioned above. For example, Jingsong implemented a Flink Table Store
> >> prototype
> >> <https://github.com/JingsongLi/flink/tree/table_storage/flink-table>
> >> before
> >> proposing FLIP-188 in this thread
> >> <https://lists.apache.org/thread/dlhspjpms007j2ynymsg44fxcx6fm064>.
> >>
> >> I don't intend to block your progress. Just my two cents. It will be
> great
> >> to hear more from other developers (e.g. in the voting thread).
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> >>
> >> > Hi Dong,
> >> >
> >> > Let me address your comments.
> >> >
> >> > Time for scale / backlog processing time derivation:
> >> > We can add some more details to the Flip but at this point the
> >> > implementation is actually much simpler than the algorithm to describe
> >> it.
> >> > I would not like to add more equations etc because it just
> >> overcomplicates
> >> > something relatively simple in practice.
> >> >
> >> > In a nutshell: Time to recover  == lag /
> processing-rate-after-scaleup.
> >> > It's fairly easy to see where this is going, but best to see in code.
> >> >
> >> > Using pendingRecords and alternative mechanisms:
> >> > True that the current algorithm relies on pending records to
> effectively
> >> > compute the target source processing rates and therefore scale
> sources.
> >> > This is available for Kafka which is by far the most common streaming
> >> > source and is used by the majority of streaming applications
> currently.
> >> > It would be very easy to add alternative purely utilization based
> >> scaling
> >> > to the sources. We can start with the current proposal and add this
> >> along
> >> > the way before the first version.
> >> >
> >> > Metrics, Configs and Public API:
> >> > The autoscaler feature is proposed for the Flink Kubernetes Operator
> >> which
> >> > does not have the same API/config maturity and thus does not provide
> the
> >> > same guarantees.
> >> > We currently support backward compatibilty for the CRD itself and not
> >> the
> >> > configs or metrics. This does not mean that we do not aim to do so but
> >> at
> >> > this stage we still have to clean up the details of the newly added
> >> > components. In practice this means that if we manage to get the
> metrics
> >> /
> >> > configs right at the first try we will keep them and provide
> >> compatibility,
> >> > but if we feel that we missed something or we don't need something we
> >> can
> >> > still remove it. It's a more pragmatic approach for such a component
> >> that
> >> > is likely to evolve than setting everything in stone immediately.
> >> >
> >> > Cheers,
> >> > Gyula
> >> >
> >> >
> >> >
> >> > On Wed, Nov 16, 2022 at 6:07 PM Dong Lin <lindon...@gmail.com> wrote:
> >> >
> >> > > Thanks for the update! Please see comments inline.
> >> > >
> >> > > On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels <m...@apache.org
> >
> >> > > wrote:
> >> > >
> >> > > > Of course! Let me know if your concerns are addressed. The wiki
> page
> >> > has
> >> > > > been updated.
> >> > > >
> >> > > > >It will be great to add this in the FLIP so that reviewers can
> >> > > understand
> >> > > > how the source parallelisms are computed and how the algorithm
> works
> >> > > > end-to-end.
> >> > > >
> >> > > > I've updated the FLIP page to add more details on how the
> >> backlog-based
> >> > > > scaling works (2).
> >> > > >
> >> > >
> >> > > The algorithm is much more informative now.  The algorithm currently
> >> uses
> >> > > "Estimated time for rescale" to derive new source parallelism. Could
> >> we
> >> > > also specify in the FLIP how this value is derived?
> >> > >
> >> > > The algorithm currently uses pendingRecords to derive source
> >> parallelism.
> >> > > It is an optional metric and KafkaSource currently reports this
> >> metric.
> >> > So
> >> > > it means that only the proposed algorithm currently only works when
> >> all
> >> > > sources of the job are KafkaSource, right?
> >> > >
> >> > > This issue considerably limits the applicability of this FLIP. Do
> you
> >> > think
> >> > > most (if not all) streaming source will report this metric?
> >> > Alternatively,
> >> > > any chance we can have a fallback solution to evaluate the source
> >> > > parallelism based on e.g. cpu or idle ratio for cases where this
> >> metric
> >> > is
> >> > > not available?
> >> > >
> >> > >
> >> > > > >These metrics and configs are public API and need to be stable
> >> across
> >> > > > minor versions, could we document them before finalizing the FLIP?
> >> > > >
> >> > > > Metrics and config changes are not strictly part of the public API
> >> but
> >> > > > Gyula has added a section.
> >> > > >
> >> > >
> >> > > Hmm... if metrics are not public API, then it might happen that we
> >> change
> >> > > the mbean path in a minor release and break users' monitoring tool.
> >> > > Similarly, we might change configs in a minor release that break
> >> user's
> >> > job
> >> > > behavior. We probably want to avoid these breaking changes in minor
> >> > > releases.
> >> > >
> >> > > It is documented here
> >> > > <
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >> > > >
> >> > > that
> >> > > "Exposed monitoring information" and "Configuration settings" are
> >> public
> >> > > interfaces of the project.
> >> > >
> >> > > Maybe we should also specify the metric here so that users can
> safely
> >> > setup
> >> > > dashboards and tools to track how the autopilot is working, similar
> to
> >> > how
> >> > > metrics are documented in FLIP-33
> >> > > <
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >> > > >
> >> > > ?
> >> > >
> >> > >
> >> > > > -Max
> >> > > >
> >> > > > On Tue, Nov 15, 2022 at 3:01 PM Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > > >
> >> > > > > Hi Maximilian,
> >> > > > >
> >> > > > > It seems that the following comments from the previous
> discussions
> >> > have
> >> > > > not
> >> > > > > been addressed yet. Any chance we can have them addressed before
> >> > > starting
> >> > > > > the voting thread?
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Dong
> >> > > > >
> >> > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com
> >
> >> > > wrote:
> >> > > > >
> >> > > > > > Hi Dong!
> >> > > > > >
> >> > > > > > Let me try to answer the questions :)
> >> > > > > >
> >> > > > > > 1 : busyTimeMsPerSecond is not specific for CPU, it measures
> the
> >> > time
> >> > > > > > spent in the main record processing loop for an operator if I
> >> > > > > > understand correctly. This includes IO operations too.
> >> > > > > >
> >> > > > > > 2: We should add this to the FLIP I agree. It would be a
> >> Duration
> >> > > > config
> >> > > > > > with the expected catch up time after rescaling (let's say 5
> >> > > minutes).
> >> > > > It
> >> > > > > > could be computed based on the current data rate and the
> >> calculated
> >> > > max
> >> > > > > > processing rate after the rescale.
> >> > > > > >
> >> > > > >
> >> > > > > It will be great to add this in the FLIP so that reviewers can
> >> > > understand
> >> > > > > how the source parallelisms are computed and how the algorithm
> >> works
> >> > > > > end-to-end.
> >> > > > >
> >> > > > >
> >> > > > > > 3: In the current proposal we don't have per operator configs.
> >> > Target
> >> > > > > > utilization would apply to all operators uniformly.
> >> > > > > >
> >> > > > > > 4: It should be configurable, yes.
> >> > > > > >
> >> > > > >
> >> > > > > Since this config is a public API, could we update the FLIP
> >> > accordingly
> >> > > > to
> >> > > > > provide this config?
> >> > > > >
> >> > > > >
> >> > > > > >
> >> > > > > > 5,6: The names haven't been finalized but I think these are
> >> minor
> >> > > > > details.
> >> > > > > > We could add concrete names to the FLIP :)
> >> > > > > >
> >> > > > >
> >> > > > > These metrics and configs are public API and need to be stable
> >> across
> >> > > > minor
> >> > > > > versions, could we document them before finalizing the FLIP?
> >> > > > >
> >> > > > >
> >> > > > > >
> >> > > > > > Cheers,
> >> > > > > > Gyula
> >> > > > > >
> >> > > > > >
> >> > > > > > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com>
> >> > wrote:
> >> > > > > >
> >> > > > > >> Hi Max,
> >> > > > > >>
> >> > > > > >> Thank you for the proposal. The proposal tackles a very
> >> important
> >> > > > issue
> >> > > > > >> for Flink users and the design looks promising overall!
> >> > > > > >>
> >> > > > > >> I have some questions to better understand the proposed
> public
> >> > > > > interfaces
> >> > > > > >> and the algorithm.
> >> > > > > >>
> >> > > > > >> 1) The proposal seems to assume that the operator's
> >> > > > busyTimeMsPerSecond
> >> > > > > >> could reach 1 sec. I believe this is mostly true for
> cpu-bound
> >> > > > > operators.
> >> > > > > >> Could you confirm that this can also be true for io-bound
> >> > operators
> >> > > > > such as
> >> > > > > >> sinks? For example, suppose a Kafka Sink subtask has reached
> >> I/O
> >> > > > > bottleneck
> >> > > > > >> when flushing data out to the Kafka clusters, will
> >> > > busyTimeMsPerSecond
> >> > > > > >> reach 1 sec?
> >> > > > > >>
> >> > > > > >> 2) It is said that "users can configure a maximum time to
> fully
> >> > > > process
> >> > > > > >> the backlog". The configuration section does not seem to
> >> provide
> >> > > this
> >> > > > > >> config. Could you specify this? And any chance this proposal
> >> can
> >> > > > provide
> >> > > > > >> the formula for calculating the new processing rate?
> >> > > > > >>
> >> > > > > >> 3) How are users expected to specify the per-operator configs
> >> > (e.g.
> >> > > > > >> target utilization)? For example, should users specify it
> >> > > > > programmatically
> >> > > > > >> in a DataStream/Table/SQL API?
> >> > > > > >>
> >> > > > > >> 4) How often will the Flink Kubernetes operator query metrics
> >> from
> >> > > > > >> JobManager? Is this configurable?
> >> > > > > >>
> >> > > > > >> 5) Could you specify the config name and default value for
> the
> >> > > > proposed
> >> > > > > >> configs?
> >> > > > > >>
> >> > > > > >> 6) Could you add the name/mbean/type for the proposed
> metrics?
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> Cheers,
> >> > > > > >> Dong
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>

Reply via email to