Thanks for the answer, David! It sounds like there is a race condition, but it’s a known issue not specific to this FLIP, and the failure case isn’t too bad. I’m satisfied with that.
Thanks,
John

On Thu, Feb 23, 2023, at 10:39, David Morávek wrote:

Hi Everyone,

@John

This is a problem that we've spent some time trying to crack; in the end, we've decided to go against doing any upgrades to the JobGraphStore from the JobMaster to avoid having multiple writers that are guarded by different leader election locks (the Dispatcher and JobMaster might live in different processes). The contract we've decided to choose instead is leveraging the idempotency of the endpoint and having the user of the API retry in case we're unable to persist new requirements in the JobGraphStore [1]. We eventually need to move the JobGraphStore out of the Dispatcher, but that's way out of the scope of this FLIP. The solution is a deliberate trade-off. The worst scenario is that the Dispatcher fails over in between retries, which would simply rescale the job to meet the previous resource requirements (a more extended unavailability of the underlying HA storage would have worse consequences than this). Does that answer your question?
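(For illustration, a minimal client-side sketch of the retry contract described above. The endpoint path and payload shape below are placeholders based on this discussion, not the finalized API.)

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class DeclareResourceRequirements {

        // Placeholder endpoint; see FLIP-291 for the actual REST API.
        private static final String ENDPOINT =
                "http://jobmanager:8081/jobs/<job-id>/resource-requirements";

        public static void main(String[] args) throws Exception {
            // Placeholder payload: per-vertex parallelism bounds.
            String body =
                    "{\"<vertex-id>\": {\"parallelism\": {\"lowerBound\": 1, \"upperBound\": 4}}}";

            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder(URI.create(ENDPOINT))
                    .header("Content-Type", "application/json")
                    .PUT(HttpRequest.BodyPublishers.ofString(body))
                    .build();

            // The PUT is idempotent, so retrying after a failed attempt to persist the
            // requirements (e.g. a hiccup of the HA storage) is safe.
            for (int attempt = 1; attempt <= 5; attempt++) {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() / 100 == 2) {
                    return; // requirements accepted and persisted
                }
                Thread.sleep(1_000L * attempt); // back off and try again
            }
            throw new IllegalStateException("Could not persist the new resource requirements");
        }
    }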
@Matthias

Good catch! I'm fixing it now, thanks!

[1] https://github.com/dmvk/flink/commit/5e7edcb77d8522c367bc6977f80173b14dc03ce9#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cR1151

Best,
D.

On Tue, Feb 21, 2023 at 12:24 AM John Roesler <vvcep...@apache.org> wrote:

Thanks for the FLIP, David!

I just had one small question. IIUC, the REST API PUT request will go through the new DispatcherGateway method to be handled. Then, after validation, the dispatcher would call the new JobMasterGateway method to actually update the job.

Which component will write the updated JobGraph? I just wanted to make sure it's the JobMaster, because if it were the dispatcher, there could be a race condition with the async JobMaster method.

Thanks!
-John

On Mon, Feb 20, 2023, at 07:34, Matthias Pohl wrote:

Thanks for your clarifications, David. I don't have any additional major points to add. One thing about the FLIP: the RPC layer API for updating the JRR returns a future with a JRR? I don't see value in returning a JRR here since it's an idempotent operation. Wouldn't it be enough to return CompletableFuture<Void> here? Or am I missing something?

Matthias

On Mon, Feb 20, 2023 at 1:48 PM Maximilian Michels <m...@apache.org> wrote:

Thanks David! If we could get the pre-allocation working as part of the FLIP, that would be great.

Concerning the downscale case, I agree this is a special case for the (single-job) application mode where we could re-allocate slots in a way that could leave entire task managers unoccupied, which we would then be able to release. The goal essentially is to reduce slot fragmentation on scale-down by packing the slots efficiently. The easiest way to add this optimization when running in application mode would be to drop as many task managers during the restart such that NUM_REQUIRED_SLOTS >= NUM_AVAILABLE_SLOTS stays true. We can look into this independently of the FLIP.

Feel free to start the vote.

-Max

On Mon, Feb 20, 2023 at 9:10 AM David Morávek <d...@apache.org> wrote:

Hi everyone,

Thanks for the feedback! I've updated the FLIP to use an idempotent PUT API instead of PATCH and to properly handle lower bound settings, to support the "pre-allocation" of resources.

@Max

> How hard would it be to address this issue in the FLIP?

I've included this in the FLIP. It might not be too hard to implement this in the end.

> B) drop as many superfluous task managers as needed

I've intentionally left this part out for now because this ultimately needs to be the responsibility of the Resource Manager. After all, in the Session Cluster scenario, the Scheduler doesn't have the bigger picture of other tasks of other jobs running on those TMs. This will most likely be a topic for another FLIP.

WDYT? If there are no other questions or concerns, I'd like to start the vote on Wednesday.

Best,
D.

On Wed, Feb 15, 2023 at 3:34 PM Maximilian Michels <m...@apache.org> wrote:

I missed that the FLIP states:

> Currently, even though we'd expose the lower bound for clarity and API completeness, we won't allow setting it to any other value than one until we have full support throughout the stack.

How hard would it be to address this issue in the FLIP?

There is not much value in offering to set a lower bound which won't be respected / throws an error when it is set. If we had support for a lower bound, we could enforce a resource contract externally via setting lowerBound == upperBound. That ties back to the Rescale API discussion we had. I want to better understand what the major concerns would be around allowing this.

Just to outline how I imagine the logic to work (a sketch follows below):

A) The resource constraints are already met => Nothing changes
B) More resources available than required => Cancel the job, drop as many superfluous task managers as needed, restart the job
C) Less resources available than required => Acquire new task managers, wait for them to register, cancel and restart the job

I'm open to helping out with the implementation.

-Max
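(For illustration, a minimal sketch of the A/B/C logic outlined above; the names and types are made up and this is not existing Flink code.)

    enum RescaleAction {
        NOTHING,             // A) resource constraints already met
        RELEASE_AND_RESTART, // B) more resources available than required
        ACQUIRE_AND_RESTART  // C) fewer resources available than required
    }

    final class RescaleDecision {
        static RescaleAction decide(int requiredSlots, int availableSlots) {
            if (availableSlots == requiredSlots) {
                return RescaleAction.NOTHING;
            }
            return availableSlots > requiredSlots
                    ? RescaleAction.RELEASE_AND_RESTART  // cancel, drop superfluous TMs, restart
                    : RescaleAction.ACQUIRE_AND_RESTART; // acquire TMs, wait for registration, restart
        }
    }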
On Mon, Feb 13, 2023 at 7:45 PM Maximilian Michels <m...@apache.org> wrote:

Based on further discussion I had with Chesnay on this PR [1], I think jobs would currently go into a restarting state after the resource requirements have changed. This wouldn't achieve what we had in mind, i.e. sticking to the old resource requirements until enough slots are available to fulfil the new resource requirements. So this may not be 100% what we need, but it could be extended to do what we want.

-Max

[1] https://github.com/apache/flink/pull/21908#discussion_r1104792362

On Mon, Feb 13, 2023 at 7:16 PM Maximilian Michels <m...@apache.org> wrote:

Hi David,

This is awesome! Great writeup and demo. This is pretty much what we need for the autoscaler as part of the Flink Kubernetes operator [1]. Scaling Flink jobs effectively is hard, but fortunately we have solved that issue as part of the Flink Kubernetes operator. The only critical piece we are missing is a better way to execute scaling decisions, as discussed in [2].

Looking at your proposal, we would set lowerBound == upperBound for the parallelism because we want to fully determine the parallelism externally based on the scaling metrics. Does that sound right?

What is the timeline for these changes? Is there a JIRA?

Cheers,
Max

[1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
[2] https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5

On Mon, Feb 13, 2023 at 1:16 PM feng xiangyu <xiangyu...@gmail.com> wrote:

Hi David,

Thanks for your reply. I think your response totally makes sense. This FLIP targets declaring required resources to the ResourceManager instead of using the ResourceManager to add/remove TMs directly.

Best,
Xiangyu
On Mon, Feb 13, 2023 at 15:46, David Morávek <david.mora...@gmail.com> wrote:

Hi everyone,

@Shammon

I'm not entirely sure what "config file" you're referring to. You can, of course, override the default parallelism in "flink-conf.yaml", but for sinks and sources, the parallelism needs to be tweaked on the connector level ("WITH" statement).

This is something that should be achieved with tooling around Flink. We want to provide an API on the lowest level that generalizes well. Achieving what you're describing should be straightforward with this API.

@Xiangyu

> Is it possible for this REST API to declare TM resources in the future?

Would you like to add/remove TMs if you use an active Resource Manager? This would be out of the scope of this effort since it targets the scheduler component only (we make no assumptions about the used Resource Manager). Also, the AdaptiveScheduler is only intended to be used for Streaming.

> And for streaming jobs, I'm wondering if there is any situation we need to rescale the TM resources of a Flink cluster at first and then the adaptive scheduler will rescale the per-vertex ResourceProfiles accordingly.

We plan on adding support for ResourceProfiles (dynamic slot allocation) as the next step. Again, we won't make any assumptions about the used Resource Manager. In other words, this effort ends by declaring desired resources to the Resource Manager.

Does that make sense?

@Matthias

We've done another pass on the proposed API and currently lean towards having an idempotent PUT API.
- We don't care too much about multiple-writer scenarios in terms of who can write an authoritative payload; this is up to the user of the API to figure out.
- It's indeed tricky to achieve atomicity with a PATCH API; switching to a PUT API seems to do the trick.
- We won't allow partial "payloads" anymore, meaning you need to define requirements for all vertices in the JobGraph. This is completely fine for programmatic workflows. For DEBUG / DEMO purposes, you can use the GET endpoint and tweak the response to avoid writing the whole payload by hand (see the sketch below).

WDYT?

Best,
D.
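(For illustration, the GET-then-PUT round trip mentioned above might look roughly like the sketch below. The endpoint path and field names are assumptions, and Jackson is used for the JSON handling.)

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class TweakResourceRequirements {
        public static void main(String[] args) throws Exception {
            // Placeholder endpoint; see FLIP-291 for the actual path and schema.
            URI uri = URI.create("http://jobmanager:8081/jobs/<job-id>/resource-requirements");
            HttpClient client = HttpClient.newHttpClient();

            // 1) GET the current requirements so we don't have to write the full payload by hand.
            String current = client.send(
                            HttpRequest.newBuilder(uri).GET().build(),
                            HttpResponse.BodyHandlers.ofString())
                    .body();

            // 2) Tweak only the vertex we care about; the payload still covers all vertices.
            ObjectMapper mapper = new ObjectMapper();
            JsonNode root = mapper.readTree(current);
            ObjectNode parallelism = (ObjectNode) root.get("<vertex-id>").get("parallelism");
            parallelism.put("upperBound", 8); // assumed field name

            // 3) PUT the complete payload back.
            client.send(
                    HttpRequest.newBuilder(uri)
                            .header("Content-Type", "application/json")
                            .PUT(HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(root)))
                            .build(),
                    HttpResponse.BodyHandlers.ofString());
        }
    }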
On Fri, Feb 10, 2023 at 11:21 AM feng xiangyu <xiangyu...@gmail.com> wrote:

Hi David,

Thanks for creating this FLIP. I think this work is very useful, especially in the autoscaling scenario. I would like to share some questions from my view.

1. Is it possible for this REST API to declare TM resources in the future? I'm asking because we are building the autoscaling feature for Flink OLAP Session Clusters in ByteDance. We need to rescale the cluster's resources on the TM level instead of the Job level. It would be very helpful if we had a REST API for our external autoscaling service to use.

2. And for streaming jobs, I'm wondering if there is any situation where we need to rescale the TM resources of a Flink cluster first, and then the adaptive scheduler will rescale the per-vertex ResourceProfiles accordingly.

Best,
Xiangyu

On Thu, Feb 9, 2023 at 11:31 AM Shammon FY <zjur...@gmail.com> wrote:

Hi David,

Thanks for your answer.

> Can you elaborate more about how you'd intend to use the endpoint? I think we can ultimately introduce a way of re-declaring "per-vertex defaults," but I'd like to understand the use case a bit more first.

For this issue, I mainly consider the consistency of user configuration and job runtime. For SQL jobs, users usually set a specific parallelism for the source and sink, and set a global parallelism for the other operators. These config items are stored in a config file. For some high-priority jobs, users may want to manage them manually.
1. When users need to scale the parallelism, they currently have to update the config file and restart the Flink job, which may take a long time.
2. With the REST API, users can simply send a request to the job right after updating the config file.
The configuration in the running job and the config file should be the same. What do you think of this?

Best,
Shammon
On Tue, Feb 7, 2023 at 4:51 PM David Morávek <david.mora...@gmail.com> wrote:

Hi everyone,

Let's try to answer the questions one by one.

*@ConradJam*

> when the number of "slots" is insufficient, can we stop users rescaling or throw something to tell the user "not enough available slots to upgrade, please check your available slots"?

The main property of the AdaptiveScheduler is that it can adapt to "available resources," which means you're still able to make progress even though you didn't get all the slots you've asked for. Let's break down the pros and cons of this property.

- (plus) If you lose a TM for some reason, you can still recover even if it doesn't come back. We still need to give it some time to eliminate unnecessary rescaling, which can be controlled by setting "resource-stabilization-timeout."
- (plus) The resources can arrive with a significant delay. For example, you're unable to spawn enough TMs on time because you've run out of resources in your k8s cluster, and you need to wait for the cluster autoscaler to kick in and add new nodes to the cluster. In this scenario, you'll be able to start making progress faster, at the cost of multiple rescalings (once the remaining resources arrive).
- (plus) This plays well with the declarative manner of today's infrastructure. For example, you tell k8s that you need 10 TMs, and you'll eventually get them.
- (minus) In the case of large-state jobs, the cost of multiple rescalings might outweigh the above.

We've already touched on the solution to this problem in the FLIP. Please notice the parallelism knob being a range with a lower and upper bound. Setting both the lower and upper bound to the same value could give you the behavior you're describing, at the cost of giving up some properties that the AS gives you (you'd be falling back to the DefaultScheduler's behavior).

> when user upgrade job-vertex-parallelism. I want to have an interface to query the current update parallel execution status, so that the user or program can understand the current status

This is a misunderstanding. We're not introducing a RESCALE endpoint. This endpoint allows you to re-declare the resources needed to run the job. Once you reach the desired resources (you get more resources than the lower bound defines), your job will run.

We can expose a similar endpoint to "resource requirements" to give you an overview of the resources the vertices already have. You can already get this from the REST API, so exposing this in yet another way should be considered carefully.
*@Matthias*

> I'm wondering whether it makes sense to add some kind of resource ID to the REST API.

That's a good question. I want to think about that and get back to the question later. My main struggle when thinking about this is, "if this were an idempotent POST endpoint, would it be any different?"

> How often do we allow resource requirements to be changed?

There shall be no rate limiting on the Flink side. If this is something your environment needs, you can achieve it on a different layer ("we can't have Flink do everything").

> Versioning the JobGraph in the JobGraphStore rather than overwriting it might be an idea.

This sounds interesting since it would be closer to the JobGraph being immutable. The main problem I see here is that this would introduce a BW-incompatible change, so it might be a topic for a follow-up FLIP.

> I'm just wondering whether we bundle two things together that are actually separate

Yup, this is how we think about it as well. The main question is, "who should be responsible for bookkeeping 1) the JobGraph and 2) the JobResourceRequirements?" The JobMaster would be the right place for both, but it's currently not the case, and we're tightly coupling the Dispatcher with the JobMaster.

Initially, we tried to introduce a separate HA component in the JobMaster for bookkeeping the JobResourceRequirements, but that proved to be a more significant effort, adding additional mess to the already messy HA ecosystem. Another approach we've discussed was mutating the JobGraph and setting the JRR into the JobGraph structure itself.

The middle ground for keeping this effort reasonably sized and not violating "we want to keep the JG immutable" too much is keeping the JobResourceRequirements separate, as an internal config option in the JobGraph's configuration.

We ultimately need to rethink the tight coupling of the Dispatcher and JobMaster, but it needs to be a separate effort.

> ...also considering the amount of data that can be stored in a ConfigMap/ZooKeeper node if versioning the resource requirement change as proposed in my previous item is an option for us.

AFAIK we're only storing pointers to the S3 objects in HA metadata, so we should be okay with having larger structures for now.

> Updating the JobGraphStore means adding more requests to the HA backend API.

It's fine unless you intend to override the resource requirements a few times per second.

*@Shammon*

> How about adding some more information such as vertex type

Since it was intended as a "debug" endpoint, it makes complete sense!

> For SQL jobs, we always use a unified parallelism for most vertices. Can we provide them with a more convenient setting method instead of each one?

I completely sympathize with this. The main thoughts when designing the API were:
- We want to keep it clean and easy to understand.
- Global parallelism can be modeled using per-vertex parallelism, but not the other way around (see the sketch below).
- The API will be used by external tooling (operator, autoscaler).

Can you elaborate more about how you'd intend to use the endpoint? I think we can ultimately introduce a way of re-declaring "per-vertex defaults," but I'd like to understand the use case a bit more first.
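(For illustration, how external tooling could model a "global" parallelism on top of the per-vertex API; a sketch only, and the payload shape is an assumption.)

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class GlobalParallelism {

        /**
         * Expands one "global" setting into the per-vertex payload the endpoint expects:
         * every vertex in the JobGraph gets the same lower and upper bound.
         */
        static Map<String, Object> forAllVertices(
                List<String> vertexIds, int lowerBound, int upperBound) {
            Map<String, Object> payload = new LinkedHashMap<>();
            for (String vertexId : vertexIds) {
                payload.put(vertexId, Map.of(
                        "parallelism", Map.of(
                                "lowerBound", lowerBound,
                                "upperBound", upperBound)));
            }
            return payload; // serialize to JSON and PUT it as in the earlier sketch
        }
    }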
*@Weijie*

> What is the default value here (based on what configuration), or just infinite?

Currently, for the lower bound, it's always one, and for the upper bound, it's either the parallelism (if defined) or the maxParallelism of the vertex in the JobGraph. This question might be another signal for making the defaults explicit (see the answer to Shammon's question above).

Thanks, everyone, for your initial thoughts!

Best,
D.

On Tue, Feb 7, 2023 at 4:39 AM weijie guo <guoweijieres...@gmail.com> wrote:

Thanks David for driving this. This is very valuable work, especially for cloud-native environments.

> How about adding some more information such as vertex type (SOURCE/MAP/JOIN, etc.) in the response of `get jobs resource-requirements`? For users, only vertex-id may be difficult to understand.

+1 for this suggestion; including the JobVertex's name in the response body is more user-friendly.

I saw this sentence in the FLIP: "Setting the upper bound to -1 will reset the value to the default setting." What is the default value here (based on what configuration), or just infinite?

Best regards,

Weijie
On Mon, Feb 6, 2023 at 6:06 PM Shammon FY <zjur...@gmail.com> wrote:

Hi David,

Thanks for initiating this discussion. I think declaring job resource requirements via a REST API is very valuable. I just left some comments as follows:

1) How about adding some more information such as vertex type (SOURCE/MAP/JOIN, etc.) in the response of `get jobs resource-requirements`? For users, only vertex-id may be difficult to understand.

2) For SQL jobs, we always use a unified parallelism for most vertices. Can we provide them with a more convenient setting method instead of each one?

Best,
Shammon
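(Purely to illustrate suggestion 1) above, a `get jobs resource-requirements` response enriched with the vertex name and type might look something like the following; this is a hypothetical shape, not the FLIP's actual schema.)

    {
      "<vertex-id>": {
        "name": "Source: Orders",
        "type": "SOURCE",
        "parallelism": { "lowerBound": 1, "upperBound": 4 }
      }
    }

Here "name" would be the JobVertex name and "type" the suggested vertex type.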
On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl <matthias.p...@aiven.io.invalid> wrote:

Thanks David for creating this FLIP. It sounds promising and useful to have. Here are some thoughts from my side (some of them might rather be a follow-up and not necessarily part of this FLIP):

- I'm wondering whether it makes sense to add some kind of resource ID to the REST API (a sketch follows after this list). This would give Flink a tool to verify the PATCH request of the external system in a compare-and-set kind of manner. AFAIU, the process requires the external system to retrieve the resource requirements first (to retrieve the vertex IDs). A resource ID <ABC> would be sent along as a unique identifier for the provided setup. It's essentially the version ID of the currently deployed resource requirement configuration. Flink doesn't know whether the external system would use the provided information in some way to derive a new set of resource requirements for this job. The subsequent PATCH request with updated resource requirements would include the previously retrieved resource ID <ABC>. The PATCH call would fail if there was a concurrent PATCH call in between, indicating to the external system that the resource requirements were concurrently updated.
- How often do we allow resource requirements to be changed? That question might make my previous comment on the resource ID obsolete, because we could just make any PATCH call fail if there was a resource requirement update within a certain time frame before the request. But such a time period is something we might want to make configurable then, I guess.
- Versioning the JobGraph in the JobGraphStore rather than overwriting it might be an idea. This would enable us to provide resource requirement changes in the UI or through the REST API. It is related to a problem around keeping track of the exception history within the AdaptiveScheduler and also having to consider multiple versions of a JobGraph. But for that one, we use the ExecutionGraphInfoStore right now.
- Updating the JobGraph in the JobGraphStore makes sense. I'm just wondering whether we bundle two things together that are actually separate: the business logic and the execution configuration (the resource requirements). I'm aware that this is not a flaw of the current FLIP but rather something that was not necessary to address in the past because the JobGraph was kind of static. I don't remember whether that was already discussed while working on the AdaptiveScheduler for FLIP-160 [1]. Maybe I'm missing some functionality here that requires us to have everything in one place. But it feels like updating the entire JobGraph for what could actually be a "config change" is not reasonable. ...also considering the amount of data that can be stored in a ConfigMap/ZooKeeper node if versioning the resource requirement change as proposed in my previous item is an option for us.
- Updating the JobGraphStore means adding more requests to the HA backend API. There were some concerns shared in the discussion thread [2] for FLIP-270 [3] on pressuring the k8s API server in the past with too many calls. Even though it's more likely to be caused by checkpointing, I still wanted to bring it up. We're working on a standardized performance test right now to prepare going forward with FLIP-270 [3].

Best,
Matthias

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
[2] https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
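(Purely to illustrate the resource-ID / compare-and-set idea from the first item above; nothing here is an existing Flink API, and the discussion above eventually settled on an idempotent PUT instead.)

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CompareAndSetSketch {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            URI uri = URI.create("http://jobmanager:8081/jobs/<job-id>/resource-requirements");

            // 1) GET the current requirements; a hypothetical "resource ID" (the version of the
            //    currently deployed requirements) is returned alongside, e.g. as an ETag-style header.
            HttpResponse<String> current = client.send(
                    HttpRequest.newBuilder(uri).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            String resourceId = current.headers().firstValue("ETag").orElse("");

            // 2) Send the update together with the resource ID it was derived from.
            String updatedPayload = "{}"; // placeholder for the full requirements payload
            HttpResponse<String> update = client.send(
                    HttpRequest.newBuilder(uri)
                            .header("Content-Type", "application/json")
                            .header("If-Match", resourceId)
                            .method("PATCH", HttpRequest.BodyPublishers.ofString(updatedPayload))
                            .build(),
                    HttpResponse.BodyHandlers.ofString());

            // 3) A 409/412 response would signal a concurrent update: re-read and try again.
            if (update.statusCode() == 409 || update.statusCode() == 412) {
                System.out.println("Requirements changed concurrently; retry with a fresh resource ID");
            }
        }
    }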
On Fri, Feb 3, 2023 at 10:31 AM ConradJam <jam.gz...@gmail.com> wrote:

Hi David:

Thank you for driving this FLIP, which helps reduce Flink downtime.

For this FLIP, I would like to share a few ideas:

- When the number of "slots" is insufficient, can we stop users from rescaling, or throw something to tell the user "not enough available slots to upgrade, please check your available slots"? Or we could have a request switch (true/false) to allow this behavior.

- When the user upgrades the job-vertex parallelism, I want to have an interface to query the current status of the parallelism update, so that the user or program can understand the current status. This would also help with management similar to the Flink K8s Operator [1], e.g.:

{
  status: Failed
  reason: "not enough available slots to upgrade, please check your available slots"
}

- *Pending*: the job has joined the upgrade queue and will be updated later
- *Rescaling*: the job is rescaling; wait for it to finish
- *Finished*: the upgrade is done
- *Failed*: something went wrong, so the job cannot be upgraded

I would like to supplement the above content in the FLIP; what do you think?

1. https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

--
Best

ConradJam
On Fri, Feb 3, 2023 at 16:42, David Morávek <d...@apache.org> wrote:

Hi everyone,

This FLIP [1] introduces a new REST API for declaring resource requirements for the Adaptive Scheduler. There seems to be a clear need for this API based on the discussion on the "Reworking the Rescale API" thread [2].

Before we get started: this work is heavily based on the prototype [3] created by Till Rohrmann, and the FLIP is being published with his consent. Big shoutout to him!
Last but not least, thanks to Chesnay and Roman for the initial reviews and discussions.

The best start would be watching a short demo [4] that I've recorded, which illustrates the newly added capabilities (rescaling the running job, handing back resources to the RM, and session cluster support).

The intuition behind the FLIP is being able to define resource requirements ("resource boundaries") externally that the AdaptiveScheduler can navigate within. This is a building block for higher-level efforts such as an external Autoscaler. The natural extension of this work would be to allow specifying per-vertex ResourceProfiles.

Looking forward to your thoughts; any feedback is appreciated!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
[2] https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
[3] https://github.com/tillrohrmann/flink/tree/autoscaling
[4] https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view

Best,
D.