Thanks for the quick response Yangze.
The proposal sounds good to me.

Thanks,
Zhu

On Mon, Jun 21, 2021 at 3:01 PM Yangze Guo <karma...@gmail.com> wrote:
> Thanks for the comments, Zhu!
>
> Yes, it is a known limitation of fine-grained resource management. We
> also filed this issue as FLINK-20865 when we proposed FLIP-156.
>
> As a first step, I agree that we can mark batch jobs with PIPELINED
> edges as an invalid case for this feature. However, just throwing an
> exception in that case might confuse users who do not understand the
> concept of a pipeline region. Maybe we can force all the edges in this
> scenario to BLOCKING at the compiling stage and document it well, so
> that common users will not be interrupted while expert users can
> understand the cost of that usage and make their own decision. WDYT?
>
> Best,
> Yangze Guo
>
> On Mon, Jun 21, 2021 at 2:24 PM Zhu Zhu <reed...@gmail.com> wrote:
>>
>> Thanks for proposing this @Yangze Guo and sorry for joining the
>> discussion so late.
>> The proposal generally looks good to me. But I find one problem: a
>> batch job with PIPELINED edges might hang if fine-grained resources
>> are enabled. See the "Resource Deadlocks could still happen in certain
>> Cases" section in
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
>> However, this problem may happen only in batch cases with PIPELINED
>> edges, because
>> 1. streaming jobs would always require all resource requirements to be
>> fulfilled at the same time.
>> 2. batch jobs without PIPELINED edges consist of multiple single-vertex
>> regions, and thus each slot can be individually used and returned.
>> So maybe in the first step, let's mark batch jobs with PIPELINED edges
>> as an invalid case for fine-grained resources and throw an exception
>> for it in the early compiling stage?
>>
>> Thanks,
>> Zhu
>>
>> On Tue, Jun 15, 2021 at 4:57 PM Yangze Guo <karma...@gmail.com> wrote:
>>>
>>> Thanks for the supplement, Arvid and Yun. I've annotated these two
>>> points in the FLIP.
>>> The vote is now started in [1].
>>>
>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51381.html
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Fri, Jun 11, 2021 at 2:50 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Many thanks @Yangze for bringing up this discussion. Overall +1 for
>>>> exposing the fine-grained resource requirements in the DataStream API.
>>>>
>>>> One issue similar to the one Arvid pointed out is that users may also
>>>> create different SlotSharingGroup objects with the same name but
>>>> different resources. We might need to do some checks internally, but
>>>> we could also leave that to the development of the actual PR.
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> ------------------Original Mail ------------------
>>>> Sender: Arvid Heise <ar...@apache.org>
>>>> Send Date: Thu Jun 10 15:33:37 2021
>>>> Recipients: dev <dev@flink.apache.org>
>>>> Subject: Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements
>>>>
>>>> Hi Yangze,
>>>>
>>>> Thanks for incorporating the ideas and sorry for missing the builder
>>>> part.
>>>>
>>>> My main idea is that SlotSharingGroup is immutable, such that the user
>>>> doesn't do:
>>>>
>>>>     ssg = new SlotSharingGroup();
>>>>     ssg.setCpus(2);
>>>>     operator1.slotSharingGroup(ssg);
>>>>     ssg.setCpus(4);
>>>>     operator2.slotSharingGroup(ssg);
>>>>
>>>> and wonders why both operators have the same CPU spec. But the details
>>>> can be fleshed out in the actual PR.
>>>>
>>>> On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo wrote:
>>>>>
>>>>> Thanks all for the discussion.
>>>>> I've updated the FLIP accordingly; the key changes are:
>>>>> - Introduce SlotSharingGroup instead of ResourceSpec, which contains
>>>>>   the resource spec of the slot sharing group
>>>>> - Introduce two interfaces for specifying the SlotSharingGroup:
>>>>>   #slotSharingGroup(SlotSharingGroup) and
>>>>>   StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
>>>>>
>>>>> If there is no more feedback, I'd start a vote next week.
>>>>>
>>>>> Best,
>>>>> Yangze Guo
>>>>>
>>>>> On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo wrote:
>>>>>>
>>>>>> Thanks for the valuable suggestion, Arvid.
>>>>>>
>>>>>> 1) Yes, we can add a new SlotSharingGroup which includes the name and
>>>>>> its resource. After that, we have two interfaces for configuring the
>>>>>> slot sharing group of an operator:
>>>>>> - #slotSharingGroup(String name) // the resource of it can be
>>>>>>   configured through StreamExecutionEnvironment#registerSlotSharingGroup
>>>>>> - #slotSharingGroup(SlotSharingGroup ssg) // Directly configure the
>>>>>>   resource
>>>>>> And one interface to configure the resource of an SSG:
>>>>>> - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
>>>>>> We can also define the priority of the above two approaches, e.g. the
>>>>>> resource registered in the StreamExecutionEnvironment will always be
>>>>>> respected in case of conflict. That would be well documented.
>>>>>>
>>>>>> 2) Yes, I originally added this interface as a shortcut. It seems
>>>>>> unnecessary now. Will remove it.
>>>>>>
>>>>>> 3) I don't think we need to expose the ExternalResource.
>>>>>> In the builder of SlotSharingGroup, we can introduce a
>>>>>> #withExternalResource(String name, double value). Also, this interface
>>>>>> needs to be annotated as evolving.
>>>>>>
>>>>>> 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
>>>>>> elaborate on the Builder for the SlotSharingGroup.
>>>>>>
>>>>>> WDYT?
>>>>>>
>>>>>> Best,
>>>>>> Yangze Guo
>>>>>>
>>>>>> On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise wrote:
>>>>>>>
>>>>>>> Hi Yangze,
>>>>>>>
>>>>>>> I like the general approach to bind requirements to slotsharing
>>>>>>> groups. I think the current approach is also flexible enough that a
>>>>>>> user could simply use ParameterTool or similar to use config values
>>>>>>> and wire that with their slotgroups, such that different requirements
>>>>>>> can be tested without recompilation. So I don't see an immediate need
>>>>>>> to provide a generic solution for yaml configuration for now.
>>>>>>>
>>>>>>> Looking at the programmatic interface though, I think we could improve
>>>>>>> by quite a bit and I haven't seen these alternatives being considered
>>>>>>> in the FLIP:
>>>>>>> 1) Add new class SlotSharingGroup that incorporates all ResourceSpec
>>>>>>> properties. Instead of using group names, the user could directly
>>>>>>> configure such an object.
>>>>>>>
>>>>>>>     SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name
>>>>>>>     // could also be omitted and auto-generated
>>>>>>>     ssg1.setCPUCores(4);
>>>>>>>     ...
>>>>>>>     DataStream<Tuple2<String, Integer>> grades =
>>>>>>>         GradeSource
>>>>>>>             .getSource(env, rate)
>>>>>>>             .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
>>>>>>>             .slotSharingGroup(ssg1);
>>>>>>>     DataStream<Tuple2<String, Integer>> salaries =
>>>>>>>         SalarySource.getSource(env, rate)
>>>>>>>             .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
>>>>>>>             .slotSharingGroup(ssg2);
>>>>>>>
>>>>>>>     // run the actual window join program with the same slot sharing
>>>>>>>     // group as grades
>>>>>>>     DataStream<Tuple3<String, Integer, Integer>> joinedStream =
>>>>>>>         runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);
>>>>>>>
>>>>>>> Note that we could make it backward compatible by changing the proposed
>>>>>>> StreamExecutionEnvironment#setSlotSharingGroupResource to
>>>>>>> StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
>>>>>>> and then use the string name for further reference.
>>>>>>>
>>>>>>> 2) I'm also not sure about
>>>>>>> StreamExecutionEnvironment#setSlotSharingGroupResources. What's the
>>>>>>> benefit of the Map version over having the simple setter? Even if the
>>>>>>> user has a map slotSharingGroupResources, they could simply do
>>>>>>>
>>>>>>>     slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
>>>>>>>
>>>>>>> 3) Is defining the ExternalResource part of this FLIP? I don't see a
>>>>>>> Public* class yet. I'd also be fine to cut the scope of this FLIP,
>>>>>>> remove it for now, and annotate ResourceSpec/SlotSharingGroup as
>>>>>>> evolving.
>>>>>>>
>>>>>>> 4) We should probably use a builder pattern around
>>>>>>> ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't
>>>>>>> think we need to fully specify that in the FLIP but it would be good
>>>>>>> to at least say how they should be created by the user.
>>>>>>>
>>>>>>> On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo wrote:
>>>>>>>>
>>>>>>>> @Yang
>>>>>>>> In short, the external resources will participate in resource
>>>>>>>> deduction and be logically ensured, but requesting an external
>>>>>>>> resource must still be done through config options with the current
>>>>>>>> default resource allocation strategy.
>>>>>>>> In FLIP-56, we abstract the logic of resource allocation to the
>>>>>>>> `ResourceAllocationStrategy`. Currently, with its default
>>>>>>>> implementation, ResourceManager would still allocate TMs with the
>>>>>>>> same resource spec, and their external resources are configured
>>>>>>>> through the config options as well. So, in your case, you need to
>>>>>>>> define "external-resources" and "external-resource.disk.amount".
>>>>>>>> Then, all the disk requirements defined in the SSG will be logically
>>>>>>>> ensured, as there is no slot level isolation. If the disk space of a
>>>>>>>> task manager cannot fulfill the disk requirement, RM will allocate a
>>>>>>>> new one.
>>>>>>>> In the future, we'd like to introduce a `ResourceAllocationStrategy`
>>>>>>>> which allocates heterogeneous TMs according to the requirements.
>>>>>>>> Then, users only need to define the driver of external resources
>>>>>>>> when needed.
>>>>>>>> Also, regarding the resource isolation, we may in the future provide
>>>>>>>> a fine-grained mode in which each slot can only fetch the information
>>>>>>>> of the external resources it requires. But that is out of the scope
>>>>>>>> of this PR.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yangze Guo
>>>>>>>>
>>>>>>>> On Tue, Jun 8, 2021 at 4:20 PM Yang Wang wrote:
>>>>>>>>>
>>>>>>>>> Thanks @Yangze for preparing this FLIP.
>>>>>>>>>
>>>>>>>>> I think this is a good starting point for community users to get a
>>>>>>>>> taste of fine-grained resource management, which we all believe
>>>>>>>>> could improve Flink job stability and cluster utilization.
>>>>>>>>>
>>>>>>>>> I have a simple question about the extended resources. Is it
>>>>>>>>> possible to combine extended resources with fine-grained resource
>>>>>>>>> management? Besides GPU, FPGA and other new computing devices, maybe
>>>>>>>>> the disk resource is a more general use case. For example, different
>>>>>>>>> SSGs may have various disk requirements based on the state. So we
>>>>>>>>> need to allocate enough ephemeral storage for every TaskManager pod
>>>>>>>>> in a Kubernetes deployment. Otherwise, it might be evicted for
>>>>>>>>> exceeding its limits.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Yang
>>>>>>>>>
>>>>>>>>> On Tue, Jun 8, 2021 at 1:47 PM Xintong Song wrote:
>>>>>>>>>>
>>>>>>>>>> I think being able to specify fine-grained resource requirements
>>>>>>>>>> without having to change the code and recompile the job is indeed a
>>>>>>>>>> good idea. It definitely improves the usability.
>>>>>>>>>>
>>>>>>>>>> However, this requires a more careful design, which probably
>>>>>>>>>> deserves a separate thread. It'd be good to have that discussion,
>>>>>>>>>> but maybe not block this feature on that.
>>>>>>>>>>
>>>>>>>>>> One idea concerning the configuration approach: As Yangze said,
>>>>>>>>>> flink configuration options are supposed to take effect at cluster
>>>>>>>>>> level. For updating job level specifics that are not suitable to be
>>>>>>>>>> introduced as a config option, currently the only way is to pass
>>>>>>>>>> them as program arguments. Would it make sense to introduce a
>>>>>>>>>> general approach for overwriting such job specifics without
>>>>>>>>>> re-compiling the job?
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo wrote:
>>>>>>>>>>>
>>>>>>>>>>> @Wenlong
>>>>>>>>>>> After another consideration, the config option approach I mentioned
>>>>>>>>>>> above might not be appropriate. The resource requirements for SSG
>>>>>>>>>>> should be a job level configuration and should not be set in the
>>>>>>>>>>> flink-conf.
>>>>>>>>>>>
>>>>>>>>>>> I think we can define a JSON format, which would be the
>>>>>>>>>>> ResourceSpecs mapped by the name of SSGs, for the resource
>>>>>>>>>>> requirements of a specific job. Then, we allow users to configure
>>>>>>>>>>> the file path of that JSON. The JSON will only be parsed at
>>>>>>>>>>> runtime, which allows users to tune it without re-compiling the
>>>>>>>>>>> job.
>>>>>>>>>>>
>>>>>>>>>>> We can add another #setSlotSharingGroupResources for configuring
>>>>>>>>>>> the file path of that JSON:
>>>>>>>>>>> ```
>>>>>>>>>>> /**
>>>>>>>>>>>  * Specify fine-grained resource requirements for slot sharing groups
>>>>>>>>>>>  * with the given resource JSON file. The existing resource
>>>>>>>>>>>  * requirement of the same slot sharing group will be replaced.
>>>>>>>>>>>  */
>>>>>>>>>>> public StreamExecutionEnvironment setSlotSharingGroupResources(
>>>>>>>>>>>     String pathToResourceJson);
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> WDYT?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Yangze Guo
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the feedback, Xintong and Wenlong!
>>>>>>>>>>>>
>>>>>>>>>>>> @Wenlong
>>>>>>>>>>>> I think that is a good idea; adjusting the resource without
>>>>>>>>>>>> re-compiling the job will facilitate the tuning process.
>>>>>>>>>>>> We can define a pattern "slot-sharing-group.resource.{ssg name}"
>>>>>>>>>>>> (welcome any proposal for the prefix naming) for the resource spec
>>>>>>>>>>>> config of a slot sharing group. Then, users can set the
>>>>>>>>>>>> ResourceSpec of SSG "ssg1" by adding
>>>>>>>>>>>> "slot-sharing-group.resource.ssg1: {cpu: 1.0, heap: 100m,
>>>>>>>>>>>> off-heap: 100m....}". WDYT?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Yangze Guo
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <wenlong88....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Yangze for the FLIP, it is great for users to be able to
>>>>>>>>>>>>> declare the fine-grained resource requirements for the job.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have one minor suggestion: can we support setting resource
>>>>>>>>>>>>> requirements by configuration? Currently most of the config
>>>>>>>>>>>>> options in execution config can be configured by configuration,
>>>>>>>>>>>>> and it is very likely that users need to adjust the resource
>>>>>>>>>>>>> according to the performance of their job during debugging.
>>>>>>>>>>>>> Providing a way to configure this would make it more convenient.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Bests,
>>>>>>>>>>>>> Wenlong Lyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 3 Jun 2021 at 15:59, Xintong Song <tonysong...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Yangze for preparing the FLIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The proposed changes look good to me.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As you've mentioned in the implementation plan, I believe one of
>>>>>>>>>>>>>> the most important tasks of this FLIP is to have the feature well
>>>>>>>>>>>>>> documented. It would be really nice if we can keep that in mind
>>>>>>>>>>>>>> and start drafting the documentation early.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you~
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Xintong Song
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <karma...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, there,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We would like to start a discussion thread on "FLIP-169:
>>>>>>>>>>>>>>> DataStream API for Fine-Grained Resource Requirements"[1], where
>>>>>>>>>>>>>>> we propose the DataStream API for specifying fine-grained
>>>>>>>>>>>>>>> resource requirements in StreamExecutionEnvironment.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>>>>>>>>>>>>> forward to your feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Yangze Guo
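
The thread converges on an immutable SlotSharingGroup that is created through a builder, so that a group already handed to an operator cannot be changed afterwards. A minimal sketch of that idea follows. The class name SlotSharingGroupSketch, its fields, and the setter names setCpuCores and setTaskHeapMemoryMB are assumptions made for illustration and are not taken from the FLIP or from Flink; only #withExternalResource(String name, double value) is named in the discussion above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for illustration; NOT Flink's actual SlotSharingGroup. */
final class SlotSharingGroupSketch {
    private final String name;
    private final double cpuCores;
    private final int taskHeapMemoryMB;
    private final Map<String, Double> externalResources;

    private SlotSharingGroupSketch(Builder b) {
        this.name = b.name;
        this.cpuCores = b.cpuCores;
        this.taskHeapMemoryMB = b.taskHeapMemoryMB;
        // Defensive copy: later changes to the builder cannot leak into this object.
        this.externalResources =
                Collections.unmodifiableMap(new LinkedHashMap<>(b.externalResources));
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    // Getters only; there are deliberately no setters on the built object.
    String getName() { return name; }
    double getCpuCores() { return cpuCores; }
    int getTaskHeapMemoryMB() { return taskHeapMemoryMB; }
    Map<String, Double> getExternalResources() { return externalResources; }

    /** Mutable builder; all mutation happens here, before build(). */
    static final class Builder {
        private final String name;
        private double cpuCores;
        private int taskHeapMemoryMB;
        private final Map<String, Double> externalResources = new LinkedHashMap<>();

        private Builder(String name) {
            this.name = name;
        }

        Builder setCpuCores(double cpuCores) {          // assumed setter name
            this.cpuCores = cpuCores;
            return this;
        }

        Builder setTaskHeapMemoryMB(int megabytes) {    // assumed setter name
            this.taskHeapMemoryMB = megabytes;
            return this;
        }

        Builder withExternalResource(String resourceName, double value) {
            this.externalResources.put(resourceName, value);
            return this;
        }

        SlotSharingGroupSketch build() {
            return new SlotSharingGroupSketch(this);
        }
    }
}

class Demo {
    public static void main(String[] args) {
        SlotSharingGroupSketch.Builder builder =
                SlotSharingGroupSketch.newBuilder("ssg-1").setCpuCores(2.0);
        SlotSharingGroupSketch ssg1 = builder.build();

        // Changing the builder afterwards does not affect the group already built.
        builder.setCpuCores(4.0);
        System.out.println(ssg1.getName() + " cpuCores=" + ssg1.getCpuCores());
    }
}
```

With this shape, the sequence Arvid warns about (set 2 CPUs, assign to operator1, set 4 CPUs, assign to operator2) cannot silently alter the first operator's group, because the built object exposes no setters.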