Hi Dong!

Could you please confirm that your main concerns have been addressed?

Some other minor details that might not have been fully clarified:
 - Yes, the prototype has been validated on some production workloads.
 - We only plan to use metrics that are generally available and have already
been accepted as standardized connector metrics (not Kafka specific). This is
specified in the FLIP.
 - Even if some metrics (such as pendingRecords) are not accessible, the
scaling algorithm still works and can be used. Scaling sources based on
utilization alone only requires some trivial modifications on the
implementation side (see the rough sketch below).
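
For illustration only, a rough sketch of what the utilization-only fallback
could look like (the class and method names below are hypothetical, not part
of the FLIP; the FLIP and implementation remain authoritative):

    // Sketch: scale a source purely on observed utilization when the
    // optional pendingRecords metric is not available.
    public final class UtilizationOnlySourceScaler {

        /**
         * @param currentParallelism  current source parallelism
         * @param busyTimeMsPerSecond standard operator metric, 0..1000
         * @param targetUtilization   configured target utilization, e.g. 0.8
         */
        static int scale(int currentParallelism,
                         double busyTimeMsPerSecond,
                         double targetUtilization) {
            double utilization = busyTimeMsPerSecond / 1000.0;
            return (int) Math.ceil(
                currentParallelism * utilization / targetUtilization);
        }
    }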

Cheers,
Gyula

On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Dong!
>
> This is not an experimental feature proposal. The implementation of the
> prototype is still in an experimental phase, but by the time the FLIP,
> initial prototype and review are done, this should be in a good, stable
> first version.
> This proposal is about as general as autoscalers/tuners get, as far as I
> understand, and there is no history of any alternative effort that even
> comes close to the applicability of this solution.
>
> Any large feature added to Flink in the past has gone through several
> iterations over the years, and its APIs have evolved as it matured.
> Something like the autoscaler can only be successful if there is enough
> user exposure and feedback to make it good, putting it in an external repo
> will not get us anywhere.
>
> We have a prototype implementation ready that works well and is more or
> less feature complete. We proposed this FLIP based on something that we see
> as a working solution, so please do not underestimate the effort that went
> into this proposal and the validation of the ideas. In this sense our
> approach here is the same as with the Table Store, the Kubernetes Operator
> and other big components of the past. On the other hand, it's impossible to
> fully explain all the technical depth and implementation details of such
> complex components in a FLIP; I feel we have a good overview of the
> algorithm in the FLIP, and the implementation should cover all remaining
> questions. We will have an extended code review phase following the FLIP
> vote before this makes it into the project.
>
> I understand your concern regarding the stability of Flink Kubernetes
> Operator config and metric names. We have decided not to provide guarantees
> there yet, but if you feel that it's time for the operator to support such
> guarantees, please open a separate discussion on that topic; I don't want
> to mix the two problems here.
>
> Regards,
> Gyula
>
> On Thu, Nov 17, 2022 at 5:07 PM Dong Lin <lindon...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> If I understand correctly, this autopilot proposal is an experimental
>> feature and its configs/metrics are not mature enough to provide backward
>> compatibility yet. And the proposal provides the high-level ideas of the
>> algorithm, but it is probably too complicated to explain end-to-end.
>>
>> On the one hand, I do agree that having an auto-tuning prototype, even if
>> not mature, is better than nothing for Flink users. On the other hand, I am
>> concerned that this FLIP seems a bit too experimental, and starting with an
>> immature design might make it harder for us to reach a production-ready and
>> generally applicable auto-tuner in the future. And introducing too many
>> backward incompatible changes generally hurts users' trust in the Flink
>> project.
>>
>> One alternative might be to develop and experiment with this feature in a
>> non-Flink repo. You could iterate fast without worrying about the backward
>> compatibility requirements that typically apply to Flink public features.
>> And once the feature is reasonably evaluated and mature enough, it will be
>> much easier to explain the design and address all the issues mentioned
>> above. For example, Jingsong implemented a Flink Table Store prototype
>> <https://github.com/JingsongLi/flink/tree/table_storage/flink-table>
>> before proposing FLIP-188 in this thread
>> <https://lists.apache.org/thread/dlhspjpms007j2ynymsg44fxcx6fm064>.
>>
>> I don't intend to block your progress. Just my two cents. It will be great
>> to hear more from other developers (e.g. in the voting thread).
>>
>> Thanks,
>> Dong
>>
>>
>> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> > Hi Dong,
>> >
>> > Let me address your comments.
>> >
>> > Time for scale / backlog processing time derivation:
>> > We can add some more details to the FLIP, but at this point the
>> > implementation is actually much simpler than the algorithm needed to
>> > describe it. I would not like to add more equations etc. because it just
>> > overcomplicates something relatively simple in practice.
>> >
>> > In a nutshell: time to recover == lag / processing-rate-after-scaleup.
>> > It's fairly easy to see where this is going, but it is best seen in code.
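>> >
>> > As a rough illustration only (the class and method names below are
>> > hypothetical and not from the FLIP; the implementation is authoritative):
>> >
>> >     // Sketch: estimated time to work off the backlog after scaling up,
>> >     // with lag in records and the post-scale-up rate in records/sec.
>> >     final class CatchUpEstimate {
>> >         static double timeToRecoverSec(double lag, double rateAfterScaleUp) {
>> >             return lag / rateAfterScaleUp;
>> >         }
>> >     }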
>> >
>> > Using pendingRecords and alternative mechanisms:
>> > It is true that the current algorithm relies on pending records to
>> > effectively compute the target source processing rates and therefore
>> > scale sources. This metric is available for Kafka, which is by far the
>> > most common streaming source and is used by the majority of streaming
>> > applications currently. It would be very easy to add alternative, purely
>> > utilization-based scaling to the sources. We can start with the current
>> > proposal and add this along the way before the first version.
>> >
>> > Metrics, Configs and Public API:
>> > The autoscaler feature is proposed for the Flink Kubernetes Operator,
>> > which does not have the same API/config maturity and thus does not
>> > provide the same guarantees.
>> > We currently support backward compatibility for the CRD itself, but not
>> > for the configs or metrics. This does not mean that we do not aim to do
>> > so, but at this stage we still have to clean up the details of the newly
>> > added components. In practice this means that if we manage to get the
>> > metrics/configs right on the first try we will keep them and provide
>> > compatibility, but if we feel that we missed something or don't need
>> > something we can still remove it. It's a more pragmatic approach for such
>> > a component that is likely to evolve than setting everything in stone
>> > immediately.
>> >
>> > Cheers,
>> > Gyula
>> >
>> >
>> >
>> > On Wed, Nov 16, 2022 at 6:07 PM Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Thanks for the update! Please see comments inline.
>> > >
>> > > On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels <m...@apache.org>
>> > > wrote:
>> > >
>> > > > Of course! Let me know if your concerns are addressed. The wiki page
>> > > > has been updated.
>> > > >
>> > > > >It will be great to add this in the FLIP so that reviewers can
>> > > > >understand how the source parallelisms are computed and how the
>> > > > >algorithm works end-to-end.
>> > > >
>> > > > I've updated the FLIP page to add more details on how the
>> > > > backlog-based scaling works (2).
>> > > >
>> > >
>> > > The algorithm is much more informative now. The algorithm currently
>> > > uses "Estimated time for rescale" to derive the new source parallelism.
>> > > Could we also specify in the FLIP how this value is derived?
>> > >
>> > > The algorithm currently uses pendingRecords to derive source
>> > > parallelism. It is an optional metric, and KafkaSource currently
>> > > reports it. So it means that the proposed algorithm currently only
>> > > works when all sources of the job are KafkaSource, right?
>> > >
>> > > This issue considerably limits the applicability of this FLIP. Do you
>> > > think most (if not all) streaming sources will report this metric?
>> > > Alternatively, any chance we can have a fallback solution to evaluate
>> > > the source parallelism based on e.g. CPU or idle ratio for cases where
>> > > this metric is not available?
>> > >
>> > >
>> > > > >These metrics and configs are public API and need to be stable
>> > > > >across minor versions, could we document them before finalizing the
>> > > > >FLIP?
>> > > >
>> > > > Metrics and config changes are not strictly part of the public API,
>> > > > but Gyula has added a section.
>> > > >
>> > >
>> > > Hmm... if metrics are not public API, then it might happen that we
>> > > change the mbean path in a minor release and break users' monitoring
>> > > tools. Similarly, we might change configs in a minor release and break
>> > > users' job behavior. We probably want to avoid such breaking changes in
>> > > minor releases.
>> > >
>> > > It is documented here
>> > > <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>
>> > > that "Exposed monitoring information" and "Configuration settings" are
>> > > public interfaces of the project.
>> > >
>> > > Maybe we should also specify the metrics here so that users can safely
>> > > set up dashboards and tools to track how the autopilot is working,
>> > > similar to how metrics are documented in FLIP-33
>> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics>?
>> > >
>> > >
>> > > > -Max
>> > > >
>> > > > On Tue, Nov 15, 2022 at 3:01 PM Dong Lin <lindon...@gmail.com> wrote:
>> > > >
>> > > > > Hi Maximilian,
>> > > > >
>> > > > > It seems that the following comments from the previous discussions
>> > > > > have not been addressed yet. Any chance we can have them addressed
>> > > > > before starting the voting thread?
>> > > > >
>> > > > > Thanks,
>> > > > > Dong
>> > > > >
>> > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Dong!
>> > > > > >
>> > > > > > Let me try to answer the questions :)
>> > > > > >
>> > > > > > 1: busyTimeMsPerSecond is not specific to CPU; it measures the
>> > > > > > time spent in the main record processing loop for an operator, if
>> > > > > > I understand correctly. This includes IO operations too.
>> > > > > >
>> > > > > > 2: We should add this to the FLIP, I agree. It would be a
>> > > > > > Duration config with the expected catch-up time after rescaling
>> > > > > > (let's say 5 minutes). It could be computed based on the current
>> > > > > > data rate and the calculated max processing rate after the
>> > > > > > rescale.
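>> > > > > >
>> > > > > > As a rough, hedged sketch only (the class and method names below
>> > > > > > are hypothetical and not from the FLIP), the target source rate
>> > > > > > derived from such a catch-up duration could look like:
>> > > > > >
>> > > > > >     final class TargetRateSketch {
>> > > > > >         // incomingRate in records/sec, lag in records.
>> > > > > >         static double targetSourceRate(double incomingRate, double lag,
>> > > > > >                                        java.time.Duration catchUp) {
>> > > > > >             return incomingRate + lag / catchUp.toSeconds();
>> > > > > >         }
>> > > > > >     }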
>> > > > > >
>> > > > >
>> > > > > It will be great to add this in the FLIP so that reviewers can
>> > > > > understand how the source parallelisms are computed and how the
>> > > > > algorithm works end-to-end.
>> > > > >
>> > > > >
>> > > > > > 3: In the current proposal we don't have per-operator configs.
>> > > > > > Target utilization would apply to all operators uniformly.
>> > > > > >
>> > > > > > 4: It should be configurable, yes.
>> > > > > >
>> > > > >
>> > > > > Since this config is a public API, could we update the FLIP
>> > > > > accordingly to provide this config?
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > 5,6: The names haven't been finalized but I think these are
>> > > > > > minor details. We could add concrete names to the FLIP :)
>> > > > > >
>> > > > >
>> > > > > These metrics and configs are public API and need to be stable
>> > > > > across minor versions, could we document them before finalizing the
>> > > > > FLIP?
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > Cheers,
>> > > > > > Gyula
>> > > > > >
>> > > > > >
>> > > > > > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
>> > > > > >
>> > > > > >> Hi Max,
>> > > > > >>
>> > > > > >> Thank you for the proposal. The proposal tackles a very
>> > > > > >> important issue for Flink users and the design looks promising
>> > > > > >> overall!
>> > > > > >>
>> > > > > >> I have some questions to better understand the proposed public
>> > > > > >> interfaces and the algorithm.
>> > > > > >>
>> > > > > >> 1) The proposal seems to assume that the operator's
>> > > > > >> busyTimeMsPerSecond could reach 1 sec. I believe this is mostly
>> > > > > >> true for CPU-bound operators. Could you confirm that this can
>> > > > > >> also be true for IO-bound operators such as sinks? For example,
>> > > > > >> suppose a Kafka Sink subtask has reached an I/O bottleneck when
>> > > > > >> flushing data out to the Kafka clusters; will busyTimeMsPerSecond
>> > > > > >> reach 1 sec?
>> > > > > >>
>> > > > > >> 2) It is said that "users can configure a maximum time to fully
>> > > > > >> process the backlog". The configuration section does not seem to
>> > > > > >> provide this config. Could you specify this? And any chance this
>> > > > > >> proposal can provide the formula for calculating the new
>> > > > > >> processing rate?
>> > > > > >>
>> > > > > >> 3) How are users expected to specify the per-operator configs
>> > > > > >> (e.g. target utilization)? For example, should users specify it
>> > > > > >> programmatically in a DataStream/Table/SQL API?
>> > > > > >>
>> > > > > >> 4) How often will the Flink Kubernetes operator query metrics
>> > > > > >> from JobManager? Is this configurable?
>> > > > > >>
>> > > > > >> 5) Could you specify the config names and default values for the
>> > > > > >> proposed configs?
>> > > > > >>
>> > > > > >> 6) Could you add the name/mbean/type for the proposed metrics?
>> > > > > >>
>> > > > > >>
>> > > > > >> Cheers,
>> > > > > >> Dong
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>
