Hi David,

Thanks for your reply.  I think your response totally make sense.  This
flip targets on declaring required resource to ResourceManager instead of
using  ResourceManager to add/remove TMs directly.

Best,
Xiangyu



David Morávek <david.mora...@gmail.com> 于2023年2月13日周一 15:46写道:

> Hi everyone,
>
> @Shammon
>
> I'm not entirely sure what "config file" you're referring to. You can, of
> course, override the default parallelism in "flink-conf.yaml", but for
> sinks and sources, the parallelism needs to be tweaked on the connector
> level ("WITH" statement).
>
> This is something that should be achieved with tooling around Flink. We
> want to provide an API on the lowest level that generalizes well. Achieving
> what you're describing should be straightforward with this API.
>
> @Xiangyu
>
> Is it possible for this REST API to declare TM resources in the future?
>
>
> Would you like to add/remove TMs if you use an active Resource Manager?
> This would be out of the scope of this effort since it targets the
> scheduler component only (we make no assumptions about the used Resource
> Manager). Also, the AdaptiveScheduler is only intended to be used for
> Streaming.
>
>  And for streaming jobs, I'm wondering if there is any situation we need to
> > rescale the TM resources of a flink cluster at first and then the
> adaptive
> > scheduler will rescale the per-vertex ResourceProfiles accordingly.
> >
>
> We plan on adding support for the ResourceProfiles (dynamic slot
> allocation) as the next step. Again we won't make any assumptions about the
> used Resource Manager. In other words, this effort ends by declaring
> desired resources to the Resource Manager.
>
> Does that make sense?
>
> @Matthias
>
> We've done another pass on the proposed API and currently lean towards
> having an idempotent PUT API.
> - We don't care too much about multiple writers' scenarios in terms of who
> can write an authoritative payload; this is up to the user of the API to
> figure out
> - It's indeed tricky to achieve atomicity with PATCH API; switching to PUT
> API seems to do the trick
> - We won't allow partial "payloads" anymore, meaning you need to define
> requirements for all vertices in the JobGraph; This is completely fine for
> the programmatic workflows. For DEBUG / DEMO purposes, you can use the GET
> endpoint and tweak the response to avoid writing the whole payload by hand.
>
> WDYT?
>
>
> Best,
> D.
>
> On Fri, Feb 10, 2023 at 11:21 AM feng xiangyu <xiangyu...@gmail.com>
> wrote:
>
> > Hi David,
> >
> > Thanks for creating this flip. I think this work it is very useful,
> > especially in autoscaling scenario.  I would like to share some questions
> > from my view.
> >
> > 1, Is it possible for this REST API to declare TM resources in the
> future?
> > I'm asking because we are building the autoscaling feature for Flink OLAP
> > Session Cluster in ByteDance. We need to rescale the cluster's resource
> on
> > TM level instead of Job level. It would be very helpful if we have a REST
> > API for out external Autoscaling service to use.
> >
> > 2, And for streaming jobs, I'm wondering if there is any situation we
> need
> > to rescale the TM resources of a flink cluster at first and then the
> > adaptive scheduler will rescale the per-vertex ResourceProfiles
> > accordingly.
> >
> > best.
> > Xiangyu
> >
> > Shammon FY <zjur...@gmail.com> 于2023年2月9日周四 11:31写道:
> >
> > > Hi David
> > >
> > > Thanks for your answer.
> > >
> > > > Can you elaborate more about how you'd intend to use the endpoint? I
> > > think we can ultimately introduce a way of re-declaring "per-vertex
> > > defaults," but I'd like to understand the use case bit more first.
> > >
> > > For this issue, I mainly consider the consistency of user configuration
> > and
> > > job runtime. For sql jobs, users usually set specific parallelism for
> > > source and sink, and set a global parallelism for other operators.
> These
> > > config items are stored in a config file. For some high-priority jobs,
> > > users may want to manage them manually.
> > > 1. When users need to scale the parallelism, they should update the
> > config
> > > file and restart flink job, which may take a long time.
> > > 2. After providing the REST API, users can just send a request to the
> job
> > > via REST API quickly after updating the config file.
> > > The configuration in the running job and config file should be the
> same.
> > > What do you think of this?
> > >
> > > best.
> > > Shammon
> > >
> > >
> > >
> > > On Tue, Feb 7, 2023 at 4:51 PM David Morávek <david.mora...@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Let's try to answer the questions one by one.
> > > >
> > > > *@ConradJam*
> > > >
> > > > when the number of "slots" is insufficient, can we can stop users
> > > rescaling
> > > > > or throw something to tell user "less avaliable slots to upgrade,
> > > please
> > > > > checkout your alivalbe slots" ?
> > > > >
> > > >
> > > > The main property of AdaptiveScheduler is that it can adapt to
> > "available
> > > > resources," which means you're still able to make progress even
> though
> > > you
> > > > didn't get all the slots you've asked for. Let's break down the pros
> > and
> > > > cons of this property.
> > > >
> > > > - (plus) If you lose a TM for some reason, you can still recover even
> > if
> > > it
> > > > doesn't come back. We still need to give it some time to eliminate
> > > > unnecessary rescaling, which can be controlled by setting
> > > > "resource-stabilization-timeout."
> > > > - (plus) The resources can arrive with a significant delay. For
> > example,
> > > > you're unable to spawn enough TMs on time because you've run out of
> > > > resources in your k8s cluster, and you need to wait for the cluster
> > auto
> > > > scaler to kick in and add new nodes to the cluster. In this scenario,
> > > > you'll be able to start making progress faster, at the cost of
> multiple
> > > > rescalings (once the remaining resources arrive).
> > > > - (plus) This plays well with the declarative manner of today's
> > > > infrastructure. For example, you tell k8s that you need 10 TMs, and
> > > you'll
> > > > eventually get them.
> > > > - (minus) In the case of large state jobs, the cost of multiple
> > > rescalings
> > > > might outweigh the above.
> > > >
> > > > We've already touched on the solution to this problem on the FLIP.
> > Please
> > > > notice the parallelism knob being a range with a lower and upper
> bound.
> > > > Setting both the lower and upper bound to the same value could give
> the
> > > > behavior you're describing at the cost of giving up some properties
> > that
> > > AS
> > > > gives you (you'd be falling back to the DefaultScheduler's behavior).
> > > >
> > > > when user upgrade job-vertx-parallelism . I want to have an interface
> > to
> > > > > query the current update parallel execution status, so that the
> user
> > or
> > > > > program can understand the current status
> > > > >
> > > >
> > > > This is a misunderstanding. We're not introducing the RESCALE
> endpoint.
> > > > This endpoint allows you to re-declare the resources needed to run
> the
> > > job.
> > > > Once you reach the desired resources (you get more resources than the
> > > lower
> > > > bound defines), your job will run.
> > > >
> > > > We can expose a similar endpoint to "resource requirements" to give
> you
> > > an
> > > > overview of the resources the vertices already have. You can already
> > get
> > > > this from the REST API, so exposing this in yet another way should be
> > > > considered carefully.
> > > >
> > > > *@Matthias*
> > > >
> > > > I'm wondering whether it makes sense to add some kind of resource ID
> to
> > > the
> > > > > REST API.
> > > >
> > > >
> > > > That's a good question. I want to think about that and get back to
> the
> > > > question later. My main struggle when thinking about this is, "if
> this
> > > > would be an idempotent POST endpoint," would it be any different?
> > > >
> > > > How often do we allow resource requirements to be changed?
> > > >
> > > >
> > > > There shall be no rate limiting on the FLINK side. If this is
> something
> > > > your environment needs, you can achieve it on a different layer ("we
> > > can't
> > > > have FLINK to do everything").
> > > >
> > > > Versioning the JobGraph in the JobGraphStore rather than overwriting
> it
> > > > > might be an idea.
> > > > >
> > > >
> > > > This sounds interesting since it would be closer to the JobGraph
> being
> > > > immutable. The main problem I see here is that this would introduce a
> > > > BW-incompatible change so it might be a topic for follow-up FLIP.
> > > >
> > > > I'm just wondering whether we bundle two things together that are
> > > actually
> > > > > separate
> > > > >
> > > >
> > > > Yup, this is how we think about it as well. The main question is,
> "who
> > > > should be responsible for bookkeeping 1) the JobGraph and 2) the
> > > > JobResourceRequirements". The JobMaster would be the right place for
> > > both,
> > > > but it's currently not the case, and we're tightly coupling the
> > > dispatcher
> > > > with the JobMaster.
> > > >
> > > > Initially, we tried to introduce a separate HA component in JobMaster
> > for
> > > > bookkeeping the JobResourceRequirements, but that proved to be a more
> > > > significant effort adding additional mess to the already messy HA
> > > > ecosystem. Another approach we've discussed was mutating the JobGraph
> > and
> > > > setting JRR into the JobGraph structure itself.
> > > >
> > > > The middle ground for keeping this effort reasonably sized and not
> > > > violating "we want to keep JG immutable" too much is keeping the
> > > > JobResourceRequirements separate as an internal config option in
> > > JobGraph's
> > > > configuration.
> > > >
> > > > We ultimately need to rethink the tight coupling of Dispatcher and
> > > > JobMaster, but it needs to be a separate effort.
> > > >
> > > > ...also considering the amount of data that can be stored in a
> > > > > ConfigMap/ZooKeeper node if versioning the resource requirement
> > change
> > > as
> > > > > proposed in my previous item is an option for us.
> > > > >
> > > >
> > > > AFAIK we're only storing pointers to the S3 objects in HA metadata,
> so
> > we
> > > > should be okay with having larger structures for now.
> > > >
> > > > Updating the JobGraphStore means adding more requests to the HA
> backend
> > > > API.
> > > > >
> > > >
> > > > It's fine unless you intend to override the resource requirements a
> few
> > > > times per second.
> > > >
> > > > *@Shammon*
> > > >
> > > > How about adding some more information such as vertex type
> > > > >
> > > >
> > > > Since it was intended as a "debug" endpoint, it makes complete sense!
> > > >
> > > >  For sql jobs, we always use a unified parallelism for most vertices.
> > Can
> > > > > we provide them with a more convenient setting method instead of
> each
> > > > one?
> > > >
> > > >
> > > > I completely feel with this. The main thoughts when designing the API
> > > were:
> > > > - We want to keep it clean and easy to understand.
> > > > - Global parallelism can be modeled using per-vertex parallelism but
> > not
> > > > the other way around.
> > > > - The API will be used by external tooling (operator, auto scaler).
> > > >
> > > > Can you elaborate more about how you'd intend to use the endpoint? I
> > > think
> > > > we can ultimately introduce a way of re-declaring "per-vertex
> > defaults,"
> > > > but I'd like to understand the use case bit more first.
> > > >
> > > > *@Weijie*
> > > >
> > > > What is the default value here (based on what configuration), or just
> > > > > infinite?
> > > > >
> > > >
> > > > Currently, for the lower bound, it's always one, and for the upper
> > bound,
> > > > it's either parallelism (if defined) or the maxParallelism of the
> > vertex
> > > in
> > > > JobGraph. This question might be another signal for making the
> defaults
> > > > explicit (see the answer to Shammon's question above).
> > > >
> > > >
> > > > Thanks, everyone, for your initial thoughts!
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Tue, Feb 7, 2023 at 4:39 AM weijie guo <guoweijieres...@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Thanks David for driving this. This is a very valuable work,
> > especially
> > > > for
> > > > > cloud native environment.
> > > > >
> > > > > >> How about adding some more information such as vertex type
> > > > > (SOURCE/MAP/JOIN and .etc) in the response of `get jobs
> > > > > resource-requirements`? For users, only vertex-id may be difficult
> to
> > > > > understand.
> > > > >
> > > > > +1 for this suggestion, including jobvertex's name in the response
> > body
> > > > is
> > > > > more
> > > > > user-friendly.
> > > > >
> > > > >
> > > > > I saw this sentence in FLIP: "Setting the upper bound to -1 will
> > reset
> > > > the
> > > > > value to the default setting."  What is the default value here
> (based
> > > on
> > > > > what configuration), or just infinite?
> > > > >
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > >
> > > > >
> > > > > Shammon FY <zjur...@gmail.com> 于2023年2月6日周一 18:06写道:
> > > > >
> > > > > > Hi David
> > > > > >
> > > > > > Thanks for initiating this discussion. I think declaring job
> > resource
> > > > > > requirements by REST API is very valuable. I just left some
> > comments
> > > as
> > > > > > followed
> > > > > >
> > > > > > 1) How about adding some more information such as vertex type
> > > > > > (SOURCE/MAP/JOIN and .etc) in the response of `get jobs
> > > > > > resource-requirements`? For users, only vertex-id may be
> difficult
> > to
> > > > > > understand.
> > > > > >
> > > > > > 2) For sql jobs, we always use a unified parallelism for most
> > > vertices.
> > > > > Can
> > > > > > we provide them with a more convenient setting method instead of
> > each
> > > > > one?
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Shammon
> > > > > >
> > > > > >
> > > > > > On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl <
> > matthias.p...@aiven.io
> > > > > > .invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks David for creating this FLIP. It sounds promising and
> > useful
> > > > to
> > > > > > > have. Here are some thoughts from my side (some of them might
> be
> > > > > rather a
> > > > > > > follow-up and not necessarily part of this FLIP):
> > > > > > > - I'm wondering whether it makes sense to add some kind of
> > resource
> > > > ID
> > > > > to
> > > > > > > the REST API. This would give Flink a tool to verify the PATCH
> > > > request
> > > > > of
> > > > > > > the external system in a compare-and-set kind of manner. AFAIU,
> > the
> > > > > > process
> > > > > > > requires the external system to retrieve the resource
> > requirements
> > > > > first
> > > > > > > (to retrieve the vertex IDs). A resource ID <ABC> would be sent
> > > along
> > > > > as
> > > > > > a
> > > > > > > unique identifier for the provided setup. It's essentially the
> > > > version
> > > > > ID
> > > > > > > of the currently deployed resource requirement configuration.
> > Flink
> > > > > > doesn't
> > > > > > > know whether the external system would use the provided
> > information
> > > > in
> > > > > > some
> > > > > > > way to derive a new set of resource requirements for this job.
> > The
> > > > > > > subsequent PATCH request with updated resource requirements
> would
> > > > > include
> > > > > > > the previously retrieved resource ID <ABC>. The PATCH call
> would
> > > fail
> > > > > if
> > > > > > > there was a concurrent PATCH call in between indicating to the
> > > > external
> > > > > > > system that the resource requirements were concurrently
> updated.
> > > > > > > - How often do we allow resource requirements to be changed?
> That
> > > > > > question
> > > > > > > might make my previous comment on the resource ID obsolete
> > because
> > > we
> > > > > > could
> > > > > > > just make any PATCH call fail if there was a resource
> requirement
> > > > > update
> > > > > > > within a certain time frame before the request. But such a time
> > > > period
> > > > > is
> > > > > > > something we might want to make configurable then, I guess.
> > > > > > > - Versioning the JobGraph in the JobGraphStore rather than
> > > > overwriting
> > > > > it
> > > > > > > might be an idea. This would enable us to provide resource
> > > > requirement
> > > > > > > changes in the UI or through the REST API. It is related to a
> > > problem
> > > > > > > around keeping track of the exception history within the
> > > > > > AdaptiveScheduler
> > > > > > > and also having to consider multiple versions of a JobGraph.
> But
> > > for
> > > > > that
> > > > > > > one, we use the ExecutionGraphInfoStore right now.
> > > > > > > - Updating the JobGraph in the JobGraphStore makes sense. I'm
> > just
> > > > > > > wondering whether we bundle two things together that are
> actually
> > > > > > separate:
> > > > > > > The business logic and the execution configuration (the
> resource
> > > > > > > requirements). I'm aware that this is not a flaw of the current
> > > FLIP
> > > > > but
> > > > > > > rather something that was not necessary to address in the past
> > > > because
> > > > > > the
> > > > > > > JobGraph was kind of static. I don't remember whether that was
> > > > already
> > > > > > > discussed while working on the AdaptiveScheduler for FLIP-160
> > [1].
> > > > > Maybe,
> > > > > > > I'm missing some functionality here that requires us to have
> > > > everything
> > > > > > in
> > > > > > > one place. But it feels like updating the entire JobGraph which
> > > could
> > > > > be
> > > > > > > actually a "config change" is not reasonable. ...also
> considering
> > > the
> > > > > > > amount of data that can be stored in a ConfigMap/ZooKeeper node
> > if
> > > > > > > versioning the resource requirement change as proposed in my
> > > previous
> > > > > > item
> > > > > > > is an option for us.
> > > > > > > - Updating the JobGraphStore means adding more requests to the
> HA
> > > > > backend
> > > > > > > API. There were some concerns shared in the discussion thread
> [2]
> > > for
> > > > > > > FLIP-270 [3] on pressuring the k8s API server in the past with
> > too
> > > > many
> > > > > > > calls. Eventhough, it's more likely to be caused by
> > checkpointing,
> > > I
> > > > > > still
> > > > > > > wanted to bring it up. We're working on a standardized
> > performance
> > > > test
> > > > > > to
> > > > > > > prepare going forward with FLIP-270 [3] right now.
> > > > > > >
> > > > > > > Best,
> > > > > > > Matthias
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> > > > > > > [2]
> > > https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
> > > > > > > [3]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> > > > > > >
> > > > > > > On Fri, Feb 3, 2023 at 10:31 AM ConradJam <jam.gz...@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi David:
> > > > > > > >
> > > > > > > > Thank you for drive this flip, which helps less flink
> shutdown
> > > time
> > > > > > > >
> > > > > > > > for this flip, I would like to make a few idea on share
> > > > > > > >
> > > > > > > >
> > > > > > > >    - when the number of "slots" is insufficient, can we can
> > stop
> > > > > users
> > > > > > > >    rescaling or throw something to tell user "less avaliable
> > > slots
> > > > to
> > > > > > > > upgrade,
> > > > > > > >    please checkout your alivalbe slots" ? Or we could have a
> > > > request
> > > > > > > >    switch(true/false) to allow this behavior
> > > > > > > >
> > > > > > > >
> > > > > > > >    - when user upgrade job-vertx-parallelism . I want to have
> > an
> > > > > > > interface
> > > > > > > >    to query the current update parallel execution status, so
> > that
> > > > the
> > > > > > > user
> > > > > > > > or
> > > > > > > >    program can understand the current status
> > > > > > > >    - I want to have an interface to query the current update
> > > > > > parallelism
> > > > > > > >    execution status. This also helps similar to *[1] Flink
> K8S
> > > > > > Operator*
> > > > > > > >    management
> > > > > > > >
> > > > > > > >
> > > > > > > > {
> > > > > > > >   status: Failed
> > > > > > > >   reason: "less avaliable slots to upgrade, please checkout
> > your
> > > > > > alivalbe
> > > > > > > > slots"
> > > > > > > > }
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >    - *Pending*: this job now is join the upgrade queue,it
> will
> > be
> > > > > > update
> > > > > > > >    later
> > > > > > > >    - *Rescaling*: job now is rescaling,wait it finish
> > > > > > > >    - *Finished*: finish do it
> > > > > > > >    - *Failed* : something have wrong,so this job is not
> > alivable
> > > > > > upgrade
> > > > > > > >
> > > > > > > > I want to supplement my above content in flip, what do you
> > think
> > > ?
> > > > > > > >
> > > > > > > >
> > > > > > > >    1.
> > > > > > > >
> > > > > > >
> > > > >
> > >
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
> > > > > > > >
> > > > > > > >
> > > > > > > > David Morávek <d...@apache.org> 于2023年2月3日周五 16:42写道:
> > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > This FLIP [1] introduces a new REST API for declaring
> > resource
> > > > > > > > requirements
> > > > > > > > > for the Adaptive Scheduler. There seems to be a clear need
> > for
> > > > this
> > > > > > API
> > > > > > > > > based on the discussion on the "Reworking the Rescale API"
> > [2]
> > > > > > thread.
> > > > > > > > >
> > > > > > > > > Before we get started, this work is heavily based on the
> > > > prototype
> > > > > > [3]
> > > > > > > > > created by Till Rohrmann, and the FLIP is being published
> > with
> > > > his
> > > > > > > > consent.
> > > > > > > > > Big shoutout to him!
> > > > > > > > >
> > > > > > > > > Last and not least, thanks to Chesnay and Roman for the
> > initial
> > > > > > reviews
> > > > > > > > and
> > > > > > > > > discussions.
> > > > > > > > >
> > > > > > > > > The best start would be watching a short demo [4] that I've
> > > > > recorded,
> > > > > > > > which
> > > > > > > > > illustrates newly added capabilities (rescaling the running
> > > job,
> > > > > > > handing
> > > > > > > > > back resources to the RM, and session cluster support).
> > > > > > > > >
> > > > > > > > > The intuition behind the FLIP is being able to define
> > resource
> > > > > > > > requirements
> > > > > > > > > ("resource boundaries") externally that the
> AdaptiveScheduler
> > > can
> > > > > > > > navigate
> > > > > > > > > within. This is a building block for higher-level efforts
> > such
> > > as
> > > > > an
> > > > > > > > > external Autoscaler. The natural extension of this work
> would
> > > be
> > > > to
> > > > > > > allow
> > > > > > > > > to specify per-vertex ResourceProfiles.
> > > > > > > > >
> > > > > > > > > Looking forward to your thoughts; any feedback is
> > appreciated!
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> > > > > > > > > [2]
> > > > > https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
> > > > > > > > > [3] https://github.com/tillrohrmann/flink/tree/autoscaling
> > > > > > > > > [4]
> > > > > > > >
> > > > >
> > https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > D.
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Best
> > > > > > > >
> > > > > > > > ConradJam
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to