Thanks for your clarifications, David. I don't have any additional major
points to add. One thing about the FLIP: The RPC layer API for updating the
JRR returns a future with a JRR? I don't see value in returning a JRR here
since it's an idempotent operation? Wouldn't it be enough to return
CompletableFuture<Void> here? Or am I missing something?

Matthias

On Mon, Feb 20, 2023 at 1:48 PM Maximilian Michels <m...@apache.org> wrote:

> Thanks David! If we could get the pre-allocation working as part of
> the FLIP, that would be great.
>
> Concerning the downscale case, I agree this is a special case for the
> (single-job) application mode where we could re-allocate slots in a
> way that could leave entire task managers unoccupied which we would
> then be able to release. The goal essentially is to reduce slot
> fragmentation on scale down by packing the slots efficiently. The
> easiest way to add this optimization when running in application mode
> would be to drop as many task managers during the restart such that
> NUM_REQUIRED_SLOTS >= NUM_AVAILABLE_SLOTS stays true. We can look into
> this independently of the FLIP.
>
> Feel free to start the vote.
>
> -Max
>
> On Mon, Feb 20, 2023 at 9:10 AM David Morávek <d...@apache.org> wrote:
> >
> > Hi everyone,
> >
> > Thanks for the feedback! I've updated the FLIP to use idempotent PUT API
> instead of PATCH and to properly handle lower bound settings, to support
> the "pre-allocation" of the resources.
> >
> > @Max
> >
> > > How hard would it be to address this issue in the FLIP?
> >
> > I've included this in the FLIP. It might not be too hard to implement
> this in the end.
> >
> > > B) drop as many superfluous task managers as needed
> >
> > I've intentionally left this part out for now because this ultimately
> needs to be the responsibility of the Resource Manager. After all, in the
> Session Cluster scenario, the Scheduler doesn't have the bigger picture of
> other tasks of other jobs running on those TMs. This will most likely be a
> topic for another FLIP.
> >
> > WDYT? If there are no other questions or concerns, I'd like to start the
> vote on Wednesday.
> >
> > Best,
> > D.
> >
> > On Wed, Feb 15, 2023 at 3:34 PM Maximilian Michels <m...@apache.org>
> wrote:
> >>
> >> I missed that the FLIP states:
> >>
> >> > Currently, even though we’d expose the lower bound for clarity and
> API completeness, we won’t allow setting it to any other value than one
> until we have full support throughout the stack.
> >>
> >> How hard would it be to address this issue in the FLIP?
> >>
> >> There is not much value to offer setting a lower bound which won't be
> >> respected / throw an error when it is set. If we had support for a
> >> lower bound, we could enforce a resource contract externally via
> >> setting lowerBound == upperBound. That ties back to the Rescale API
> >> discussion we had. I want to better understand what the major concerns
> >> would be around allowing this.
> >>
> >> Just to outline how I imagine the logic to work:
> >>
> >> A) The resource constraints are already met => Nothing changes
> >> B) More resources available than required => Cancel the job, drop as
> >> many superfluous task managers as needed, restart the job
> >> C) Less resources available than required => Acquire new task
> >> managers, wait for them to register, cancel and restart the job
> >>
> >> I'm open to helping out with the implementation.
> >>
> >> -Max
> >>
> >> On Mon, Feb 13, 2023 at 7:45 PM Maximilian Michels <m...@apache.org>
> wrote:
> >> >
> >> > Based on further discussion I had with Chesnay on this PR [1], I think
> >> > jobs would currently go into a restarting state after the resource
> >> > requirements have changed. This wouldn't achieve what we had in mind,
> >> > i.e. sticking to the old resource requirements until enough slots are
> >> > available to fulfil the new resource requirements. So this may not be
> >> > 100% what we need but it could be extended to do what we want.
> >> >
> >> > -Max
> >> >
> >> > [1] https://github.com/apache/flink/pull/21908#discussion_r1104792362
> >> >
> >> > On Mon, Feb 13, 2023 at 7:16 PM Maximilian Michels <m...@apache.org>
> wrote:
> >> > >
> >> > > Hi David,
> >> > >
> >> > > This is awesome! Great writeup and demo. This is pretty much what we
> >> > > need for the autoscaler as part of the Flink Kubernetes operator
> [1].
> >> > > Scaling Flink jobs effectively is hard but fortunately we have
> solved
> >> > > the issue as part of the Flink Kubernetes operator. The only
> critical
> >> > > piece we are missing is a better way to execute scaling decisions,
> as
> >> > > discussed in [2].
> >> > >
> >> > > Looking at your proposal, we would set lowerBound == upperBound for
> >> > > the parallelism because we want to fully determine the parallelism
> >> > > externally based on the scaling metrics. Does that sound right?
> >> > >
> >> > > What is the timeline for these changes? Is there a JIRA?
> >> > >
> >> > > Cheers,
> >> > > Max
> >> > >
> >> > > [1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
> >> > > [2]
> https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
> >> > >
> >> > > On Mon, Feb 13, 2023 at 1:16 PM feng xiangyu <xiangyu...@gmail.com>
> wrote:
> >> > > >
> >> > > > Hi David,
> >> > > >
> >> > > > Thanks for your reply.  I think your response totally make
> sense.  This
> >> > > > flip targets on declaring required resource to ResourceManager
> instead of
> >> > > > using  ResourceManager to add/remove TMs directly.
> >> > > >
> >> > > > Best,
> >> > > > Xiangyu
> >> > > >
> >> > > >
> >> > > >
> >> > > > David Morávek <david.mora...@gmail.com> 于2023年2月13日周一 15:46写道:
> >> > > >
> >> > > > > Hi everyone,
> >> > > > >
> >> > > > > @Shammon
> >> > > > >
> >> > > > > I'm not entirely sure what "config file" you're referring to.
> You can, of
> >> > > > > course, override the default parallelism in "flink-conf.yaml",
> but for
> >> > > > > sinks and sources, the parallelism needs to be tweaked on the
> connector
> >> > > > > level ("WITH" statement).
> >> > > > >
> >> > > > > This is something that should be achieved with tooling around
> Flink. We
> >> > > > > want to provide an API on the lowest level that generalizes
> well. Achieving
> >> > > > > what you're describing should be straightforward with this API.
> >> > > > >
> >> > > > > @Xiangyu
> >> > > > >
> >> > > > > Is it possible for this REST API to declare TM resources in the
> future?
> >> > > > >
> >> > > > >
> >> > > > > Would you like to add/remove TMs if you use an active Resource
> Manager?
> >> > > > > This would be out of the scope of this effort since it targets
> the
> >> > > > > scheduler component only (we make no assumptions about the used
> Resource
> >> > > > > Manager). Also, the AdaptiveScheduler is only intended to be
> used for
> >> > > > > Streaming.
> >> > > > >
> >> > > > >  And for streaming jobs, I'm wondering if there is any
> situation we need to
> >> > > > > > rescale the TM resources of a flink cluster at first and then
> the
> >> > > > > adaptive
> >> > > > > > scheduler will rescale the per-vertex ResourceProfiles
> accordingly.
> >> > > > > >
> >> > > > >
> >> > > > > We plan on adding support for the ResourceProfiles (dynamic slot
> >> > > > > allocation) as the next step. Again we won't make any
> assumptions about the
> >> > > > > used Resource Manager. In other words, this effort ends by
> declaring
> >> > > > > desired resources to the Resource Manager.
> >> > > > >
> >> > > > > Does that make sense?
> >> > > > >
> >> > > > > @Matthias
> >> > > > >
> >> > > > > We've done another pass on the proposed API and currently lean
> towards
> >> > > > > having an idempotent PUT API.
> >> > > > > - We don't care too much about multiple writers' scenarios in
> terms of who
> >> > > > > can write an authoritative payload; this is up to the user of
> the API to
> >> > > > > figure out
> >> > > > > - It's indeed tricky to achieve atomicity with PATCH API;
> switching to PUT
> >> > > > > API seems to do the trick
> >> > > > > - We won't allow partial "payloads" anymore, meaning you need
> to define
> >> > > > > requirements for all vertices in the JobGraph; This is
> completely fine for
> >> > > > > the programmatic workflows. For DEBUG / DEMO purposes, you can
> use the GET
> >> > > > > endpoint and tweak the response to avoid writing the whole
> payload by hand.
> >> > > > >
> >> > > > > WDYT?
> >> > > > >
> >> > > > >
> >> > > > > Best,
> >> > > > > D.
> >> > > > >
> >> > > > > On Fri, Feb 10, 2023 at 11:21 AM feng xiangyu <
> xiangyu...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi David,
> >> > > > > >
> >> > > > > > Thanks for creating this flip. I think this work it is very
> useful,
> >> > > > > > especially in autoscaling scenario.  I would like to share
> some questions
> >> > > > > > from my view.
> >> > > > > >
> >> > > > > > 1, Is it possible for this REST API to declare TM resources
> in the
> >> > > > > future?
> >> > > > > > I'm asking because we are building the autoscaling feature
> for Flink OLAP
> >> > > > > > Session Cluster in ByteDance. We need to rescale the
> cluster's resource
> >> > > > > on
> >> > > > > > TM level instead of Job level. It would be very helpful if we
> have a REST
> >> > > > > > API for out external Autoscaling service to use.
> >> > > > > >
> >> > > > > > 2, And for streaming jobs, I'm wondering if there is any
> situation we
> >> > > > > need
> >> > > > > > to rescale the TM resources of a flink cluster at first and
> then the
> >> > > > > > adaptive scheduler will rescale the per-vertex
> ResourceProfiles
> >> > > > > > accordingly.
> >> > > > > >
> >> > > > > > best.
> >> > > > > > Xiangyu
> >> > > > > >
> >> > > > > > Shammon FY <zjur...@gmail.com> 于2023年2月9日周四 11:31写道:
> >> > > > > >
> >> > > > > > > Hi David
> >> > > > > > >
> >> > > > > > > Thanks for your answer.
> >> > > > > > >
> >> > > > > > > > Can you elaborate more about how you'd intend to use the
> endpoint? I
> >> > > > > > > think we can ultimately introduce a way of re-declaring
> "per-vertex
> >> > > > > > > defaults," but I'd like to understand the use case bit more
> first.
> >> > > > > > >
> >> > > > > > > For this issue, I mainly consider the consistency of user
> configuration
> >> > > > > > and
> >> > > > > > > job runtime. For sql jobs, users usually set specific
> parallelism for
> >> > > > > > > source and sink, and set a global parallelism for other
> operators.
> >> > > > > These
> >> > > > > > > config items are stored in a config file. For some
> high-priority jobs,
> >> > > > > > > users may want to manage them manually.
> >> > > > > > > 1. When users need to scale the parallelism, they should
> update the
> >> > > > > > config
> >> > > > > > > file and restart flink job, which may take a long time.
> >> > > > > > > 2. After providing the REST API, users can just send a
> request to the
> >> > > > > job
> >> > > > > > > via REST API quickly after updating the config file.
> >> > > > > > > The configuration in the running job and config file should
> be the
> >> > > > > same.
> >> > > > > > > What do you think of this?
> >> > > > > > >
> >> > > > > > > best.
> >> > > > > > > Shammon
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Tue, Feb 7, 2023 at 4:51 PM David Morávek <
> david.mora...@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi everyone,
> >> > > > > > > >
> >> > > > > > > > Let's try to answer the questions one by one.
> >> > > > > > > >
> >> > > > > > > > *@ConradJam*
> >> > > > > > > >
> >> > > > > > > > when the number of "slots" is insufficient, can we can
> stop users
> >> > > > > > > rescaling
> >> > > > > > > > > or throw something to tell user "less avaliable slots
> to upgrade,
> >> > > > > > > please
> >> > > > > > > > > checkout your alivalbe slots" ?
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > The main property of AdaptiveScheduler is that it can
> adapt to
> >> > > > > > "available
> >> > > > > > > > resources," which means you're still able to make
> progress even
> >> > > > > though
> >> > > > > > > you
> >> > > > > > > > didn't get all the slots you've asked for. Let's break
> down the pros
> >> > > > > > and
> >> > > > > > > > cons of this property.
> >> > > > > > > >
> >> > > > > > > > - (plus) If you lose a TM for some reason, you can still
> recover even
> >> > > > > > if
> >> > > > > > > it
> >> > > > > > > > doesn't come back. We still need to give it some time to
> eliminate
> >> > > > > > > > unnecessary rescaling, which can be controlled by setting
> >> > > > > > > > "resource-stabilization-timeout."
> >> > > > > > > > - (plus) The resources can arrive with a significant
> delay. For
> >> > > > > > example,
> >> > > > > > > > you're unable to spawn enough TMs on time because you've
> run out of
> >> > > > > > > > resources in your k8s cluster, and you need to wait for
> the cluster
> >> > > > > > auto
> >> > > > > > > > scaler to kick in and add new nodes to the cluster. In
> this scenario,
> >> > > > > > > > you'll be able to start making progress faster, at the
> cost of
> >> > > > > multiple
> >> > > > > > > > rescalings (once the remaining resources arrive).
> >> > > > > > > > - (plus) This plays well with the declarative manner of
> today's
> >> > > > > > > > infrastructure. For example, you tell k8s that you need
> 10 TMs, and
> >> > > > > > > you'll
> >> > > > > > > > eventually get them.
> >> > > > > > > > - (minus) In the case of large state jobs, the cost of
> multiple
> >> > > > > > > rescalings
> >> > > > > > > > might outweigh the above.
> >> > > > > > > >
> >> > > > > > > > We've already touched on the solution to this problem on
> the FLIP.
> >> > > > > > Please
> >> > > > > > > > notice the parallelism knob being a range with a lower
> and upper
> >> > > > > bound.
> >> > > > > > > > Setting both the lower and upper bound to the same value
> could give
> >> > > > > the
> >> > > > > > > > behavior you're describing at the cost of giving up some
> properties
> >> > > > > > that
> >> > > > > > > AS
> >> > > > > > > > gives you (you'd be falling back to the
> DefaultScheduler's behavior).
> >> > > > > > > >
> >> > > > > > > > when user upgrade job-vertx-parallelism . I want to have
> an interface
> >> > > > > > to
> >> > > > > > > > > query the current update parallel execution status, so
> that the
> >> > > > > user
> >> > > > > > or
> >> > > > > > > > > program can understand the current status
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > This is a misunderstanding. We're not introducing the
> RESCALE
> >> > > > > endpoint.
> >> > > > > > > > This endpoint allows you to re-declare the resources
> needed to run
> >> > > > > the
> >> > > > > > > job.
> >> > > > > > > > Once you reach the desired resources (you get more
> resources than the
> >> > > > > > > lower
> >> > > > > > > > bound defines), your job will run.
> >> > > > > > > >
> >> > > > > > > > We can expose a similar endpoint to "resource
> requirements" to give
> >> > > > > you
> >> > > > > > > an
> >> > > > > > > > overview of the resources the vertices already have. You
> can already
> >> > > > > > get
> >> > > > > > > > this from the REST API, so exposing this in yet another
> way should be
> >> > > > > > > > considered carefully.
> >> > > > > > > >
> >> > > > > > > > *@Matthias*
> >> > > > > > > >
> >> > > > > > > > I'm wondering whether it makes sense to add some kind of
> resource ID
> >> > > > > to
> >> > > > > > > the
> >> > > > > > > > > REST API.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > That's a good question. I want to think about that and
> get back to
> >> > > > > the
> >> > > > > > > > question later. My main struggle when thinking about this
> is, "if
> >> > > > > this
> >> > > > > > > > would be an idempotent POST endpoint," would it be any
> different?
> >> > > > > > > >
> >> > > > > > > > How often do we allow resource requirements to be changed?
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > There shall be no rate limiting on the FLINK side. If
> this is
> >> > > > > something
> >> > > > > > > > your environment needs, you can achieve it on a different
> layer ("we
> >> > > > > > > can't
> >> > > > > > > > have FLINK to do everything").
> >> > > > > > > >
> >> > > > > > > > Versioning the JobGraph in the JobGraphStore rather than
> overwriting
> >> > > > > it
> >> > > > > > > > > might be an idea.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > This sounds interesting since it would be closer to the
> JobGraph
> >> > > > > being
> >> > > > > > > > immutable. The main problem I see here is that this would
> introduce a
> >> > > > > > > > BW-incompatible change so it might be a topic for
> follow-up FLIP.
> >> > > > > > > >
> >> > > > > > > > I'm just wondering whether we bundle two things together
> that are
> >> > > > > > > actually
> >> > > > > > > > > separate
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Yup, this is how we think about it as well. The main
> question is,
> >> > > > > "who
> >> > > > > > > > should be responsible for bookkeeping 1) the JobGraph and
> 2) the
> >> > > > > > > > JobResourceRequirements". The JobMaster would be the
> right place for
> >> > > > > > > both,
> >> > > > > > > > but it's currently not the case, and we're tightly
> coupling the
> >> > > > > > > dispatcher
> >> > > > > > > > with the JobMaster.
> >> > > > > > > >
> >> > > > > > > > Initially, we tried to introduce a separate HA component
> in JobMaster
> >> > > > > > for
> >> > > > > > > > bookkeeping the JobResourceRequirements, but that proved
> to be a more
> >> > > > > > > > significant effort adding additional mess to the already
> messy HA
> >> > > > > > > > ecosystem. Another approach we've discussed was mutating
> the JobGraph
> >> > > > > > and
> >> > > > > > > > setting JRR into the JobGraph structure itself.
> >> > > > > > > >
> >> > > > > > > > The middle ground for keeping this effort reasonably
> sized and not
> >> > > > > > > > violating "we want to keep JG immutable" too much is
> keeping the
> >> > > > > > > > JobResourceRequirements separate as an internal config
> option in
> >> > > > > > > JobGraph's
> >> > > > > > > > configuration.
> >> > > > > > > >
> >> > > > > > > > We ultimately need to rethink the tight coupling of
> Dispatcher and
> >> > > > > > > > JobMaster, but it needs to be a separate effort.
> >> > > > > > > >
> >> > > > > > > > ...also considering the amount of data that can be stored
> in a
> >> > > > > > > > > ConfigMap/ZooKeeper node if versioning the resource
> requirement
> >> > > > > > change
> >> > > > > > > as
> >> > > > > > > > > proposed in my previous item is an option for us.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > AFAIK we're only storing pointers to the S3 objects in HA
> metadata,
> >> > > > > so
> >> > > > > > we
> >> > > > > > > > should be okay with having larger structures for now.
> >> > > > > > > >
> >> > > > > > > > Updating the JobGraphStore means adding more requests to
> the HA
> >> > > > > backend
> >> > > > > > > > API.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > It's fine unless you intend to override the resource
> requirements a
> >> > > > > few
> >> > > > > > > > times per second.
> >> > > > > > > >
> >> > > > > > > > *@Shammon*
> >> > > > > > > >
> >> > > > > > > > How about adding some more information such as vertex type
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Since it was intended as a "debug" endpoint, it makes
> complete sense!
> >> > > > > > > >
> >> > > > > > > >  For sql jobs, we always use a unified parallelism for
> most vertices.
> >> > > > > > Can
> >> > > > > > > > > we provide them with a more convenient setting method
> instead of
> >> > > > > each
> >> > > > > > > > one?
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > I completely feel with this. The main thoughts when
> designing the API
> >> > > > > > > were:
> >> > > > > > > > - We want to keep it clean and easy to understand.
> >> > > > > > > > - Global parallelism can be modeled using per-vertex
> parallelism but
> >> > > > > > not
> >> > > > > > > > the other way around.
> >> > > > > > > > - The API will be used by external tooling (operator,
> auto scaler).
> >> > > > > > > >
> >> > > > > > > > Can you elaborate more about how you'd intend to use the
> endpoint? I
> >> > > > > > > think
> >> > > > > > > > we can ultimately introduce a way of re-declaring
> "per-vertex
> >> > > > > > defaults,"
> >> > > > > > > > but I'd like to understand the use case bit more first.
> >> > > > > > > >
> >> > > > > > > > *@Weijie*
> >> > > > > > > >
> >> > > > > > > > What is the default value here (based on what
> configuration), or just
> >> > > > > > > > > infinite?
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Currently, for the lower bound, it's always one, and for
> the upper
> >> > > > > > bound,
> >> > > > > > > > it's either parallelism (if defined) or the
> maxParallelism of the
> >> > > > > > vertex
> >> > > > > > > in
> >> > > > > > > > JobGraph. This question might be another signal for
> making the
> >> > > > > defaults
> >> > > > > > > > explicit (see the answer to Shammon's question above).
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Thanks, everyone, for your initial thoughts!
> >> > > > > > > >
> >> > > > > > > > Best,
> >> > > > > > > > D.
> >> > > > > > > >
> >> > > > > > > > On Tue, Feb 7, 2023 at 4:39 AM weijie guo <
> guoweijieres...@gmail.com
> >> > > > > >
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Thanks David for driving this. This is a very valuable
> work,
> >> > > > > > especially
> >> > > > > > > > for
> >> > > > > > > > > cloud native environment.
> >> > > > > > > > >
> >> > > > > > > > > >> How about adding some more information such as
> vertex type
> >> > > > > > > > > (SOURCE/MAP/JOIN and .etc) in the response of `get jobs
> >> > > > > > > > > resource-requirements`? For users, only vertex-id may
> be difficult
> >> > > > > to
> >> > > > > > > > > understand.
> >> > > > > > > > >
> >> > > > > > > > > +1 for this suggestion, including jobvertex's name in
> the response
> >> > > > > > body
> >> > > > > > > > is
> >> > > > > > > > > more
> >> > > > > > > > > user-friendly.
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > I saw this sentence in FLIP: "Setting the upper bound
> to -1 will
> >> > > > > > reset
> >> > > > > > > > the
> >> > > > > > > > > value to the default setting."  What is the default
> value here
> >> > > > > (based
> >> > > > > > > on
> >> > > > > > > > > what configuration), or just infinite?
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > Best regards,
> >> > > > > > > > >
> >> > > > > > > > > Weijie
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > Shammon FY <zjur...@gmail.com> 于2023年2月6日周一 18:06写道:
> >> > > > > > > > >
> >> > > > > > > > > > Hi David
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks for initiating this discussion. I think
> declaring job
> >> > > > > > resource
> >> > > > > > > > > > requirements by REST API is very valuable. I just
> left some
> >> > > > > > comments
> >> > > > > > > as
> >> > > > > > > > > > followed
> >> > > > > > > > > >
> >> > > > > > > > > > 1) How about adding some more information such as
> vertex type
> >> > > > > > > > > > (SOURCE/MAP/JOIN and .etc) in the response of `get
> jobs
> >> > > > > > > > > > resource-requirements`? For users, only vertex-id may
> be
> >> > > > > difficult
> >> > > > > > to
> >> > > > > > > > > > understand.
> >> > > > > > > > > >
> >> > > > > > > > > > 2) For sql jobs, we always use a unified parallelism
> for most
> >> > > > > > > vertices.
> >> > > > > > > > > Can
> >> > > > > > > > > > we provide them with a more convenient setting method
> instead of
> >> > > > > > each
> >> > > > > > > > > one?
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Best,
> >> > > > > > > > > > Shammon
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl <
> >> > > > > > matthias.p...@aiven.io
> >> > > > > > > > > > .invalid>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Thanks David for creating this FLIP. It sounds
> promising and
> >> > > > > > useful
> >> > > > > > > > to
> >> > > > > > > > > > > have. Here are some thoughts from my side (some of
> them might
> >> > > > > be
> >> > > > > > > > > rather a
> >> > > > > > > > > > > follow-up and not necessarily part of this FLIP):
> >> > > > > > > > > > > - I'm wondering whether it makes sense to add some
> kind of
> >> > > > > > resource
> >> > > > > > > > ID
> >> > > > > > > > > to
> >> > > > > > > > > > > the REST API. This would give Flink a tool to
> verify the PATCH
> >> > > > > > > > request
> >> > > > > > > > > of
> >> > > > > > > > > > > the external system in a compare-and-set kind of
> manner. AFAIU,
> >> > > > > > the
> >> > > > > > > > > > process
> >> > > > > > > > > > > requires the external system to retrieve the
> resource
> >> > > > > > requirements
> >> > > > > > > > > first
> >> > > > > > > > > > > (to retrieve the vertex IDs). A resource ID <ABC>
> would be sent
> >> > > > > > > along
> >> > > > > > > > > as
> >> > > > > > > > > > a
> >> > > > > > > > > > > unique identifier for the provided setup. It's
> essentially the
> >> > > > > > > > version
> >> > > > > > > > > ID
> >> > > > > > > > > > > of the currently deployed resource requirement
> configuration.
> >> > > > > > Flink
> >> > > > > > > > > > doesn't
> >> > > > > > > > > > > know whether the external system would use the
> provided
> >> > > > > > information
> >> > > > > > > > in
> >> > > > > > > > > > some
> >> > > > > > > > > > > way to derive a new set of resource requirements
> for this job.
> >> > > > > > The
> >> > > > > > > > > > > subsequent PATCH request with updated resource
> requirements
> >> > > > > would
> >> > > > > > > > > include
> >> > > > > > > > > > > the previously retrieved resource ID <ABC>. The
> PATCH call
> >> > > > > would
> >> > > > > > > fail
> >> > > > > > > > > if
> >> > > > > > > > > > > there was a concurrent PATCH call in between
> indicating to the
> >> > > > > > > > external
> >> > > > > > > > > > > system that the resource requirements were
> concurrently
> >> > > > > updated.
> >> > > > > > > > > > > - How often do we allow resource requirements to be
> changed?
> >> > > > > That
> >> > > > > > > > > > question
> >> > > > > > > > > > > might make my previous comment on the resource ID
> obsolete
> >> > > > > > because
> >> > > > > > > we
> >> > > > > > > > > > could
> >> > > > > > > > > > > just make any PATCH call fail if there was a
> resource
> >> > > > > requirement
> >> > > > > > > > > update
> >> > > > > > > > > > > within a certain time frame before the request. But
> such a time
> >> > > > > > > > period
> >> > > > > > > > > is
> >> > > > > > > > > > > something we might want to make configurable then,
> I guess.
> >> > > > > > > > > > > - Versioning the JobGraph in the JobGraphStore
> rather than
> >> > > > > > > > overwriting
> >> > > > > > > > > it
> >> > > > > > > > > > > might be an idea. This would enable us to provide
> resource
> >> > > > > > > > requirement
> >> > > > > > > > > > > changes in the UI or through the REST API. It is
> related to a
> >> > > > > > > problem
> >> > > > > > > > > > > around keeping track of the exception history
> within the
> >> > > > > > > > > > AdaptiveScheduler
> >> > > > > > > > > > > and also having to consider multiple versions of a
> JobGraph.
> >> > > > > But
> >> > > > > > > for
> >> > > > > > > > > that
> >> > > > > > > > > > > one, we use the ExecutionGraphInfoStore right now.
> >> > > > > > > > > > > - Updating the JobGraph in the JobGraphStore makes
> sense. I'm
> >> > > > > > just
> >> > > > > > > > > > > wondering whether we bundle two things together
> that are
> >> > > > > actually
> >> > > > > > > > > > separate:
> >> > > > > > > > > > > The business logic and the execution configuration
> (the
> >> > > > > resource
> >> > > > > > > > > > > requirements). I'm aware that this is not a flaw of
> the current
> >> > > > > > > FLIP
> >> > > > > > > > > but
> >> > > > > > > > > > > rather something that was not necessary to address
> in the past
> >> > > > > > > > because
> >> > > > > > > > > > the
> >> > > > > > > > > > > JobGraph was kind of static. I don't remember
> whether that was
> >> > > > > > > > already
> >> > > > > > > > > > > discussed while working on the AdaptiveScheduler
> for FLIP-160
> >> > > > > > [1].
> >> > > > > > > > > Maybe,
> >> > > > > > > > > > > I'm missing some functionality here that requires
> us to have
> >> > > > > > > > everything
> >> > > > > > > > > > in
> >> > > > > > > > > > > one place. But it feels like updating the entire
> JobGraph which
> >> > > > > > > could
> >> > > > > > > > > be
> >> > > > > > > > > > > actually a "config change" is not reasonable.
> ...also
> >> > > > > considering
> >> > > > > > > the
> >> > > > > > > > > > > amount of data that can be stored in a
> ConfigMap/ZooKeeper node
> >> > > > > > if
> >> > > > > > > > > > > versioning the resource requirement change as
> proposed in my
> >> > > > > > > previous
> >> > > > > > > > > > item
> >> > > > > > > > > > > is an option for us.
> >> > > > > > > > > > > - Updating the JobGraphStore means adding more
> requests to the
> >> > > > > HA
> >> > > > > > > > > backend
> >> > > > > > > > > > > API. There were some concerns shared in the
> discussion thread
> >> > > > > [2]
> >> > > > > > > for
> >> > > > > > > > > > > FLIP-270 [3] on pressuring the k8s API server in
> the past with
> >> > > > > > too
> >> > > > > > > > many
> >> > > > > > > > > > > calls. Eventhough, it's more likely to be caused by
> >> > > > > > checkpointing,
> >> > > > > > > I
> >> > > > > > > > > > still
> >> > > > > > > > > > > wanted to bring it up. We're working on a
> standardized
> >> > > > > > performance
> >> > > > > > > > test
> >> > > > > > > > > > to
> >> > > > > > > > > > > prepare going forward with FLIP-270 [3] right now.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best,
> >> > > > > > > > > > > Matthias
> >> > > > > > > > > > >
> >> > > > > > > > > > > [1]
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> >> > > > > > > > > > > [2]
> >> > > > > > >
> https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
> >> > > > > > > > > > > [3]
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Fri, Feb 3, 2023 at 10:31 AM ConradJam <
> jam.gz...@gmail.com
> >> > > > > >
> >> > > > > > > > wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Hi David:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Thank you for drive this flip, which helps less
> flink
> >> > > > > shutdown
> >> > > > > > > time
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > for this flip, I would like to make a few idea on
> share
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    - when the number of "slots" is insufficient,
> can we can
> >> > > > > > stop
> >> > > > > > > > > users
> >> > > > > > > > > > > >    rescaling or throw something to tell user
> "less avaliable
> >> > > > > > > slots
> >> > > > > > > > to
> >> > > > > > > > > > > > upgrade,
> >> > > > > > > > > > > >    please checkout your alivalbe slots" ? Or we
> could have a
> >> > > > > > > > request
> >> > > > > > > > > > > >    switch(true/false) to allow this behavior
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    - when user upgrade job-vertx-parallelism . I
> want to have
> >> > > > > > an
> >> > > > > > > > > > > interface
> >> > > > > > > > > > > >    to query the current update parallel execution
> status, so
> >> > > > > > that
> >> > > > > > > > the
> >> > > > > > > > > > > user
> >> > > > > > > > > > > > or
> >> > > > > > > > > > > >    program can understand the current status
> >> > > > > > > > > > > >    - I want to have an interface to query the
> current update
> >> > > > > > > > > > parallelism
> >> > > > > > > > > > > >    execution status. This also helps similar to
> *[1] Flink
> >> > > > > K8S
> >> > > > > > > > > > Operator*
> >> > > > > > > > > > > >    management
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > {
> >> > > > > > > > > > > >   status: Failed
> >> > > > > > > > > > > >   reason: "less avaliable slots to upgrade,
> please checkout
> >> > > > > > your
> >> > > > > > > > > > alivalbe
> >> > > > > > > > > > > > slots"
> >> > > > > > > > > > > > }
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    - *Pending*: this job now is join the upgrade
> queue,it
> >> > > > > will
> >> > > > > > be
> >> > > > > > > > > > update
> >> > > > > > > > > > > >    later
> >> > > > > > > > > > > >    - *Rescaling*: job now is rescaling,wait it
> finish
> >> > > > > > > > > > > >    - *Finished*: finish do it
> >> > > > > > > > > > > >    - *Failed* : something have wrong,so this job
> is not
> >> > > > > > alivable
> >> > > > > > > > > > upgrade
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I want to supplement my above content in flip,
> what do you
> >> > > > > > think
> >> > > > > > > ?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    1.
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > >
> >> > > > >
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > David Morávek <d...@apache.org> 于2023年2月3日周五
> 16:42写道:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Hi everyone,
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > This FLIP [1] introduces a new REST API for
> declaring
> >> > > > > > resource
> >> > > > > > > > > > > > requirements
> >> > > > > > > > > > > > > for the Adaptive Scheduler. There seems to be a
> clear need
> >> > > > > > for
> >> > > > > > > > this
> >> > > > > > > > > > API
> >> > > > > > > > > > > > > based on the discussion on the "Reworking the
> Rescale API"
> >> > > > > > [2]
> >> > > > > > > > > > thread.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Before we get started, this work is heavily
> based on the
> >> > > > > > > > prototype
> >> > > > > > > > > > [3]
> >> > > > > > > > > > > > > created by Till Rohrmann, and the FLIP is being
> published
> >> > > > > > with
> >> > > > > > > > his
> >> > > > > > > > > > > > consent.
> >> > > > > > > > > > > > > Big shoutout to him!
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Last and not least, thanks to Chesnay and Roman
> for the
> >> > > > > > initial
> >> > > > > > > > > > reviews
> >> > > > > > > > > > > > and
> >> > > > > > > > > > > > > discussions.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > The best start would be watching a short demo
> [4] that I've
> >> > > > > > > > > recorded,
> >> > > > > > > > > > > > which
> >> > > > > > > > > > > > > illustrates newly added capabilities (rescaling
> the running
> >> > > > > > > job,
> >> > > > > > > > > > > handing
> >> > > > > > > > > > > > > back resources to the RM, and session cluster
> support).
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > The intuition behind the FLIP is being able to
> define
> >> > > > > > resource
> >> > > > > > > > > > > > requirements
> >> > > > > > > > > > > > > ("resource boundaries") externally that the
> >> > > > > AdaptiveScheduler
> >> > > > > > > can
> >> > > > > > > > > > > > navigate
> >> > > > > > > > > > > > > within. This is a building block for
> higher-level efforts
> >> > > > > > such
> >> > > > > > > as
> >> > > > > > > > > an
> >> > > > > > > > > > > > > external Autoscaler. The natural extension of
> this work
> >> > > > > would
> >> > > > > > > be
> >> > > > > > > > to
> >> > > > > > > > > > > allow
> >> > > > > > > > > > > > > to specify per-vertex ResourceProfiles.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Looking forward to your thoughts; any feedback
> is
> >> > > > > > appreciated!
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > [1]
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> >> > > > > > > > > > > > > [2]
> >> > > > > > > > >
> https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
> >> > > > > > > > > > > > > [3]
> https://github.com/tillrohrmann/flink/tree/autoscaling
> >> > > > > > > > > > > > > [4]
> >> > > > > > > > > > > >
> >> > > > > > > > >
> >> > > > > >
> https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Best,
> >> > > > > > > > > > > > > D.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > --
> >> > > > > > > > > > > > Best
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > ConradJam
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
>

Reply via email to