> I suppose we could further remove the min because it would always be safer to scale down if resources are not available than not to run at all [1].
Apart from what @Roman has already mentioned, there are still cases where we're certain that there is no point in running the jobs with resources lower than X; e.g., because the state is too large to be processed with parallelism of 1; this allows you not to waste resources if you're certain that the job would go into the restart loop / won't be able to checkpoint. I believe that for most use cases, simply keeping the lower bound at 1 will be sufficient. > I saw that the minimum bound is currently not used in the code you posted above [2]. Is that still planned? Yes. We already allow setting the lower bound via API, but it's not considered by the scheduler. I'll address this limitation in a separate issue. > Note that originally we had assumed min == max but I think that would be a less safe scaling approach because we would get stuck waiting for resources when they are not available, e.g. k8s resource limits reached. 100% agreed; the above-mentioned knobs should allow you to balance the trade-off. Does that make sense? Best, D. On Tue, Feb 28, 2023 at 1:14 PM Roman Khachatryan <ro...@apache.org> wrote: > Hi, > > Thanks for the update, I think distinguishing the rescaling behaviour and > the desired parallelism declaration is important. > > Having the ability to specify min parallelism might be useful in > environments with multiple jobs: Scheduler will then have an option to stop > the less suitable job. > In other setups, where the job should not be stopped at all, the user can > always set it to 0. > > Regards, > Roman > > > On Tue, Feb 28, 2023 at 12:58 PM Maximilian Michels <m...@apache.org> > wrote: > >> Hi David, >> >> Thanks for the update! We are considering using the new declarative resource >> API for autoscaling. Currently, we treat a scaling decision as a new >> deployment which means surrendering all resources to Kubernetes and >> subsequently reallocating them for the rescaled deployment. 
The >> declarative resource management API is a great step forward because it >> allows us to do faster and safer rescaling. Faster, because we can >> continue to run while resources are pre-allocated which minimizes >> downtime. Safer, because we can't get stuck when the desired resources >> are not available. >> >> An example with two vertices and their respective parallelisms: >> v1: 50 >> v2: 10 >> Let's assume slot sharing is disabled, so we need 60 task slots to run >> the vertices. >> >> If the autoscaler was to decide to scale up v1 and v2, it could do so >> in a safe way by using min/max configuration: >> v1: [min: 50, max: 70] >> v2: [min: 10, max: 20] >> This would then need 90 task slots to run at max capacity. >> >> I suppose we could further remove the min because it would always be >> safer to scale down if resources are not available than to not run at >> all [1]. In fact, I saw that the minimum bound is currently not used >> in the code you posted above [2]. Is that still planned? >> >> -Max >> >> PS: Note that originally we had assumed min == max but I think that >> would be a less safe scaling approach because we would get stuck >> waiting for resources when they are not available, e.g. k8s resource >> limits reached. >> >> [1] However, there might be costs involved with executing the >> rescaling, e.g. for using external storage like s3, especially without >> local recovery. 
>> [2] >> https://github.com/dmvk/flink/commit/5e7edcb77d8522c367bc6977f80173b14dc03ce9 >> >> On Tue, Feb 28, 2023 at 9:33 AM David Morávek <d...@apache.org> wrote: >> > >> > Hi Everyone, >> > >> > We had some more talks about the pre-allocation of resources with @Max, >> and >> > here is the final state that we've converged to for now: >> > >> > The vital thing to note about the new API is that it's declarative, >> meaning >> > we're declaring the desired state to which we want our job to converge; >> If, >> > after the requirements update job no longer holds the desired resources >> > (fewer resources than the lower bound), it will be canceled and >> transition >> > back into the waiting for resources state. >> > >> > In some use cases, you might always want to rescale to the upper bound >> > (this goes along the lines of "preallocating resources" and minimizing >> the >> > number of rescales, which is especially useful with the large state). >> This >> > can be controlled by two knobs that already exist: >> > >> > 1) "jobmanager.adaptive-scheduler.min-parallelism-increase" - this >> affects >> > a minimal parallelism increase step of a running job; we'll slightly >> change >> > the semantics, and we'll trigger rescaling either once this condition is >> > met or when you hit the ceiling; setting this to the high number will >> > ensure that you always rescale to the upper bound >> > >> > 2) "jobmanager.adaptive-scheduler.resource-stabilization-timeout" - for >> new >> > and already restarting jobs, we'll always respect this timeout, which >> > allows you to wait for more resources even though you already have more >> > resources than defined in the lower bound; again, in the case we reach >> the >> > ceiling (the upper bound), we'll transition into the executing state. >> > >> > >> > We're still planning to dig deeper in this direction with other efforts, >> > but this is already good enough and should allow us to move the FLIP >> > forward. >> > >> > WDYT? 
Unless there are any objections against the above, I'd like to >> > proceed to a vote. >> > >> > Best, >> > D. >> > >> > On Thu, Feb 23, 2023 at 5:39 PM David Morávek <d...@apache.org> wrote: >> > >> > > Hi Everyone, >> > > >> > > @John >> > > >> > > This is a problem that we've spent some time trying to crack; in the >> end, >> > > we've decided to go against doing any upgrades to JobGraphStore from >> > > JobMaster to avoid having multiple writers that are guarded by >> different >> > > leader election lock (Dispatcher and JobMaster might live in a >> different >> > > process). The contract we've decided to choose instead is leveraging >> the >> > > idempotency of the endpoint and having the user of the API retry in >> case >> > > we're unable to persist new requirements in the JobGraphStore [1]. We >> > > eventually need to move JobGraphStore out of the dispatcher, but >> that's way >> > > out of the scope of this FLIP. The solution is a deliberate >> trade-off. The >> > > worst scenario is that the Dispatcher fails over in between retries, >> which >> > > would simply rescale the job to meet the previous resource >> requirements >> > > (more extended unavailability of underlying HA storage would have >> worse >> > > consequences than this). Does that answer your question? >> > > >> > > @Matthias >> > > >> > > Good catch! I'm fixing it now, thanks! >> > > >> > > [1] >> > > >> https://github.com/dmvk/flink/commit/5e7edcb77d8522c367bc6977f80173b14dc03ce9#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cR1151 >> > > >> > > Best, >> > > D. >> > > >> > > On Tue, Feb 21, 2023 at 12:24 AM John Roesler <vvcep...@apache.org> >> wrote: >> > > >> > >> Thanks for the FLIP, David! >> > >> >> > >> I just had one small question. IIUC, the REST API PUT request will go >> > >> through the new DispatcherGateway method to be handled. Then, after >> > >> validation, the dispatcher would call the new JobMasterGateway >> method to >> > >> actually update the job. 
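The contract David describes above (an idempotent endpoint whose caller simply retries until the new requirements are persisted in the JobGraphStore) can be sketched as follows. This is a minimal illustration, not the actual Flink client API: the `PUT /jobs/<jobID>/resource-requirements` path follows the FLIP draft, and `send_put` is a hypothetical stand-in for the HTTP call.

```python
import time

def put_resource_requirements(send_put, requirements, max_attempts=5, backoff_s=1.0):
    """Retry an idempotent PUT until it is acknowledged.

    `send_put` stands in for the HTTP call to the draft endpoint
    `PUT /jobs/<jobID>/resource-requirements`; it should return True once
    the requirements have been persisted in the JobGraphStore. Because the
    request is idempotent, resending the same payload after a failure is safe.
    """
    for attempt in range(1, max_attempts + 1):
        if send_put(requirements):
            return attempt  # number of attempts it took
        time.sleep(backoff_s * attempt)  # linear backoff between retries
    raise RuntimeError("could not persist resource requirements")
```

The worst case mentioned above (a Dispatcher failover between retries) is benign under this contract: the job merely keeps running with the previously declared requirements until a retry succeeds.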
>> > >> >> > >> Which component will write the updated JobGraph? I just wanted to >> make >> > >> sure it’s the JobMaster because if it were the dispatcher, there >> could be a >> > >> race condition with the async JobMaster method. >> > >> >> > >> Thanks! >> > >> -John >> > >> >> > >> On Mon, Feb 20, 2023, at 07:34, Matthias Pohl wrote: >> > >> > Thanks for your clarifications, David. I don't have any additional >> major >> > >> > points to add. One thing about the FLIP: The RPC layer API for >> updating >> > >> the >> > >> > JRR returns a future with a JRR? I don't see value in returning a >> JRR >> > >> here >> > >> > since it's an idempotent operation? Wouldn't it be enough to return >> > >> > CompletableFuture<Void> here? Or am I missing something? >> > >> > >> > >> > Matthias >> > >> > >> > >> > On Mon, Feb 20, 2023 at 1:48 PM Maximilian Michels <m...@apache.org >> > >> > >> wrote: >> > >> > >> > >> >> Thanks David! If we could get the pre-allocation working as part >> of >> > >> >> the FLIP, that would be great. >> > >> >> >> > >> >> Concerning the downscale case, I agree this is a special case for >> the >> > >> >> (single-job) application mode where we could re-allocate slots in >> a >> > >> >> way that could leave entire task managers unoccupied which we >> would >> > >> >> then be able to release. The goal essentially is to reduce slot >> > >> >> fragmentation on scale down by packing the slots efficiently. The >> > >> >> easiest way to add this optimization when running in application >> mode >> > >> >> would be to drop as many task managers during the restart such >> that >> > >> >> NUM_REQUIRED_SLOTS >= NUM_AVAILABLE_SLOTS stays true. We can look >> into >> > >> >> this independently of the FLIP. >> > >> >> >> > >> >> Feel free to start the vote. 
>> > >> >> >> > >> >> -Max >> > >> >> >> > >> >> On Mon, Feb 20, 2023 at 9:10 AM David Morávek <d...@apache.org> >> wrote: >> > >> >> > >> > >> >> > Hi everyone, >> > >> >> > >> > >> >> > Thanks for the feedback! I've updated the FLIP to use >> idempotent PUT >> > >> API >> > >> >> instead of PATCH and to properly handle lower bound settings, to >> > >> support >> > >> >> the "pre-allocation" of the resources. >> > >> >> > >> > >> >> > @Max >> > >> >> > >> > >> >> > > How hard would it be to address this issue in the FLIP? >> > >> >> > >> > >> >> > I've included this in the FLIP. It might not be too hard to >> implement >> > >> >> this in the end. >> > >> >> > >> > >> >> > > B) drop as many superfluous task managers as needed >> > >> >> > >> > >> >> > I've intentionally left this part out for now because this >> ultimately >> > >> >> needs to be the responsibility of the Resource Manager. After >> all, in >> > >> the >> > >> >> Session Cluster scenario, the Scheduler doesn't have the bigger >> > >> picture of >> > >> >> other tasks of other jobs running on those TMs. This will most >> likely >> > >> be a >> > >> >> topic for another FLIP. >> > >> >> > >> > >> >> > WDYT? If there are no other questions or concerns, I'd like to >> start >> > >> the >> > >> >> vote on Wednesday. >> > >> >> > >> > >> >> > Best, >> > >> >> > D. >> > >> >> > >> > >> >> > On Wed, Feb 15, 2023 at 3:34 PM Maximilian Michels < >> m...@apache.org> >> > >> >> wrote: >> > >> >> >> >> > >> >> >> I missed that the FLIP states: >> > >> >> >> >> > >> >> >> > Currently, even though we’d expose the lower bound for >> clarity and >> > >> >> API completeness, we won’t allow setting it to any other value >> than one >> > >> >> until we have full support throughout the stack. >> > >> >> >> >> > >> >> >> How hard would it be to address this issue in the FLIP? 
>> > >> >> >> >> > >> >> >> There is not much value to offer setting a lower bound which >> won't >> > >> be >> > >> >> >> respected / throw an error when it is set. If we had support >> for a >> > >> >> >> lower bound, we could enforce a resource contract externally >> via >> > >> >> >> setting lowerBound == upperBound. That ties back to the >> Rescale API >> > >> >> >> discussion we had. I want to better understand what the major >> > >> concerns >> > >> >> >> would be around allowing this. >> > >> >> >> >> > >> >> >> Just to outline how I imagine the logic to work: >> > >> >> >> >> > >> >> >> A) The resource constraints are already met => Nothing changes >> > >> >> >> B) More resources available than required => Cancel the job, >> drop as >> > >> >> >> many superfluous task managers as needed, restart the job >> > >> >> >> C) Less resources available than required => Acquire new task >> > >> >> >> managers, wait for them to register, cancel and restart the job >> > >> >> >> >> > >> >> >> I'm open to helping out with the implementation. >> > >> >> >> >> > >> >> >> -Max >> > >> >> >> >> > >> >> >> On Mon, Feb 13, 2023 at 7:45 PM Maximilian Michels < >> m...@apache.org> >> > >> >> wrote: >> > >> >> >> > >> > >> >> >> > Based on further discussion I had with Chesnay on this PR >> [1], I >> > >> think >> > >> >> >> > jobs would currently go into a restarting state after the >> resource >> > >> >> >> > requirements have changed. This wouldn't achieve what we had >> in >> > >> mind, >> > >> >> >> > i.e. sticking to the old resource requirements until enough >> slots >> > >> are >> > >> >> >> > available to fulfil the new resource requirements. So this >> may >> > >> not be >> > >> >> >> > 100% what we need but it could be extended to do what we >> want. 
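Max's A/B/C decision logic above can be sketched as a simple three-way branch. This is purely illustrative of the thread's reasoning, not scheduler code; the action names are invented for the sketch, and case A is simplified to an exact match of required and available slots.

```python
def rescale_action(required_slots, available_slots):
    """Sketch of the A/B/C logic from the thread (illustrative only).

    A) resource constraints already met -> nothing changes
    B) more resources than required     -> cancel the job, drop superfluous
                                           task managers, restart
    C) fewer resources than required    -> acquire new task managers, wait
                                           for registration, cancel + restart
    """
    if required_slots == available_slots:
        return "keep-running"
    if available_slots > required_slots:
        return "cancel-release-restart"
    return "acquire-wait-restart"
```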
>> > >> >> >> > >> > >> >> >> > -Max >> > >> >> >> > >> > >> >> >> > [1] >> > >> https://github.com/apache/flink/pull/21908#discussion_r1104792362 >> > >> >> >> > >> > >> >> >> > On Mon, Feb 13, 2023 at 7:16 PM Maximilian Michels < >> > >> m...@apache.org> >> > >> >> wrote: >> > >> >> >> > > >> > >> >> >> > > Hi David, >> > >> >> >> > > >> > >> >> >> > > This is awesome! Great writeup and demo. This is pretty >> much >> > >> what we >> > >> >> >> > > need for the autoscaler as part of the Flink Kubernetes >> operator >> > >> >> [1]. >> > >> >> >> > > Scaling Flink jobs effectively is hard but fortunately we >> have >> > >> >> solved >> > >> >> >> > > the issue as part of the Flink Kubernetes operator. The >> only >> > >> >> critical >> > >> >> >> > > piece we are missing is a better way to execute scaling >> > >> decisions, >> > >> >> as >> > >> >> >> > > discussed in [2]. >> > >> >> >> > > >> > >> >> >> > > Looking at your proposal, we would set lowerBound == >> upperBound >> > >> for >> > >> >> >> > > the parallelism because we want to fully determine the >> > >> parallelism >> > >> >> >> > > externally based on the scaling metrics. Does that sound >> right? >> > >> >> >> > > >> > >> >> >> > > What is the timeline for these changes? Is there a JIRA? >> > >> >> >> > > >> > >> >> >> > > Cheers, >> > >> >> >> > > Max >> > >> >> >> > > >> > >> >> >> > > [1] >> > >> >> >> > >> >> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/ >> > >> >> >> > > [2] >> > >> >> https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5 >> > >> >> >> > > >> > >> >> >> > > On Mon, Feb 13, 2023 at 1:16 PM feng xiangyu < >> > >> xiangyu...@gmail.com> >> > >> >> wrote: >> > >> >> >> > > > >> > >> >> >> > > > Hi David, >> > >> >> >> > > > >> > >> >> >> > > > Thanks for your reply. I think your response totally >> make >> > >> >> sense. 
This >> > >> >> >> > > > flip targets on declaring required resource to >> ResourceManager >> > >> >> instead of >> > >> >> >> > > > using ResourceManager to add/remove TMs directly. >> > >> >> >> > > > >> > >> >> >> > > > Best, >> > >> >> >> > > > Xiangyu >> > >> >> >> > > > >> > >> >> >> > > > >> > >> >> >> > > > >> > >> >> >> > > > David Morávek <david.mora...@gmail.com> 于2023年2月13日周一 >> > >> 15:46写道: >> > >> >> >> > > > >> > >> >> >> > > > > Hi everyone, >> > >> >> >> > > > > >> > >> >> >> > > > > @Shammon >> > >> >> >> > > > > >> > >> >> >> > > > > I'm not entirely sure what "config file" you're >> referring >> > >> to. >> > >> >> You can, of >> > >> >> >> > > > > course, override the default parallelism in >> > >> "flink-conf.yaml", >> > >> >> but for >> > >> >> >> > > > > sinks and sources, the parallelism needs to be tweaked >> on >> > >> the >> > >> >> connector >> > >> >> >> > > > > level ("WITH" statement). >> > >> >> >> > > > > >> > >> >> >> > > > > This is something that should be achieved with tooling >> > >> around >> > >> >> Flink. We >> > >> >> >> > > > > want to provide an API on the lowest level that >> generalizes >> > >> >> well. Achieving >> > >> >> >> > > > > what you're describing should be straightforward with >> this >> > >> API. >> > >> >> >> > > > > >> > >> >> >> > > > > @Xiangyu >> > >> >> >> > > > > >> > >> >> >> > > > > Is it possible for this REST API to declare TM >> resources in >> > >> the >> > >> >> future? >> > >> >> >> > > > > >> > >> >> >> > > > > >> > >> >> >> > > > > Would you like to add/remove TMs if you use an active >> > >> Resource >> > >> >> Manager? >> > >> >> >> > > > > This would be out of the scope of this effort since it >> > >> targets >> > >> >> the >> > >> >> >> > > > > scheduler component only (we make no assumptions about >> the >> > >> used >> > >> >> Resource >> > >> >> >> > > > > Manager). Also, the AdaptiveScheduler is only intended >> to be >> > >> >> used for >> > >> >> >> > > > > Streaming. 
>> > >> >> >> > > > > >> > >> >> >> > > > > And for streaming jobs, I'm wondering if there is any >> > >> >> situation we need to >> > >> >> >> > > > > > rescale the TM resources of a flink cluster at first >> and >> > >> then >> > >> >> the >> > >> >> >> > > > > adaptive >> > >> >> >> > > > > > scheduler will rescale the per-vertex >> ResourceProfiles >> > >> >> accordingly. >> > >> >> >> > > > > > >> > >> >> >> > > > > >> > >> >> >> > > > > We plan on adding support for the ResourceProfiles >> (dynamic >> > >> slot >> > >> >> >> > > > > allocation) as the next step. Again we won't make any >> > >> >> assumptions about the >> > >> >> >> > > > > used Resource Manager. In other words, this effort >> ends by >> > >> >> declaring >> > >> >> >> > > > > desired resources to the Resource Manager. >> > >> >> >> > > > > >> > >> >> >> > > > > Does that make sense? >> > >> >> >> > > > > >> > >> >> >> > > > > @Matthias >> > >> >> >> > > > > >> > >> >> >> > > > > We've done another pass on the proposed API and >> currently >> > >> lean >> > >> >> towards >> > >> >> >> > > > > having an idempotent PUT API. >> > >> >> >> > > > > - We don't care too much about multiple writers' >> scenarios >> > >> in >> > >> >> terms of who >> > >> >> >> > > > > can write an authoritative payload; this is up to the >> user >> > >> of >> > >> >> the API to >> > >> >> >> > > > > figure out >> > >> >> >> > > > > - It's indeed tricky to achieve atomicity with PATCH >> API; >> > >> >> switching to PUT >> > >> >> >> > > > > API seems to do the trick >> > >> >> >> > > > > - We won't allow partial "payloads" anymore, meaning >> you >> > >> need >> > >> >> to define >> > >> >> >> > > > > requirements for all vertices in the JobGraph; This is >> > >> >> completely fine for >> > >> >> >> > > > > the programmatic workflows. For DEBUG / DEMO purposes, >> you >> > >> can >> > >> >> use the GET >> > >> >> >> > > > > endpoint and tweak the response to avoid writing the >> whole >> > >> >> payload by hand. 
>> > >> >> >> > > > > >> > >> >> >> > > > > WDYT? >> > >> >> >> > > > > >> > >> >> >> > > > > >> > >> >> >> > > > > Best, >> > >> >> >> > > > > D. >> > >> >> >> > > > > >> > >> >> >> > > > > On Fri, Feb 10, 2023 at 11:21 AM feng xiangyu < >> > >> >> xiangyu...@gmail.com> >> > >> >> >> > > > > wrote: >> > >> >> >> > > > > >> > >> >> >> > > > > > Hi David, >> > >> >> >> > > > > > >> > >> >> >> > > > > > Thanks for creating this flip. I think this work it >> is >> > >> very >> > >> >> useful, >> > >> >> >> > > > > > especially in autoscaling scenario. I would like to >> share >> > >> >> some questions >> > >> >> >> > > > > > from my view. >> > >> >> >> > > > > > >> > >> >> >> > > > > > 1, Is it possible for this REST API to declare TM >> > >> resources >> > >> >> in the >> > >> >> >> > > > > future? >> > >> >> >> > > > > > I'm asking because we are building the autoscaling >> feature >> > >> >> for Flink OLAP >> > >> >> >> > > > > > Session Cluster in ByteDance. We need to rescale the >> > >> >> cluster's resource >> > >> >> >> > > > > on >> > >> >> >> > > > > > TM level instead of Job level. It would be very >> helpful >> > >> if we >> > >> >> have a REST >> > >> >> >> > > > > > API for out external Autoscaling service to use. >> > >> >> >> > > > > > >> > >> >> >> > > > > > 2, And for streaming jobs, I'm wondering if there is >> any >> > >> >> situation we >> > >> >> >> > > > > need >> > >> >> >> > > > > > to rescale the TM resources of a flink cluster at >> first >> > >> and >> > >> >> then the >> > >> >> >> > > > > > adaptive scheduler will rescale the per-vertex >> > >> >> ResourceProfiles >> > >> >> >> > > > > > accordingly. >> > >> >> >> > > > > > >> > >> >> >> > > > > > best. >> > >> >> >> > > > > > Xiangyu >> > >> >> >> > > > > > >> > >> >> >> > > > > > Shammon FY <zjur...@gmail.com> 于2023年2月9日周四 11:31写道: >> > >> >> >> > > > > > >> > >> >> >> > > > > > > Hi David >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > Thanks for your answer. 
>> > >> >> >> > > > > > > >> > >> >> >> > > > > > > > Can you elaborate more about how you'd intend to >> use >> > >> the >> > >> >> endpoint? I >> > >> >> >> > > > > > > think we can ultimately introduce a way of >> re-declaring >> > >> >> "per-vertex >> > >> >> >> > > > > > > defaults," but I'd like to understand the use case >> bit >> > >> more >> > >> >> first. >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > For this issue, I mainly consider the consistency >> of >> > >> user >> > >> >> configuration >> > >> >> >> > > > > > and >> > >> >> >> > > > > > > job runtime. For sql jobs, users usually set >> specific >> > >> >> parallelism for >> > >> >> >> > > > > > > source and sink, and set a global parallelism for >> other >> > >> >> operators. >> > >> >> >> > > > > These >> > >> >> >> > > > > > > config items are stored in a config file. For some >> > >> >> high-priority jobs, >> > >> >> >> > > > > > > users may want to manage them manually. >> > >> >> >> > > > > > > 1. When users need to scale the parallelism, they >> should >> > >> >> update the >> > >> >> >> > > > > > config >> > >> >> >> > > > > > > file and restart flink job, which may take a long >> time. >> > >> >> >> > > > > > > 2. After providing the REST API, users can just >> send a >> > >> >> request to the >> > >> >> >> > > > > job >> > >> >> >> > > > > > > via REST API quickly after updating the config >> file. >> > >> >> >> > > > > > > The configuration in the running job and config >> file >> > >> should >> > >> >> be the >> > >> >> >> > > > > same. >> > >> >> >> > > > > > > What do you think of this? >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > best. 
>> > >> >> >> > > > > > > Shammon >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > On Tue, Feb 7, 2023 at 4:51 PM David Morávek < >> > >> >> david.mora...@gmail.com> >> > >> >> >> > > > > > > wrote: >> > >> >> >> > > > > > > >> > >> >> >> > > > > > > > Hi everyone, >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > Let's try to answer the questions one by one. >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > *@ConradJam* >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > when the number of "slots" is insufficient, can >> we can >> > >> >> stop users >> > >> >> >> > > > > > > rescaling >> > >> >> >> > > > > > > > > or throw something to tell user "less avaliable >> > >> slots >> > >> >> to upgrade, >> > >> >> >> > > > > > > please >> > >> >> >> > > > > > > > > checkout your alivalbe slots" ? >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > The main property of AdaptiveScheduler is that >> it can >> > >> >> adapt to >> > >> >> >> > > > > > "available >> > >> >> >> > > > > > > > resources," which means you're still able to make >> > >> >> progress even >> > >> >> >> > > > > though >> > >> >> >> > > > > > > you >> > >> >> >> > > > > > > > didn't get all the slots you've asked for. Let's >> break >> > >> >> down the pros >> > >> >> >> > > > > > and >> > >> >> >> > > > > > > > cons of this property. >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > - (plus) If you lose a TM for some reason, you >> can >> > >> still >> > >> >> recover even >> > >> >> >> > > > > > if >> > >> >> >> > > > > > > it >> > >> >> >> > > > > > > > doesn't come back. We still need to give it some >> time >> > >> to >> > >> >> eliminate >> > >> >> >> > > > > > > > unnecessary rescaling, which can be controlled by >> > >> setting >> > >> >> >> > > > > > > > "resource-stabilization-timeout." 
>> > >> >> >> > > > > > > > - (plus) The resources can arrive with a >> significant >> > >> >> delay. For >> > >> >> >> > > > > > example, >> > >> >> >> > > > > > > > you're unable to spawn enough TMs on time because >> > >> you've >> > >> >> run out of >> > >> >> >> > > > > > > > resources in your k8s cluster, and you need to >> wait >> > >> for >> > >> >> the cluster >> > >> >> >> > > > > > auto >> > >> >> >> > > > > > > > scaler to kick in and add new nodes to the >> cluster. In >> > >> >> this scenario, >> > >> >> >> > > > > > > > you'll be able to start making progress faster, >> at the >> > >> >> cost of >> > >> >> >> > > > > multiple >> > >> >> >> > > > > > > > rescalings (once the remaining resources arrive). >> > >> >> >> > > > > > > > - (plus) This plays well with the declarative >> manner >> > >> of >> > >> >> today's >> > >> >> >> > > > > > > > infrastructure. For example, you tell k8s that >> you >> > >> need >> > >> >> 10 TMs, and >> > >> >> >> > > > > > > you'll >> > >> >> >> > > > > > > > eventually get them. >> > >> >> >> > > > > > > > - (minus) In the case of large state jobs, the >> cost of >> > >> >> multiple >> > >> >> >> > > > > > > rescalings >> > >> >> >> > > > > > > > might outweigh the above. >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > We've already touched on the solution to this >> problem >> > >> on >> > >> >> the FLIP. >> > >> >> >> > > > > > Please >> > >> >> >> > > > > > > > notice the parallelism knob being a range with a >> lower >> > >> >> and upper >> > >> >> >> > > > > bound. >> > >> >> >> > > > > > > > Setting both the lower and upper bound to the >> same >> > >> value >> > >> >> could give >> > >> >> >> > > > > the >> > >> >> >> > > > > > > > behavior you're describing at the cost of giving >> up >> > >> some >> > >> >> properties >> > >> >> >> > > > > > that >> > >> >> >> > > > > > > AS >> > >> >> >> > > > > > > > gives you (you'd be falling back to the >> > >> >> DefaultScheduler's behavior). 
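Tying the min/max range back to Max's slot arithmetic from earlier in the thread: with slot sharing disabled, the slots needed at the upper bound are simply the sum of the per-vertex upper bounds. A small sketch, where the payload shape (`lowerBound`/`upperBound` per vertex) mirrors the FLIP draft and is illustrative rather than the final wire format:

```python
def required_slots(requirements, bound="upperBound"):
    """Sum per-vertex parallelism bounds; assumes slot sharing is disabled."""
    return sum(v["parallelism"][bound] for v in requirements.values())

# Max's example: v1 scaled within [50, 70], v2 within [10, 20].
requirements = {
    "v1": {"parallelism": {"lowerBound": 50, "upperBound": 70}},
    "v2": {"parallelism": {"lowerBound": 10, "upperBound": 20}},
}
assert required_slots(requirements, "lowerBound") == 60  # current parallelism
assert required_slots(requirements, "upperBound") == 90  # at max capacity

# Pinning a vertex (lowerBound == upperBound) gives the fixed-parallelism,
# DefaultScheduler-like behavior discussed above:
pinned = {"v1": {"parallelism": {"lowerBound": 70, "upperBound": 70}}}
```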
>> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > when user upgrade job-vertx-parallelism . I want >> to >> > >> have >> > >> >> an interface >> > >> >> >> > > > > > to >> > >> >> >> > > > > > > > > query the current update parallel execution >> status, >> > >> so >> > >> >> that the >> > >> >> >> > > > > user >> > >> >> >> > > > > > or >> > >> >> >> > > > > > > > > program can understand the current status >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > This is a misunderstanding. We're not >> introducing the >> > >> >> RESCALE >> > >> >> >> > > > > endpoint. >> > >> >> >> > > > > > > > This endpoint allows you to re-declare the >> resources >> > >> >> needed to run >> > >> >> >> > > > > the >> > >> >> >> > > > > > > job. >> > >> >> >> > > > > > > > Once you reach the desired resources (you get >> more >> > >> >> resources than the >> > >> >> >> > > > > > > lower >> > >> >> >> > > > > > > > bound defines), your job will run. >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > We can expose a similar endpoint to "resource >> > >> >> requirements" to give >> > >> >> >> > > > > you >> > >> >> >> > > > > > > an >> > >> >> >> > > > > > > > overview of the resources the vertices already >> have. >> > >> You >> > >> >> can already >> > >> >> >> > > > > > get >> > >> >> >> > > > > > > > this from the REST API, so exposing this in yet >> > >> another >> > >> >> way should be >> > >> >> >> > > > > > > > considered carefully. >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > *@Matthias* >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > I'm wondering whether it makes sense to add some >> kind >> > >> of >> > >> >> resource ID >> > >> >> >> > > > > to >> > >> >> >> > > > > > > the >> > >> >> >> > > > > > > > > REST API. >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > That's a good question. 
I want to think about >> that and >> > >> >> get back to >> > >> >> >> > > > > the >> > >> >> >> > > > > > > > question later. My main struggle when thinking >> about >> > >> this >> > >> >> is, "if >> > >> >> >> > > > > this >> > >> >> >> > > > > > > > would be an idempotent POST endpoint," would it >> be any >> > >> >> different? >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > How often do we allow resource requirements to be >> > >> changed? >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > There shall be no rate limiting on the FLINK >> side. If >> > >> >> this is >> > >> >> >> > > > > something >> > >> >> >> > > > > > > > your environment needs, you can achieve it on a >> > >> different >> > >> >> layer ("we >> > >> >> >> > > > > > > can't >> > >> >> >> > > > > > > > have FLINK to do everything"). >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > Versioning the JobGraph in the JobGraphStore >> rather >> > >> than >> > >> >> overwriting >> > >> >> >> > > > > it >> > >> >> >> > > > > > > > > might be an idea. >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > This sounds interesting since it would be closer >> to >> > >> the >> > >> >> JobGraph >> > >> >> >> > > > > being >> > >> >> >> > > > > > > > immutable. The main problem I see here is that >> this >> > >> would >> > >> >> introduce a >> > >> >> >> > > > > > > > BW-incompatible change so it might be a topic for >> > >> >> follow-up FLIP. >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > I'm just wondering whether we bundle two things >> > >> together >> > >> >> that are >> > >> >> >> > > > > > > actually >> > >> >> >> > > > > > > > > separate >> > >> >> >> > > > > > > > > >> > >> >> >> > > > > > > > >> > >> >> >> > > > > > > > Yup, this is how we think about it as well. 
The main question is, "who should be responsible for bookkeeping 1) the JobGraph and 2) the JobResourceRequirements?" The JobMaster would be the right place for both, but that's currently not the case, and we're tightly coupling the Dispatcher with the JobMaster.

Initially, we tried to introduce a separate HA component in the JobMaster for bookkeeping the JobResourceRequirements, but that proved to be a more significant effort, adding further mess to the already messy HA ecosystem. Another approach we discussed was mutating the JobGraph and setting the JobResourceRequirements into the JobGraph structure itself.

The middle ground that keeps this effort reasonably sized without violating "we want to keep the JobGraph immutable" too much is keeping the JobResourceRequirements separate, as an internal config option in the JobGraph's configuration.

We ultimately need to rethink the tight coupling of the Dispatcher and the JobMaster, but that needs to be a separate effort.

> ...also considering the amount of data that can be stored in a
> ConfigMap/ZooKeeper node if versioning the resource requirement change as
> proposed in my previous item is an option for us.

AFAIK we're only storing pointers to the S3 objects in HA metadata, so we should be okay with having larger structures for now.

> Updating the JobGraphStore means adding more requests to the HA backend
> API.

It's fine unless you intend to override the resource requirements a few times per second.

*@Shammon*

> How about adding some more information such as vertex type

Since it was intended as a "debug" endpoint, that makes complete sense!

> For sql jobs, we always use a unified parallelism for most vertices. Can
> we provide them with a more convenient setting method instead of each one?

I can completely relate to this. The main thoughts when designing the API were:

- We want to keep it clean and easy to understand.
- Global parallelism can be modeled using per-vertex parallelism, but not the other way around.
- The API will be used by external tooling (operator, autoscaler).

Can you elaborate a bit more on how you'd intend to use the endpoint? I think we can ultimately introduce a way of re-declaring "per-vertex defaults," but I'd like to understand the use case a bit more first.

*@Weijie*

> What is the default value here (based on what configuration), or just
> infinite?

Currently, the lower bound is always one, and the upper bound is either the parallelism (if defined) or the maxParallelism of the vertex in the JobGraph. This question might be another signal for making the defaults explicit (see the answer to Shammon's question above).

Thanks, everyone, for your initial thoughts!

Best,
D.
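The defaulting rule described here (lower bound fixed at 1; upper bound falling back from the configured parallelism to the vertex's maxParallelism, with -1 meaning "reset to default") can be sketched as follows. This is purely illustrative; the function name and signature are mine, not Flink's actual implementation:

```python
from typing import Optional, Tuple

def effective_bounds(parallelism: Optional[int],
                     max_parallelism: int,
                     requested_upper: Optional[int] = None) -> Tuple[int, int]:
    """Resolve the (lower, upper) parallelism bounds for a single vertex.

    Mirrors the defaulting described in the thread: the lower bound is
    always 1; an upper bound of None or -1 ("reset to default") falls back
    to the configured parallelism, or to maxParallelism if no parallelism
    was set on the vertex.
    """
    if requested_upper is not None and requested_upper != -1:
        upper = requested_upper
    elif parallelism is not None:
        upper = parallelism
    else:
        upper = max_parallelism
    return (1, upper)
```

For example, a vertex with parallelism 4 and maxParallelism 128 defaults to bounds (1, 4), while a vertex with no explicit parallelism defaults to (1, 128).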
On Tue, Feb 7, 2023 at 4:39 AM weijie guo <guoweijieres...@gmail.com> wrote:

Thanks David for driving this. This is very valuable work, especially for cloud-native environments.

> How about adding some more information such as vertex type
> (SOURCE/MAP/JOIN, etc.) in the response of `get jobs
> resource-requirements`? For users, only the vertex-id may be difficult to
> understand.

+1 for this suggestion; including the JobVertex's name in the response body is more user-friendly.

I saw this sentence in the FLIP: "Setting the upper bound to -1 will reset the value to the default setting." What is the default value here (based on what configuration), or is it just infinite?

Best regards,

Weijie


On Mon, Feb 6, 2023 at 18:06, Shammon FY <zjur...@gmail.com> wrote:

Hi David,

Thanks for initiating this discussion. I think declaring job resource requirements via a REST API is very valuable. I've left some comments below:

1) How about adding some more information, such as the vertex type (SOURCE/MAP/JOIN, etc.), in the response of `get jobs resource-requirements`? For users, the vertex-id alone may be difficult to understand.

2) For SQL jobs, we always use a unified parallelism for most vertices. Can we provide a more convenient way to set it, instead of setting each vertex individually?
Best,
Shammon


On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl <matthias.p...@aiven.io.invalid> wrote:

Thanks David for creating this FLIP. It sounds promising and useful to have. Here are some thoughts from my side (some of them might rather be follow-ups and not necessarily part of this FLIP):

- I'm wondering whether it makes sense to add some kind of resource ID to the REST API. This would give Flink a tool to verify the PATCH request of the external system in a compare-and-set kind of manner. AFAIU, the process requires the external system to retrieve the resource requirements first (to retrieve the vertex IDs). A resource ID <ABC> would be sent along as a unique identifier for the provided setup. It's essentially the version ID of the currently deployed resource requirement configuration. Flink doesn't know whether the external system would use the provided information in some way to derive a new set of resource requirements for this job. The subsequent PATCH request with updated resource requirements would include the previously retrieved resource ID <ABC>. The PATCH call would fail if there was a concurrent PATCH call in between, indicating to the external system that the resource requirements were concurrently updated.
- How often do we allow resource requirements to be changed? That question might make my previous comment on the resource ID obsolete, because we could just make any PATCH call fail if there was a resource requirement update within a certain time frame before the request.
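The compare-and-set flow described here (a resource ID acting as a version that every PATCH must echo back) could be sketched roughly as follows. The class and method names are mine, purely for illustration; this is not the FLIP's actual design:

```python
import uuid

class ResourceRequirementsStore:
    """Toy store demonstrating optimistic concurrency via a resource ID.

    Each successful update replaces the resource ID; a PATCH carrying a
    stale ID is rejected, signalling a concurrent update to the caller.
    """

    def __init__(self, requirements: dict):
        self.requirements = requirements
        self.resource_id = str(uuid.uuid4())

    def get(self):
        # The caller retrieves the requirements plus the current version ID.
        return self.requirements, self.resource_id

    def patch(self, new_requirements: dict, expected_resource_id: str) -> bool:
        # Compare-and-set: apply only if the caller saw the latest version.
        if expected_resource_id != self.resource_id:
            return False  # concurrent update detected; caller must re-read
        self.requirements = new_requirements
        self.resource_id = str(uuid.uuid4())
        return True
```

A client that reads, then writes with the ID it read, succeeds; a second write reusing the now-stale ID fails, which is exactly the signal the external system needs.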
But such a time period is something we might want to make configurable then, I guess.
- Versioning the JobGraph in the JobGraphStore rather than overwriting it might be an idea. This would enable us to surface resource requirement changes in the UI or through the REST API. It is related to a problem around keeping track of the exception history within the AdaptiveScheduler while also having to consider multiple versions of a JobGraph. But for that one, we use the ExecutionGraphInfoStore right now.
- Updating the JobGraph in the JobGraphStore makes sense. I'm just wondering whether we are bundling together two things that are actually separate: the business logic and the execution configuration (the resource requirements). I'm aware that this is not a flaw of the current FLIP but rather something that was not necessary to address in the past, because the JobGraph was mostly static. I don't remember whether that was already discussed while working on the AdaptiveScheduler for FLIP-160 [1]. Maybe I'm missing some functionality here that requires us to have everything in one place. But it feels like updating the entire JobGraph for what could actually be a "config change" is not reasonable, also considering the amount of data that can be stored in a ConfigMap/ZooKeeper node if versioning the resource requirement change as proposed in my previous item is an option for us.
- Updating the JobGraphStore means adding more requests to the HA backend API. There were some concerns shared in the discussion thread [2] for FLIP-270 [3] about pressuring the k8s API server in the past with too many calls. Even though that is more likely to be caused by checkpointing, I still wanted to bring it up. We're working on a standardized performance test right now to prepare for going forward with FLIP-270 [3].

Best,
Matthias

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
[2] https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints

On Fri, Feb 3, 2023 at 10:31 AM ConradJam <jam.gz...@gmail.com> wrote:
Hi David:

Thank you for driving this FLIP, which helps reduce Flink downtime. I'd like to share a few ideas for it:

- When the number of slots is insufficient, can we stop the user from rescaling, or report something like "not enough available slots to upgrade, please check your available slots"? Or we could have a request switch (true/false) to allow this behavior.
- When a user updates the per-vertex parallelism, I'd like an interface for querying the status of the ongoing parallelism update, so that the user or a program can understand the current state. This would also help with management similar to the Flink K8s Operator [1], e.g.:

{
  status: Failed
  reason: "not enough available slots to upgrade, please check your available slots"
}

The status could be one of:

- *Pending*: the job has joined the upgrade queue and will be updated later
- *Rescaling*: the job is currently rescaling; wait for it to finish
- *Finished*: the update is done
- *Failed*: something went wrong, so the job cannot be upgraded

I'd like to add the above to the FLIP. What do you think?

1.
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

On Fri, Feb 3, 2023 at 16:42, David Morávek <d...@apache.org> wrote:

Hi everyone,

This FLIP [1] introduces a new REST API for declaring resource requirements for the Adaptive Scheduler. There seems to be a clear need for this API based on the discussion in the "Reworking the Rescale API" thread [2].

Before we get started: this work is heavily based on the prototype [3] created by Till Rohrmann, and the FLIP is being published with his consent. Big shoutout to him! Last but not least, thanks to Chesnay and Roman for the initial reviews and discussions.
The best start would be watching a short demo [4] that I've recorded, which illustrates the newly added capabilities (rescaling the running job, handing back resources to the RM, and session cluster support).

The intuition behind the FLIP is being able to define resource requirements ("resource boundaries") externally that the AdaptiveScheduler can navigate within. This is a building block for higher-level efforts such as an external Autoscaler. The natural extension of this work would be to allow specifying per-vertex ResourceProfiles.

Looking forward to your thoughts; any feedback is appreciated!
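To make the "externally declared resource boundaries" idea concrete, here is a small sketch of how external tooling (e.g. an autoscaler) might assemble such a declaration of per-vertex bounds. The payload shape and field names are my assumptions for illustration only; the actual wire format is defined by the FLIP:

```python
import json

def build_requirements_payload(per_vertex_bounds: dict) -> str:
    """Build a JSON body declaring per-vertex parallelism bounds.

    per_vertex_bounds maps a JobVertexID (hex string) to a (lower, upper)
    tuple. The exact wire format here is hypothetical; it only illustrates
    the "declare bounds, let the scheduler navigate within them" idea.
    """
    body = {
        vertex_id: {"parallelism": {"lowerBound": lo, "upperBound": hi}}
        for vertex_id, (lo, hi) in per_vertex_bounds.items()
    }
    # Basic sanity check before sending: 1 <= lower <= upper.
    for vertex_id, bounds in body.items():
        p = bounds["parallelism"]
        if not 1 <= p["lowerBound"] <= p["upperBound"]:
            raise ValueError(f"invalid bounds for vertex {vertex_id}")
    return json.dumps(body)

# An external autoscaler would then send this body to the job's
# resource-requirements endpoint on the cluster's REST interface.
```

The declaration is idempotent state, not an imperative "rescale now" command, which is what lets the scheduler decide when and how far to actually rescale within the given bounds.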
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
[2] https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
[3] https://github.com/tillrohrmann/flink/tree/autoscaling
[4] https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view

Best,
D.

--
Best

ConradJam