Hi Till,

Thanks for the comments!

I agree with you that we should avoid the situation where an auto-scaled job cannot be scheduled in standalone/reactive mode. And I think it would be great if we could expose a deployment option that is consistent for streaming and batch jobs, which would be easier to understand. I'm looking forward to the day when both adaptive schedulers are the default, so that most users do not need to care about job tuning while their jobs still run well.

Regarding the three options, personally I prefer to take *#1* as the first step, to limit the scope of this FLIP a bit; otherwise it may become too complicated. I think *#3* is the final goal we should target later, so that mixed bounded and unbounded workloads can be supported. Given that multiple stages can be scheduled at the same time, the design of the scheduling may not be very straightforward and needs some thorough consideration.
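To make *#3* a bit more concrete, here is a rough sketch of what that decision could look like once the declared slots have arrived. All names are hypothetical, not actual scheduler code:

    final class RegionParallelismDecision {

        /**
         * @param hasAllToAllEdge       whether the region contains an all-to-all connection
         * @param availableSlots        slots actually received for the region
         * @param dataDrivenParallelism parallelism suggested by the data volume
         *                              (see the sketch further below)
         */
        static int decide(boolean hasAllToAllEdge, int availableSlots, int dataDrivenParallelism) {
            if (hasAllToAllEdge) {
                // All subtasks of the region must run concurrently, so the
                // region can only be as parallel as the slots we currently hold.
                return availableSlots;
            }
            // Without an all-to-all connection, each parallel instance forms
            // its own pipeline that can run independently of the others, so
            // the data volume can decide the parallelism.
            return dataDrivenParallelism;
        }
    }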
*#2* can be a very good improvement in itself: shuffles of batch jobs could be auto-determined to be pipelined or blocking according to the available resources. But the changes may involve many components and could be large, so I think it is better treated as a standalone future improvement.

Regarding the side note about abstracting subpartitions as splits, the idea is very interesting to me. Besides supporting auto-scaling, I think trackable produced splits can also help with troubleshooting and give some insights for future improvements.
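To make the one-to-one mapping from your side note concrete, here is a small illustration. The arithmetic mirrors the key-group assignment Flink already uses; the subpartition framing itself is only an assumption for illustration:

    final class KeyGroupSubpartitionMapping {

        /** Key group (== subpartition index, when the number of subpartitions
         *  equals the max parallelism) for a key. Flink additionally
         *  murmur-hashes the key's hashCode; omitted here for brevity. */
        static int subpartitionForKey(Object key, int maxParallelism) {
            return Math.abs(key.hashCode() % maxParallelism);
        }

        /** Downstream subtask that owns a given key group / subpartition. */
        static int subtaskForSubpartition(int subpartition, int parallelism, int maxParallelism) {
            return subpartition * parallelism / maxParallelism;
        }

        public static void main(String[] args) {
            int maxParallelism = 8;
            int parallelism = 3;
            for (int sp = 0; sp < maxParallelism; sp++) {
                System.out.printf("subpartition %d -> subtask %d%n",
                        sp, subtaskForSubpartition(sp, parallelism, maxParallelism));
            }
            // Subtask 0 gets subpartitions 0-2, subtask 1 gets 3-5, subtask 2
            // gets 6-7: contiguous, trackable work units, as you described.
        }
    }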
Collecting data sizes for the batch adaptive scheduler can be the first step, and we can further consider the abstraction on top of it.
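As a rough illustration of the core heuristic, deriving a vertex's parallelism from the collected sizes could look like the following. The names and the exact rounding are my assumptions, not the FLIP's final design:

    final class DataVolumeParallelism {

        /**
         * @param consumedBytes  total bytes of all consumed upstream results
         * @param bytesPerTask   configured target data volume per subtask
         * @param minParallelism configured lower bound
         * @param maxParallelism configured upper bound
         */
        static int decideParallelism(long consumedBytes, long bytesPerTask,
                                     int minParallelism, int maxParallelism) {
            long byVolume = (consumedBytes + bytesPerTask - 1) / bytesPerTask; // ceiling division
            return (int) Math.max(minParallelism, Math.min(maxParallelism, byVolume));
        }

        public static void main(String[] args) {
            // e.g. 10 GiB consumed with a 1 GiB-per-task target -> parallelism 10
            System.out.println(decideParallelism(10L << 30, 1L << 30, 1, 128));
        }
    }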
Thanks,
Zhu

On Fri, Oct 29, 2021 at 10:47 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Lijie,
>
> Thanks for drafting this FLIP together with Zhu Zhu :-)
>
> I like the idea of making the parallelism of operators of a bounded job
> dependent on the data size. This makes the job adjust automatically when
> the data sources/sizes change.
>
> I can see this working well in combination with the active mode where
> Flink can ask for more resources.
>
> In the case of the standalone mode, I think it can lead to situations
> where one and the same job can be scheduled or not depending on the
> input data. The problem is pipelined regions that contain more than a
> single operator instance (e.g. pipelined shuffles). We already have this
> problem when submitting a batch job with a too high parallelism onto a
> standalone cluster. However, with the adaptive batch mode this problem
> might become a bit more prominent. So my question would be how we can
> solve this problem (potentially in a follow-up step). I could think of
> the following three alternatives atm:
>
> 1. Only allow blocking data exchanges: This will limit the size of a
> pipelined region to a single operator instance. It has the downside that
> we no longer support pipelined execution of multiple operators (other
> than chained ones). Moreover, it requires the user to set all data
> exchanges to blocking, which cannot be enforced atm.
> 2. Introduce a new pipelined-blocking data exchange hybrid that supports
> pipelined data exchanges but can also spill to disk if there is no
> consumer: This could allow us to still make progress in case one has a
> pipelined region which requires more slots than what we currently have.
> 3. Decide on the actual parallelism of a pipelined region after having
> received the slots that are declared based on the data size per subtask:
> If the pipelined region contains an all-to-all connection, then the
> parallelism is how many slots we currently have. If not, then the
> parallelism can be decided by the data volume. This would effectively
> mean enabling the existing AdaptiveScheduler to also run batch
> workloads.
>
> With either of these options, I believe that we could provide a somewhat
> consistent behaviour across the different deployment and execution modes
> wrt scaling:
>
> a) Active + streaming job that uses the AdaptiveScheduler: Can run with
> fewer slots than requested. Can ask for more slots. Once new slots
> arrive, it will make use of them.
> b) Reactive + streaming job that uses the AdaptiveScheduler: Can run
> with fewer slots than requested. Once new slots arrive, it will make use
> of them.
> c) Active + batch job that uses the adaptive batch scheduler + any of
> 1., 2. or 3.: Can run with fewer slots than requested (because it can
> complete the job with a single slot). Can ask for more slots. Once new
> slots arrive, it will make use of them.
> d) Standalone + batch job that uses the adaptive batch scheduler + any
> of 1., 2. or 3.: Can run with fewer slots than requested (because it can
> complete the job with a single slot). Once new slots arrive, it will
> make use of them (up to the desired maximum parallelism).
>
> If we decide to go with option 1. or 2., then we will only be able to
> run mixed workloads (a mixture of bounded and unbounded sources) in
> streaming mode. This might be ok for the time being.
>
> This actually leads to my main concern, which is to give our users
> consistent and somewhat easy-to-understand deployment options. In order
> to achieve this, Flink should always be able to make progress unless the
> parallelism is explicitly configured (e.g. a very high parallelism in a
> pipelined region that cannot be fulfilled). Moreover, Flink should be
> able to make use of new resources if the job isn't being run at the
> maximum parallelism already. Removing slots, so that the minimum number
> of required slots is still available, should also be possible. Maybe one
> idea could be to make the adaptive batch scheduler the default for batch
> jobs eventually. For streaming jobs, we would ideally always use the
> AdaptiveScheduler to give a consistent behaviour.
>
> As a side note: Creating as many subpartitions as the maximum
> parallelism will result in a one-to-one mapping between subpartitions
> and key groups. If we then also make the non-keyed operators work on a
> set of subpartitions that store the operator state, then the
> subpartitions could be seen as some logical work unit/split that is
> assigned to operators. Having such an abstraction could allow us to
> track which work unit has completed, which helps with rescaling of
> operators and maintaining order guarantees, for example.
>
> I also left some smaller comments in the wiki.
>
> Cheers,
> Till
>
> On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <wangdachui9...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > Zhu Zhu and I propose to introduce a new job scheduler to Flink: the
> > adaptive batch job scheduler. The new scheduler can automatically
> > decide the parallelisms of job vertices for batch jobs, according to
> > the size of the data volume each vertex needs to process.
> >
> > Major benefits of this scheduler include:
> >
> > 1. Batch job users can be relieved from parallelism tuning.
> > 2. Automatically tuned parallelisms can be set per vertex and can
> > better fit consumed datasets whose volume varies from day to day.
> > 3. Vertices from SQL batch jobs can be assigned different
> > parallelisms which are automatically tuned.
> > 4. It can be the first step towards enabling auto-rebalancing of
> > workloads across tasks.
> >
> > You can find more details in FLIP-187 [1]. Looking forward to your
> > feedback.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> >
> > Best,
> >
> > Lijie