Hi David,

Thanks for your reply. I think your response totally makes sense. This FLIP targets declaring the required resources to the ResourceManager rather than using the ResourceManager to add or remove TMs directly.
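For reference, the "GET the current requirements, tweak them, PUT the whole payload back" workflow from the quoted reply could be sketched roughly as follows. This is only a minimal illustration under stated assumptions: the JSON shape (vertex ID mapped to a "parallelism" object with "lowerBound" and "upperBound"), the vertex IDs, and the helper name build_put_payload are hypothetical and not confirmed anywhere in this thread. No HTTP calls are made; the sketch only shows how a complete, non-partial payload can be derived from a GET response.

```python
import copy
import json


def build_put_payload(current, overrides):
    """Return a complete requirements payload with `overrides` applied.

    `current` stands in for the body a GET request might return. The shape
    {vertex_id: {"parallelism": {"lowerBound": int, "upperBound": int}}}
    is an assumption made for this sketch.
    `overrides` maps vertex IDs to (lower, upper) tuples.
    """
    unknown = set(overrides) - set(current)
    if unknown:
        raise KeyError(f"unknown vertex IDs: {sorted(unknown)}")

    # Start from the full GET response so every vertex stays in the payload
    # (partial payloads are not allowed with the PUT semantics).
    payload = copy.deepcopy(current)
    for vertex_id, (lower, upper) in overrides.items():
        if not 1 <= lower <= upper:
            raise ValueError(f"invalid bounds for {vertex_id}: {lower}..{upper}")
        payload[vertex_id]["parallelism"] = {
            "lowerBound": lower,
            "upperBound": upper,
        }
    return payload


# Hypothetical GET response for a job with two vertices.
current = {
    "vertex-a": {"parallelism": {"lowerBound": 1, "upperBound": 4}},
    "vertex-b": {"parallelism": {"lowerBound": 1, "upperBound": 4}},
}

payload = build_put_payload(current, {"vertex-b": (2, 8)})
print(json.dumps(payload, indent=2))
```

Because the payload always describes every vertex, sending the same body twice leaves the job in the same declared state, which is what makes a PUT of this kind idempotent.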
Best,
Xiangyu

David Morávek <david.mora...@gmail.com> wrote on Mon, Feb 13, 2023 at 15:46:

> Hi everyone,
>
> @Shammon
>
> I'm not entirely sure what "config file" you're referring to. You can, of
> course, override the default parallelism in "flink-conf.yaml", but for
> sinks and sources, the parallelism needs to be tweaked on the connector
> level ("WITH" statement).
>
> This is something that should be achieved with tooling around Flink. We
> want to provide an API on the lowest level that generalizes well. Achieving
> what you're describing should be straightforward with this API.
>
> @Xiangyu
>
> > Is it possible for this REST API to declare TM resources in the future?
>
> Would you like to add/remove TMs if you use an active Resource Manager?
> This would be out of the scope of this effort since it targets the
> scheduler component only (we make no assumptions about the used Resource
> Manager). Also, the AdaptiveScheduler is only intended to be used for
> Streaming.
>
> > And for streaming jobs, I'm wondering if there is any situation we need to
> > rescale the TM resources of a flink cluster at first and then the adaptive
> > scheduler will rescale the per-vertex ResourceProfiles accordingly.
>
> We plan on adding support for the ResourceProfiles (dynamic slot
> allocation) as the next step. Again we won't make any assumptions about the
> used Resource Manager. In other words, this effort ends by declaring
> desired resources to the Resource Manager.
>
> Does that make sense?
>
> @Matthias
>
> We've done another pass on the proposed API and currently lean towards
> having an idempotent PUT API.
> - We don't care too much about multiple writers' scenarios in terms of who
>   can write an authoritative payload; this is up to the user of the API to
>   figure out
> - It's indeed tricky to achieve atomicity with PATCH API; switching to PUT
>   API seems to do the trick
> - We won't allow partial "payloads" anymore, meaning you need to define
>   requirements for all vertices in the JobGraph; this is completely fine for
>   the programmatic workflows. For DEBUG / DEMO purposes, you can use the GET
>   endpoint and tweak the response to avoid writing the whole payload by hand.
>
> WDYT?
>
> Best,
> D.
>
> On Fri, Feb 10, 2023 at 11:21 AM feng xiangyu <xiangyu...@gmail.com> wrote:
>
> > Hi David,
> >
> > Thanks for creating this FLIP. I think this work is very useful,
> > especially in autoscaling scenarios. I would like to share some questions
> > from my view.
> >
> > 1. Is it possible for this REST API to declare TM resources in the future?
> > I'm asking because we are building the autoscaling feature for Flink OLAP
> > Session Cluster in ByteDance. We need to rescale the cluster's resources on
> > the TM level instead of the job level. It would be very helpful if we had a
> > REST API for our external autoscaling service to use.
> >
> > 2. And for streaming jobs, I'm wondering if there is any situation we need
> > to rescale the TM resources of a flink cluster at first and then the
> > adaptive scheduler will rescale the per-vertex ResourceProfiles
> > accordingly.
> >
> > Best,
> > Xiangyu
> >
> > Shammon FY <zjur...@gmail.com> wrote on Thu, Feb 9, 2023 at 11:31:
> >
> > > Hi David
> > >
> > > Thanks for your answer.
> > >
> > > > Can you elaborate more about how you'd intend to use the endpoint? I
> > > > think we can ultimately introduce a way of re-declaring "per-vertex
> > > > defaults," but I'd like to understand the use case bit more first.
> > >
> > > For this issue, I mainly consider the consistency of user configuration and
> > > job runtime.
> > > For sql jobs, users usually set specific parallelism for
> > > source and sink, and set a global parallelism for other operators. These
> > > config items are stored in a config file. For some high-priority jobs,
> > > users may want to manage them manually.
> > > 1. When users need to scale the parallelism, they should update the config
> > > file and restart the Flink job, which may take a long time.
> > > 2. After providing the REST API, users can just send a request to the job
> > > via the REST API quickly after updating the config file.
> > > The configuration in the running job and the config file should be the same.
> > > What do you think of this?
> > >
> > > Best,
> > > Shammon
> > >
> > > On Tue, Feb 7, 2023 at 4:51 PM David Morávek <david.mora...@gmail.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Let's try to answer the questions one by one.
> > > >
> > > > *@ConradJam*
> > > >
> > > > > when the number of "slots" is insufficient, can we stop users rescaling
> > > > > or throw something to tell user "less available slots to upgrade, please
> > > > > check out your available slots"?
> > > >
> > > > The main property of AdaptiveScheduler is that it can adapt to "available
> > > > resources," which means you're still able to make progress even though you
> > > > didn't get all the slots you've asked for. Let's break down the pros and
> > > > cons of this property.
> > > >
> > > > - (plus) If you lose a TM for some reason, you can still recover even if it
> > > >   doesn't come back. We still need to give it some time to eliminate
> > > >   unnecessary rescaling, which can be controlled by setting
> > > >   "resource-stabilization-timeout."
> > > > - (plus) The resources can arrive with a significant delay.
> > > >   For example, you're unable to spawn enough TMs on time because you've
> > > >   run out of resources in your k8s cluster, and you need to wait for the
> > > >   cluster auto scaler to kick in and add new nodes to the cluster. In this
> > > >   scenario, you'll be able to start making progress faster, at the cost of
> > > >   multiple rescalings (once the remaining resources arrive).
> > > > - (plus) This plays well with the declarative manner of today's
> > > >   infrastructure. For example, you tell k8s that you need 10 TMs, and you'll
> > > >   eventually get them.
> > > > - (minus) In the case of large state jobs, the cost of multiple rescalings
> > > >   might outweigh the above.
> > > >
> > > > We've already touched on the solution to this problem on the FLIP. Please
> > > > notice the parallelism knob being a range with a lower and upper bound.
> > > > Setting both the lower and upper bound to the same value could give the
> > > > behavior you're describing at the cost of giving up some properties that AS
> > > > gives you (you'd be falling back to the DefaultScheduler's behavior).
> > > >
> > > > > when user upgrade job-vertex-parallelism. I want to have an interface to
> > > > > query the current update parallel execution status, so that the user or
> > > > > program can understand the current status
> > > >
> > > > This is a misunderstanding. We're not introducing the RESCALE endpoint.
> > > > This endpoint allows you to re-declare the resources needed to run the job.
> > > > Once you reach the desired resources (you get more resources than the lower
> > > > bound defines), your job will run.
> > > >
> > > > We can expose a similar endpoint to "resource requirements" to give you an
> > > > overview of the resources the vertices already have. You can already get
> > > > this from the REST API, so exposing this in yet another way should be
> > > > considered carefully.
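The lower/upper-bound behavior described in the quoted reply above can be illustrated with a deliberately simplified model. This is a sketch under assumptions, not the AdaptiveScheduler's actual slot-allocation logic: it considers a single vertex, assumes one slot per parallel subtask, and the function name decide_parallelism is hypothetical.

```python
from typing import Optional


def decide_parallelism(lower: int, upper: int, available_slots: int) -> Optional[int]:
    """Pick a parallelism inside [lower, upper] given the available slots.

    Returns None while fewer slots than the lower bound are available
    (the job keeps waiting); otherwise uses as many slots as it can,
    capped at the upper bound.
    """
    if not 1 <= lower <= upper:
        raise ValueError(f"invalid bounds: {lower}..{upper}")
    if available_slots < lower:
        return None  # not enough resources yet to run at all
    return min(available_slots, upper)


# A range lets the job make progress with whatever has arrived so far.
print(decide_parallelism(1, 10, 4))   # runs with 4 of the desired 10
print(decide_parallelism(1, 10, 20))  # capped at the upper bound

# lower == upper: the job runs only once the exact parallelism is available,
# which is the "all or nothing" behavior asked about in the thread.
print(decide_parallelism(4, 4, 3))
print(decide_parallelism(4, 4, 6))
```

In this model, widening the range trades predictability for faster progress, and collapsing it to a single value gives up the adaptivity in exchange for a fixed parallelism.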
> > > >
> > > > *@Matthias*
> > > >
> > > > > I'm wondering whether it makes sense to add some kind of resource ID to the
> > > > > REST API.
> > > >
> > > > That's a good question. I want to think about that and get back to the
> > > > question later. My main struggle when thinking about this is, "if this
> > > > would be an idempotent POST endpoint," would it be any different?
> > > >
> > > > > How often do we allow resource requirements to be changed?
> > > >
> > > > There shall be no rate limiting on the FLINK side. If this is something
> > > > your environment needs, you can achieve it on a different layer ("we can't
> > > > have FLINK to do everything").
> > > >
> > > > > Versioning the JobGraph in the JobGraphStore rather than overwriting it
> > > > > might be an idea.
> > > >
> > > > This sounds interesting since it would be closer to the JobGraph being
> > > > immutable. The main problem I see here is that this would introduce a
> > > > BW-incompatible change so it might be a topic for follow-up FLIP.
> > > >
> > > > > I'm just wondering whether we bundle two things together that are actually
> > > > > separate
> > > >
> > > > Yup, this is how we think about it as well. The main question is, "who
> > > > should be responsible for bookkeeping 1) the JobGraph and 2) the
> > > > JobResourceRequirements". The JobMaster would be the right place for both,
> > > > but it's currently not the case, and we're tightly coupling the dispatcher
> > > > with the JobMaster.
> > > >
> > > > Initially, we tried to introduce a separate HA component in JobMaster for
> > > > bookkeeping the JobResourceRequirements, but that proved to be a more
> > > > significant effort adding additional mess to the already messy HA
> > > > ecosystem. Another approach we've discussed was mutating the JobGraph and
> > > > setting JRR into the JobGraph structure itself.
> > > >
> > > > The middle ground for keeping this effort reasonably sized and not
> > > > violating "we want to keep JG immutable" too much is keeping the
> > > > JobResourceRequirements separate as an internal config option in JobGraph's
> > > > configuration.
> > > >
> > > > We ultimately need to rethink the tight coupling of Dispatcher and
> > > > JobMaster, but it needs to be a separate effort.
> > > >
> > > > > ...also considering the amount of data that can be stored in a
> > > > > ConfigMap/ZooKeeper node if versioning the resource requirement change as
> > > > > proposed in my previous item is an option for us.
> > > >
> > > > AFAIK we're only storing pointers to the S3 objects in HA metadata, so we
> > > > should be okay with having larger structures for now.
> > > >
> > > > > Updating the JobGraphStore means adding more requests to the HA backend
> > > > > API.
> > > >
> > > > It's fine unless you intend to override the resource requirements a few
> > > > times per second.
> > > >
> > > > *@Shammon*
> > > >
> > > > > How about adding some more information such as vertex type
> > > >
> > > > Since it was intended as a "debug" endpoint, it makes complete sense!
> > > >
> > > > > For sql jobs, we always use a unified parallelism for most vertices. Can
> > > > > we provide them with a more convenient setting method instead of each one?
> > > >
> > > > I completely feel with this. The main thoughts when designing the API were:
> > > > - We want to keep it clean and easy to understand.
> > > > - Global parallelism can be modeled using per-vertex parallelism but not
> > > >   the other way around.
> > > > - The API will be used by external tooling (operator, auto scaler).
> > > >
> > > > Can you elaborate more about how you'd intend to use the endpoint?
> > > > I think we can ultimately introduce a way of re-declaring "per-vertex
> > > > defaults," but I'd like to understand the use case bit more first.
> > > >
> > > > *@Weijie*
> > > >
> > > > > What is the default value here (based on what configuration), or just
> > > > > infinite?
> > > >
> > > > Currently, for the lower bound, it's always one, and for the upper bound,
> > > > it's either parallelism (if defined) or the maxParallelism of the vertex in
> > > > JobGraph. This question might be another signal for making the defaults
> > > > explicit (see the answer to Shammon's question above).
> > > >
> > > > Thanks, everyone, for your initial thoughts!
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Tue, Feb 7, 2023 at 4:39 AM weijie guo <guoweijieres...@gmail.com> wrote:
> > > >
> > > > > Thanks David for driving this. This is very valuable work, especially for
> > > > > cloud native environments.
> > > > >
> > > > > > How about adding some more information such as vertex type
> > > > > > (SOURCE/MAP/JOIN, etc.) in the response of `get jobs
> > > > > > resource-requirements`? For users, only vertex-id may be difficult to
> > > > > > understand.
> > > > >
> > > > > +1 for this suggestion, including jobvertex's name in the response body is
> > > > > more user-friendly.
> > > > >
> > > > > I saw this sentence in FLIP: "Setting the upper bound to -1 will reset the
> > > > > value to the default setting." What is the default value here (based on
> > > > > what configuration), or just infinite?
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > > Shammon FY <zjur...@gmail.com> wrote on Mon, Feb 6, 2023 at 18:06:
> > > > >
> > > > > > Hi David
> > > > > >
> > > > > > Thanks for initiating this discussion. I think declaring job resource
> > > > > > requirements by REST API is very valuable.
> > > > > > I just left some comments as follows
> > > > > >
> > > > > > 1) How about adding some more information such as vertex type
> > > > > > (SOURCE/MAP/JOIN, etc.) in the response of `get jobs
> > > > > > resource-requirements`? For users, only vertex-id may be difficult to
> > > > > > understand.
> > > > > >
> > > > > > 2) For sql jobs, we always use a unified parallelism for most vertices. Can
> > > > > > we provide them with a more convenient setting method instead of each one?
> > > > > >
> > > > > > Best,
> > > > > > Shammon
> > > > > >
> > > > > > On Fri, Feb 3, 2023 at 8:18 PM Matthias Pohl <matthias.p...@aiven.io.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks David for creating this FLIP. It sounds promising and useful to
> > > > > > > have. Here are some thoughts from my side (some of them might be rather a
> > > > > > > follow-up and not necessarily part of this FLIP):
> > > > > > > - I'm wondering whether it makes sense to add some kind of resource ID to
> > > > > > >   the REST API. This would give Flink a tool to verify the PATCH request of
> > > > > > >   the external system in a compare-and-set kind of manner. AFAIU, the process
> > > > > > >   requires the external system to retrieve the resource requirements first
> > > > > > >   (to retrieve the vertex IDs). A resource ID <ABC> would be sent along as a
> > > > > > >   unique identifier for the provided setup. It's essentially the version ID
> > > > > > >   of the currently deployed resource requirement configuration. Flink doesn't
> > > > > > >   know whether the external system would use the provided information in some
> > > > > > >   way to derive a new set of resource requirements for this job.
> > > > > > >   The subsequent PATCH request with updated resource requirements would include
> > > > > > >   the previously retrieved resource ID <ABC>. The PATCH call would fail if
> > > > > > >   there was a concurrent PATCH call in between indicating to the external
> > > > > > >   system that the resource requirements were concurrently updated.
> > > > > > > - How often do we allow resource requirements to be changed? That question
> > > > > > >   might make my previous comment on the resource ID obsolete because we could
> > > > > > >   just make any PATCH call fail if there was a resource requirement update
> > > > > > >   within a certain time frame before the request. But such a time period is
> > > > > > >   something we might want to make configurable then, I guess.
> > > > > > > - Versioning the JobGraph in the JobGraphStore rather than overwriting it
> > > > > > >   might be an idea. This would enable us to provide resource requirement
> > > > > > >   changes in the UI or through the REST API. It is related to a problem
> > > > > > >   around keeping track of the exception history within the AdaptiveScheduler
> > > > > > >   and also having to consider multiple versions of a JobGraph. But for that
> > > > > > >   one, we use the ExecutionGraphInfoStore right now.
> > > > > > > - Updating the JobGraph in the JobGraphStore makes sense. I'm just
> > > > > > >   wondering whether we bundle two things together that are actually separate:
> > > > > > >   The business logic and the execution configuration (the resource
> > > > > > >   requirements). I'm aware that this is not a flaw of the current FLIP but
> > > > > > >   rather something that was not necessary to address in the past because the
> > > > > > >   JobGraph was kind of static.
> > > > > > >   I don't remember whether that was already
> > > > > > >   discussed while working on the AdaptiveScheduler for FLIP-160 [1]. Maybe,
> > > > > > >   I'm missing some functionality here that requires us to have everything in
> > > > > > >   one place. But it feels like updating the entire JobGraph which could be
> > > > > > >   actually a "config change" is not reasonable. ...also considering the
> > > > > > >   amount of data that can be stored in a ConfigMap/ZooKeeper node if
> > > > > > >   versioning the resource requirement change as proposed in my previous item
> > > > > > >   is an option for us.
> > > > > > > - Updating the JobGraphStore means adding more requests to the HA backend
> > > > > > >   API. There were some concerns shared in the discussion thread [2] for
> > > > > > >   FLIP-270 [3] on pressuring the k8s API server in the past with too many
> > > > > > >   calls. Even though it's more likely to be caused by checkpointing, I still
> > > > > > >   wanted to bring it up. We're working on a standardized performance test to
> > > > > > >   prepare going forward with FLIP-270 [3] right now.
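The compare-and-set idea from the first item of the list above (a resource ID acting as a version of the deployed requirements) can be sketched with a small in-memory model. This is purely illustrative and rests on assumptions: the class and method names are hypothetical, nothing here is part of the FLIP, and the later messages in this thread lean towards an idempotent PUT without such an ID.

```python
import uuid


class StaleVersionError(Exception):
    """Raised when an update is based on an outdated resource ID."""


class RequirementsStore:
    """In-memory stand-in for the component that keeps the requirements."""

    def __init__(self, requirements):
        self._requirements = dict(requirements)
        self._version = uuid.uuid4().hex  # plays the role of resource ID <ABC>

    def get(self):
        """Return the current resource ID together with the requirements."""
        return self._version, dict(self._requirements)

    def update(self, expected_version, new_requirements):
        """Apply the update only if the caller saw the latest version."""
        if expected_version != self._version:
            raise StaleVersionError("requirements were updated concurrently")
        self._requirements = dict(new_requirements)
        self._version = uuid.uuid4().hex
        return self._version


store = RequirementsStore({"vertex-a": (1, 4)})

# Two external writers read the same version.
version_1, _ = store.get()
version_2, _ = store.get()

# The first update succeeds and produces a new version.
store.update(version_1, {"vertex-a": (1, 8)})

# The second update is rejected because its version is now stale.
try:
    store.update(version_2, {"vertex-a": (2, 2)})
except StaleVersionError as err:
    print("rejected:", err)
```

The rejected writer would have to fetch the requirements again and decide whether its change still applies, which is the signal the resource ID is meant to provide.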
> > > > > > >
> > > > > > > Best,
> > > > > > > Matthias
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> > > > > > > [2] https://lists.apache.org/thread/bm6rmxxk6fbrqfsgz71gvso58950d4mj
> > > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> > > > > > >
> > > > > > > On Fri, Feb 3, 2023 at 10:31 AM ConradJam <jam.gz...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi David:
> > > > > > > >
> > > > > > > > Thank you for driving this FLIP, which helps reduce Flink shutdown time.
> > > > > > > >
> > > > > > > > For this FLIP, I would like to share a few ideas:
> > > > > > > >
> > > > > > > > - When the number of "slots" is insufficient, can we stop users
> > > > > > > >   rescaling or throw something to tell the user "less available slots to
> > > > > > > >   upgrade, please check out your available slots"? Or we could have a
> > > > > > > >   request switch (true/false) to allow this behavior
> > > > > > > >
> > > > > > > > - When a user upgrades job-vertex-parallelism, I want to have an interface
> > > > > > > >   to query the current update parallel execution status, so that the user
> > > > > > > >   or program can understand the current status
> > > > > > > > - I want to have an interface to query the current update parallelism
> > > > > > > >   execution status.
> > > > > > > >   This also helps similar to *[1] Flink K8S Operator*
> > > > > > > >   management
> > > > > > > >
> > > > > > > > {
> > > > > > > >   status: Failed
> > > > > > > >   reason: "less available slots to upgrade, please check out your available slots"
> > > > > > > > }
> > > > > > > >
> > > > > > > > - *Pending*: the job has joined the upgrade queue and will be updated later
> > > > > > > > - *Rescaling*: the job is rescaling now; wait for it to finish
> > > > > > > > - *Finished*: the update is finished
> > > > > > > > - *Failed*: something went wrong, so this job cannot be upgraded
> > > > > > > >
> > > > > > > > I want to supplement my above content in the FLIP, what do you think?
> > > > > > > >
> > > > > > > > 1. https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
> > > > > > > >
> > > > > > > > David Morávek <d...@apache.org> wrote on Fri, Feb 3, 2023 at 16:42:
> > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > This FLIP [1] introduces a new REST API for declaring resource requirements
> > > > > > > > > for the Adaptive Scheduler. There seems to be a clear need for this API
> > > > > > > > > based on the discussion on the "Reworking the Rescale API" [2] thread.
> > > > > > > > >
> > > > > > > > > Before we get started, this work is heavily based on the prototype [3]
> > > > > > > > > created by Till Rohrmann, and the FLIP is being published with his consent.
> > > > > > > > > Big shoutout to him!
> > > > > > > > >
> > > > > > > > > Last but not least, thanks to Chesnay and Roman for the initial reviews and
> > > > > > > > > discussions.
> > > > > > > > >
> > > > > > > > > The best start would be watching a short demo [4] that I've recorded, which
> > > > > > > > > illustrates newly added capabilities (rescaling the running job, handing
> > > > > > > > > back resources to the RM, and session cluster support).
> > > > > > > > >
> > > > > > > > > The intuition behind the FLIP is being able to define resource requirements
> > > > > > > > > ("resource boundaries") externally that the AdaptiveScheduler can navigate
> > > > > > > > > within. This is a building block for higher-level efforts such as an
> > > > > > > > > external Autoscaler. The natural extension of this work would be to allow
> > > > > > > > > to specify per-vertex ResourceProfiles.
> > > > > > > > >
> > > > > > > > > Looking forward to your thoughts; any feedback is appreciated!
> > > > > > > > >
> > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> > > > > > > > > [2] https://lists.apache.org/thread/2f7dgr88xtbmsohtr0f6wmsvw8sw04f5
> > > > > > > > > [3] https://github.com/tillrohrmann/flink/tree/autoscaling
> > > > > > > > > [4] https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > D.
> > > > > > > >
> > > > > > > > --
> > > > > > > > Best
> > > > > > > >
> > > > > > > > ConradJam