Hi Yangze,

Thanks for incorporating the ideas, and sorry for missing the builder part. My main point is that SlotSharingGroup should be immutable, so that the user doesn't write:
```
SlotSharingGroup ssg = new SlotSharingGroup();
ssg.setCpus(2);
operator1.slotSharingGroup(ssg);
ssg.setCpus(4);
operator2.slotSharingGroup(ssg);
```

and then wonder why both operators end up with the same CPU spec. But the details can be fleshed out in the actual PR.

On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo <karma...@gmail.com> wrote:
> Thanks all for the discussion. I've updated the FLIP accordingly. The key changes are:
> - Introduce SlotSharingGroup instead of ResourceSpec; it contains the resource spec of the slot sharing group.
> - Introduce two interfaces for specifying the SlotSharingGroup: #slotSharingGroup(SlotSharingGroup) and StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
>
> If there is no more feedback, I'll start a vote next week.
>
> Best,
> Yangze Guo
>
> On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo <karma...@gmail.com> wrote:
> > Thanks for the valuable suggestion, Arvid.
> >
> > 1) Yes, we can add a new SlotSharingGroup class that includes the name and its resources. After that, we have two interfaces for configuring the slot sharing group of an operator:
> > - #slotSharingGroup(String name) // its resources can be configured through StreamExecutionEnvironment#registerSlotSharingGroup
> > - #slotSharingGroup(SlotSharingGroup ssg) // configures the resources directly
> > And one interface to configure the resources of an SSG:
> > - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > We can also define the priority of the two approaches above, e.g. the resources registered in the StreamExecutionEnvironment are always respected in case of a conflict. That would be well documented.
> >
> > 2) Yes, I originally added this interface as a shortcut. It seems unnecessary now. I'll remove it.
> >
> > 3) I don't think we need to expose the ExternalResource. In the builder of SlotSharingGroup, we can introduce a #withExternalResource(String name, double value).
> > Also, this interface needs to be annotated as evolving.
> >
> > 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to elaborate on the Builder for the SlotSharingGroup.
> >
> > WDYT?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise <ar...@apache.org> wrote:
> > > Hi Yangze,
> > >
> > > I like the general approach of binding requirements to slot sharing groups. I think the current approach is also flexible enough that a user could simply use ParameterTool or similar to read config values and wire them into their slot sharing groups, so that different requirements can be tested without recompilation. So I don't see an immediate need to provide a generic YAML-based configuration solution for now.
> > >
> > > Looking at the programmatic interface, though, I think we could improve it quite a bit, and I haven't seen these alternatives being considered in the FLIP:
> > > 1) Add a new class SlotSharingGroup that incorporates all ResourceSpec properties. Instead of using group names, the user could directly configure such an object.
> > >
> > > ```
> > > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name could also be omitted and auto-generated
> > > ssg1.setCPUCores(4);
> > > ...
> > > DataStream<Tuple2<String, Integer>> grades =
> > >     GradeSource
> > >         .getSource(env, rate)
> > >         .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > >         .slotSharingGroup(ssg1);
> > > DataStream<Tuple2<String, Integer>> salaries =
> > >     SalarySource.getSource(env, rate)
> > >         .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > >         .slotSharingGroup(ssg2);
> > >
> > > // run the actual window join program with the same slot sharing group as grades
> > > DataStream<Tuple3<String, Integer, Integer>> joinedStream =
> > >     runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);
> > > ```
> > >
> > > Note that we could make it backward compatible by changing the proposed StreamExecutionEnvironment#setSlotSharingGroupResource to StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and then using the string name for further reference.
> > >
> > > 2) I'm also not sure about StreamExecutionEnvironment#setSlotSharingGroupResources. What's the benefit of the Map version over the simple setter? Even if the user has a map slotSharingGroupResources, they could simply do slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> > >
> > > 3) Is defining the ExternalResource part of this FLIP? I don't see a Public* class yet. I'd also be fine with cutting the scope of this FLIP, removing it for now, and annotating ResourceSpec/SlotSharingGroup as evolving.
> > >
> > > 4) We should probably use a builder pattern around ResourceSpec/SlotSharingGroup, as in the current ResourceSpec. I don't think we need to fully specify that in the FLIP, but it would be good to at least say how users should create them.
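Arvid's concern at the top of the thread (with a mutable SlotSharingGroup, a later setCpus call silently changes an operator that was already configured) and the builder mentioned in points 3 and 4 can be illustrated together. The following is a minimal, self-contained sketch under stated assumptions: the class SlotSharingGroupSketch and its methods newBuilder, setCpuCores, setTaskHeapMemoryMB, withExternalResource and build are hypothetical names chosen for illustration, not the API the FLIP ends up defining. It shows one way to get immutability: the builder is the only mutable part, and build() copies its state into an object whose fields are all final.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical immutable slot sharing group spec (illustration only, not the Flink API).
 * All fields are final and set once from a Builder, so an instance handed to one
 * operator cannot change afterwards.
 */
final class SlotSharingGroupSketch {
    private final String name;
    private final double cpuCores;
    private final int taskHeapMemoryMB;
    private final Map<String, Double> externalResources;

    private SlotSharingGroupSketch(Builder builder) {
        this.name = builder.name;
        this.cpuCores = builder.cpuCores;
        this.taskHeapMemoryMB = builder.taskHeapMemoryMB;
        // Copy the builder's map so later builder changes cannot leak into this instance.
        this.externalResources =
                Collections.unmodifiableMap(new HashMap<>(builder.externalResources));
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() { return name; }
    double getCpuCores() { return cpuCores; }
    int getTaskHeapMemoryMB() { return taskHeapMemoryMB; }
    Map<String, Double> getExternalResources() { return externalResources; }

    /** The only mutable part; each build() call produces an independent snapshot. */
    static final class Builder {
        private final String name;
        private double cpuCores;
        private int taskHeapMemoryMB;
        private final Map<String, Double> externalResources = new HashMap<>();

        private Builder(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }

        Builder setCpuCores(double cpuCores) {
            if (cpuCores <= 0) {
                throw new IllegalArgumentException("cpuCores must be positive");
            }
            this.cpuCores = cpuCores;
            return this;
        }

        Builder setTaskHeapMemoryMB(int taskHeapMemoryMB) {
            if (taskHeapMemoryMB < 0) {
                throw new IllegalArgumentException("taskHeapMemoryMB must be non-negative");
            }
            this.taskHeapMemoryMB = taskHeapMemoryMB;
            return this;
        }

        /** Mirrors the #withExternalResource(String name, double value) idea from the thread. */
        Builder withExternalResource(String resourceName, double value) {
            externalResources.put(Objects.requireNonNull(resourceName, "resourceName"), value);
            return this;
        }

        SlotSharingGroupSketch build() {
            return new SlotSharingGroupSketch(this);
        }
    }

    public static void main(String[] args) {
        Builder builder = newBuilder("ssg-1").setCpuCores(2).setTaskHeapMemoryMB(512);
        SlotSharingGroupSketch forOperator1 = builder.build();
        // Changing the builder afterwards only affects instances built later.
        SlotSharingGroupSketch forOperator2 = builder.setCpuCores(4).build();
        System.out.println(forOperator1.getCpuCores() + " vs " + forOperator2.getCpuCores());
        // prints "2.0 vs 4.0"
    }
}
```

Copying state in build() rather than freezing the builder keeps the builder reusable for deriving variants; that is one possible design, and the thread explicitly leaves the builder's exact shape to the actual PR.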
> > >
> > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > @Yang
> > > > In short, the external resources will participate in resource deduction and be logically ensured, but requesting an external resource must still be done through config options with the current default resource allocation strategy.
> > > > In FLIP-56, we abstracted the resource allocation logic into the `ResourceAllocationStrategy`. Currently, with its default implementation, the ResourceManager still allocates TMs with the same resource spec, and their external resources are configured through config options as well. So, in your case, you need to define "external-resources" and "external-resource.disk.amount". Then all the disk requirements defined in the SSGs will be logically ensured, as there is no slot-level isolation. If the disk space of a TaskManager cannot fulfill the disk requirement, the RM will allocate a new one.
> > > > In the future, we'd like to introduce a `ResourceAllocationStrategy` that allocates heterogeneous TMs according to the requirements. Then users only need to define the driver of external resources when needed.
> > > > Also, regarding resource isolation, we may in the future provide a fine-grained mode in which each slot can only fetch the information of the external resources it requires. But that is out of the scope of this PR.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang <danrtsey...@gmail.com> wrote:
> > > > > Thanks @Yangze for preparing this FLIP.
> > > > >
> > > > > I think this is a good starting point for community users to get a taste of fine-grained resource management, which we all believe could improve Flink job stability and cluster utilization.
> > > > >
> > > > > I have a simple question about extended resources. Is it possible to combine extended resources with fine-grained resource management? Apart from GPUs, FPGAs and other new computing devices, disk may be a more general use case. For example, different SSGs may have different disk requirements depending on their state. So we need to allocate enough ephemeral storage for every TaskManager pod in a Kubernetes deployment; otherwise, the pod might be evicted for exceeding its limits.
> > > > >
> > > > > Best,
> > > > > Yang
> > > > >
> > > > > On Tue, Jun 8, 2021 at 1:47 PM Xintong Song <tonysong...@gmail.com> wrote:
> > > > > > I think being able to specify fine-grained resource requirements without having to change the code and recompile the job is indeed a good idea. It definitely improves usability.
> > > > > >
> > > > > > However, this requires more careful design, which probably deserves a separate thread. It'd be good to have that discussion, but maybe we shouldn't block this feature on it.
> > > > > >
> > > > > > One idea concerning the configuration approach: as Yangze said, Flink configuration options are supposed to take effect at the cluster level. For updating job-level specifics that are not suitable to be introduced as config options, currently the only way is to pass them as program arguments.
> > > > > > Would it make sense to introduce a general approach for overriding such job specifics without recompiling the job?
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > @Wenlong
> > > > > > > On further consideration, the config option approach I mentioned above might not be appropriate. The resource requirements for SSGs should be a job-level configuration and should not be set in the flink-conf.
> > > > > > >
> > > > > > > I think we can define a JSON format for the resource requirements of a specific job, consisting of the ResourceSpecs keyed by SSG name. Then we allow the user to configure the path of that JSON file. The JSON will only be parsed at runtime, which allows the user to tune it without recompiling the job.
> > > > > > >
> > > > > > > We can add another #setSlotSharingGroupResources for configuring the path of that JSON file:
> > > > > > > ```
> > > > > > > /**
> > > > > > >  * Specify fine-grained resource requirements for slot sharing groups with the given resource JSON file. The existing resource
> > > > > > >  * requirement of the same slot sharing group will be replaced.
> > > > > > >  */
> > > > > > > public StreamExecutionEnvironment setSlotSharingGroupResources(
> > > > > > >         String pathToResourceJson);
> > > > > > > ```
> > > > > > >
> > > > > > > WDYT?
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > > Thanks for the feedback, Xintong and Wenlong!
> > > > > > > >
> > > > > > > > @Wenlong
> > > > > > > > I think that is a good idea: adjusting the resources without recompiling the job will facilitate the tuning process.
> > > > > > > > We can define a pattern "slot-sharing-group.resource.{ssg name}" (any proposal for the prefix naming is welcome) for the resource spec config of a slot sharing group. Then the user can set the ResourceSpec of SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0, heap: 100m, off-heap: 100m, ...}". WDYT?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yangze Guo
> > > > > > > >
> > > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <wenlong88....@gmail.com> wrote:
> > > > > > > > > Thanks Yangze for the FLIP; it is great for users to be able to declare fine-grained resource requirements for the job.
> > > > > > > > >
> > > > > > > > > I have one minor suggestion: can we support setting resource requirements via configuration? Currently, most of the options in the execution config can be set through configuration, and it is very likely that users will need to adjust the resources according to their job's performance during debugging. Providing a configuration-based way would make this more convenient.
> > > > > > > > >
> > > > > > > > > Bests,
> > > > > > > > > Wenlong Lyu
> > > > > > > > >
> > > > > > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <tonysong...@gmail.com> wrote:
> > > > > > > > > > Thanks Yangze for preparing the FLIP.
> > > > > > > > > >
> > > > > > > > > > The proposed changes look good to me.
> > > > > > > > > >
> > > > > > > > > > As you've mentioned in the implementation plan, I believe one of the most important tasks of this FLIP is to have the feature well documented. It would be really nice if we could keep that in mind and start drafting the documentation early.
> > > > > > > > > >
> > > > > > > > > > Thank you~
> > > > > > > > > >
> > > > > > > > > > Xintong Song
> > > > > > > > > >
> > > > > > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > > > > > Hi there,
> > > > > > > > > > >
> > > > > > > > > > > We would like to start a discussion thread on "FLIP-169: DataStream API for Fine-Grained Resource Requirements" [1], where we propose the DataStream API for specifying fine-grained resource requirements in StreamExecutionEnvironment.
> > > > > > > > > > >
> > > > > > > > > > > Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.
> > > > > > > > > > >
> > > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yangze Guo
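Two rules from the thread are easy to get subtly wrong: Yangze's 9 June proposal that resources registered through StreamExecutionEnvironment#registerSlotSharingGroup take precedence over resources attached directly to an operator's slot sharing group, and the replace-on-re-registration semantics stated in the JSON-file proposal ("The existing resource requirement of the same slot sharing group will be replaced"). The following minimal sketch models both rules with plain Java collections, assuming a simplified spec of CPU cores plus task heap in MB; SsgResourceRegistry, Spec, register and resolve are hypothetical names, not Flink classes. It also shows Arvid's point 2: a Map of specs can be registered with Map.forEach and a single-entry setter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical model of per-SSG resource registration (illustration only, not Flink API).
 * Rule 1: registering the same SSG name again replaces the earlier spec.
 * Rule 2: a registered spec takes precedence over one attached directly to an operator.
 */
final class SsgResourceRegistry {

    /** Minimal immutable resource spec: CPU cores and task heap memory in MB. */
    static final class Spec {
        final double cpuCores;
        final int taskHeapMB;

        Spec(double cpuCores, int taskHeapMB) {
            this.cpuCores = cpuCores;
            this.taskHeapMB = taskHeapMB;
        }

        @Override
        public String toString() {
            return "Spec{cpu=" + cpuCores + ", heapMB=" + taskHeapMB + "}";
        }
    }

    private final Map<String, Spec> registered = new HashMap<>();

    /** Rule 1: a later registration for the same name overwrites the earlier one. */
    SsgResourceRegistry register(String ssgName, Spec spec) {
        registered.put(ssgName, spec);
        return this;
    }

    /** Rule 2: prefer the registered spec; otherwise fall back to the operator's own spec, if any. */
    Optional<Spec> resolve(String ssgName, Spec operatorSpec) {
        Spec fromEnv = registered.get(ssgName);
        return fromEnv != null ? Optional.of(fromEnv) : Optional.ofNullable(operatorSpec);
    }

    public static void main(String[] args) {
        Map<String, Spec> specsByName = new HashMap<>();
        specsByName.put("ssg-1", new Spec(4, 1024));

        SsgResourceRegistry env = new SsgResourceRegistry();
        // A whole map can be registered via the single-entry setter (its return value is discarded).
        specsByName.forEach(env::register);

        System.out.println(env.resolve("ssg-1", new Spec(1, 256)).get()); // Spec{cpu=4.0, heapMB=1024}
        System.out.println(env.resolve("ssg-2", new Spec(1, 256)).get()); // Spec{cpu=1.0, heapMB=256}

        env.register("ssg-1", new Spec(2, 512)); // replaces the earlier ssg-1 spec
        System.out.println(env.resolve("ssg-1", null).get()); // Spec{cpu=2.0, heapMB=512}
    }
}
```

Which side wins on a conflict is a policy choice; the thread only states the direction Yangze proposed, and, as he notes, it would need to be documented either way.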