I have to admit that I cannot think of a better name for the adaptive batch scheduler atm. Maybe it is good enough to call the two schedulers AdaptiveBatchScheduler and AdaptiveStreamingScheduler to tell which scheduler is used for which execution mode. It is true, though, that the former is adaptive wrt the workload and the latter wrt the available resources.

Cheers,
Till

On Wed, Nov 3, 2021 at 9:08 AM Lijie Wang <wangdachui9...@gmail.com> wrote:

> Hi David,
>
> Thanks for your comments.
>
> I personally think that "Adaptive" means: Flink automatically determines
> the appropriate scheduling and execution plan based on some information.
> The information can include both resource information and workload
> information, rather than being limited to a certain dimension.
>
> For the adaptive scheduler, it is currently resource-based, but in the
> future I think it can also take the workload into account (auto-scaling).
>
> For the batch adaptive scheduler, it's currently workload-based, but as
> Till & Zhu mentioned, our ultimate goal is to also consider resources.
>
> In short, "Adaptive" can cover multiple dimensions. Currently, both the
> adaptive scheduler and the batch adaptive scheduler focus on only one
> dimension, and we hope to combine more dimensions in order to give a
> better scheduling and execution plan in the future.
>
> At present, in order not to confuse users, the documentation needs to
> state which information each scheduler bases its decisions on.
>
> Best,
> Lijie
>
> David Morávek <d...@apache.org> wrote on Tue, Nov 2, 2021 at 9:50 PM:
>
> > Hi, thanks for drafting the FLIP, Lijie and Zhu Zhu. It already looks
> > pretty solid and it will be a really great improvement to the batch
> > scheduling. I'd second Till's feedback, especially when it comes to
> > the consistent behavior between different deployment types /
> > schedulers.
> >
> > What I'm a bit unsure about is the naming here. The word *Adaptive*
> > means something different in the streaming and batch scheduler:
> > - For *streaming* it refers to the ability to adapt the job
> >   parallelism based on the resource availability.
> > - For *batch* it refers to the ability to adapt the stage parallelism
> >   based on the output of the previous stage.
> >
> > Should this be a concern?
> >
> > Best,
> > D.
> >
> > On Sun, Oct 31, 2021 at 8:21 AM Lijie Wang <wangdachui9...@gmail.com>
> > wrote:
> >
> > > Hi, Till & Zhu
> > >
> > > Thanks for your feedback. Also thanks for your comments and
> > > suggestions on wiki, which are very helpful for perfecting the FLIP.
> > >
> > > I also agree to provide our users with consistent and
> > > easy-to-understand deployment options. Regarding the three options
> > > proposed by Till, my opinion is the same as Zhu's. In the first
> > > version, we can support only "option1", and then "option2" and
> > > "option3" can be future improvements.
> > >
> > > Regarding the side note to abstract subpartitions as splits,
> > > although it is not our original intention, I personally feel it's
> > > meaningful. It is also helpful to users: they can use it for
> > > monitoring, in order to get the progress of jobs in detail.
> > >
> > > Best,
> > > Lijie
> > >
> > > Zhu Zhu <reed...@gmail.com> wrote on Sat, Oct 30, 2021 at 3:47 PM:
> > >
> > > > Hi Till,
> > > >
> > > > Thanks for the comments!
> > > >
> > > > I agree with you that we should avoid an auto-scaled job that
> > > > cannot be scheduled in standalone/reactive mode. And I think it's
> > > > great if we can expose a deployment option that is consistent for
> > > > streaming and batch jobs, which can be easier to understand. Just
> > > > looking forward to the day when both adaptive schedulers are the
> > > > default, so that most users do not need to care about job tuning
> > > > while the job can run well.
> > > >
> > > > Regarding the three options, personally I prefer to take *#1* as
> > > > the first step, to limit the scope of this FLIP a bit, otherwise
> > > > it may be too complicated.
> > > > I think *#3* is the final goal we need to target later, so that
> > > > mixed bounded and unbounded workloads can be supported.
> > > > Given that there can be multiple stages scheduled at the same
> > > > time, the design of the scheduling may not be very
> > > > straightforward and needs some thorough consideration.
> > > > *#2* can be a very good improvement in itself. Shuffles of batch
> > > > jobs can be auto-determined to be pipelined or blocking according
> > > > to available resources. But the changes may involve many
> > > > components and can be large. So I think it can be a standalone
> > > > future improvement.
> > > >
> > > > Regarding the side note to abstract subpartitions as splits, the
> > > > idea is very interesting to me. Besides supporting auto scaling, I
> > > > think trackable produced splits can also help in troubleshooting
> > > > and give some insights for future improvements. Collecting data
> > > > sizes for the batch adaptive scheduler can be the first step and
> > > > we can further consider the abstraction of it.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Oct 29, 2021
> > > > at 10:47 PM:
> > > >
> > > > > Hi Lijie,
> > > > >
> > > > > Thanks for drafting this FLIP together with Zhu Zhu :-)
> > > > >
> > > > > I like the idea of making the parallelism of operators of a
> > > > > bounded job dependent on the data size. This makes the job
> > > > > adjust automatically when the data sources/sizes change.
> > > > >
> > > > > I can see this work well in combination with the active mode
> > > > > where Flink can ask for more resources.
> > > > >
> > > > > In the case of the standalone mode, I think it can lead to
> > > > > situations where one and the same job can be scheduled or not
> > > > > depending on the input data. The problem is pipelined regions
> > > > > that contain more than a single operator instance (e.g.
> > > > > pipelined shuffles).
> > > > > We already have this problem when submitting a batch job with
> > > > > too high parallelism onto a standalone cluster. However, with
> > > > > the adaptive batch mode this problem might become a bit more
> > > > > present. So my question would be how can we solve this problem
> > > > > (potentially in a follow up step). I could think of the
> > > > > following three alternatives atm:
> > > > >
> > > > > 1. Only allow blocking data exchanges: This will limit the size
> > > > >    of a pipelined region to a single operator instance. This has
> > > > >    the downside that we no longer support pipelined execution of
> > > > >    multiple operators (other than chained). Moreover, it
> > > > >    requires the user to set all data exchanges to blocking,
> > > > >    which cannot be enforced atm.
> > > > > 2. Introduce a new pipelined-blocking data exchange hybrid that
> > > > >    supports pipelined data exchanges but can also spill to disk
> > > > >    if there is no consumer: This could still allow the job to
> > > > >    make progress when a pipelined region requires more slots
> > > > >    than we currently have.
> > > > > 3. Decide on the actual parallelism of a pipelined region after
> > > > >    having received the slots that are declared based on the data
> > > > >    size per subtask. If the pipelined region contains an
> > > > >    all-to-all connection, then the parallelism is how many slots
> > > > >    we currently have. If not, then the parallelism can be
> > > > >    decided by the data volume: This would effectively mean
> > > > >    enabling the existing AdaptiveScheduler to also run batch
> > > > >    workloads.
> > > > >
> > > > > With either of these options, I believe that we could provide a
> > > > > somewhat consistent behaviour across the different deployment
> > > > > and execution modes wrt scaling:
> > > > >
> > > > > a) Active + streaming job that uses AdaptiveScheduler: Can run
> > > > >    with fewer slots than requested. Can ask for more slots. Once
> > > > >    new slots arrive it will make use of them.
> > > > > b) Reactive + streaming job that uses AdaptiveScheduler: Can run
> > > > >    with fewer slots than requested. Once new slots arrive it
> > > > >    will make use of them.
> > > > > c) Active + batch job that uses batch adaptive scheduler + any
> > > > >    of 1., 2. or 3.: Can run with fewer slots than requested
> > > > >    (because it can complete the job with a single slot). Can ask
> > > > >    for more slots. Once new slots arrive it will make use of
> > > > >    them.
> > > > > d) Standalone + batch job that uses batch adaptive scheduler +
> > > > >    any of 1., 2. or 3.: Can run with fewer slots than requested
> > > > >    (because it can complete the job with a single slot). Once
> > > > >    new slots arrive it will make use of them (up to the desired
> > > > >    maximum parallelism).
> > > > >
> > > > > If we decide to go with option 1. or 2., then we will only be
> > > > > able to run mixed workloads (mixture of bounded and unbounded
> > > > > sources) in streaming mode. This might be ok for the time being.
> > > > >
> > > > > This actually leads to my main concern, which is to give our
> > > > > users consistent and somewhat easy to understand deployment
> > > > > options. In order to achieve this Flink should always be able to
> > > > > make progress unless the parallelism is explicitly configured
> > > > > (e.g. a very high parallelism in a pipelined region that cannot
> > > > > be fulfilled).
> > > > > Moreover, Flink should be able to make use of new resources if
> > > > > the job isn't being run at the maximum parallelism already.
> > > > > Removing slots so that the minimum number of required slots is
> > > > > still available should also be possible. Maybe one idea could be
> > > > > to make the adaptive batch scheduler the default for batch jobs
> > > > > eventually. For streaming jobs, we would ideally always use the
> > > > > AdaptiveScheduler to give a consistent behaviour.
> > > > >
> > > > > As a side note: Creating as many subpartitions as the maximum
> > > > > parallelism will result in a one-to-one mapping between
> > > > > subpartitions and key groups. If we then also make the non-keyed
> > > > > operators work on a set of subpartitions that store the operator
> > > > > state, then the subpartitions could be seen as some logical work
> > > > > unit/split that is assigned to operators. Having such an
> > > > > abstraction could allow us to track which work unit has
> > > > > completed, which helps with rescaling of operators and
> > > > > maintaining order guarantees, for example.
> > > > >
> > > > > I also left some smaller comments in the wiki.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang
> > > > > <wangdachui9...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Zhu Zhu and I propose to introduce a new job scheduler to
> > > > > > Flink: adaptive batch job scheduler. The new scheduler can
> > > > > > automatically decide parallelisms of job vertices for batch
> > > > > > jobs, according to the size of data volume each vertex needs
> > > > > > to process.
> > > > > >
> > > > > > Major benefits of this scheduler include:
> > > > > >
> > > > > > 1. Batch job users can be relieved from parallelism tuning
> > > > > > 2. Automatically tuned parallelisms can be vertex level and
> > > > > >    can better fit consumed datasets which have a varying
> > > > > >    volume size every day
> > > > > > 3. Vertices from SQL batch jobs can be assigned with different
> > > > > >    parallelisms which are automatically tuned
> > > > > > 4. It can be the first step towards enabling auto-rebalancing
> > > > > >    workloads of tasks
> > > > > >
> > > > > > You can find more details in the FLIP-187[1]. Looking forward
> > > > > > to your feedback.
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > > > > >
> > > > > > Best,
> > > > > > Lijie
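
For readers following the thread, the core idea of the original proposal (deriving a vertex's parallelism from the amount of data it has to process) can be sketched roughly as below. This is only an illustration under stated assumptions, not the FLIP-187 implementation: the function name `decide_parallelism`, the `bytes_per_task` target and the min/max bounds are all hypothetical and chosen for this example.

```python
def decide_parallelism(input_bytes: int,
                       bytes_per_task: int,
                       min_parallelism: int = 1,
                       max_parallelism: int = 128) -> int:
    """Pick a parallelism for one job vertex from its input data volume.

    All parameter names are hypothetical; they only illustrate the idea
    of "one subtask per bytes_per_task of input, within fixed bounds".
    """
    if input_bytes < 0:
        raise ValueError("input_bytes must not be negative")
    if bytes_per_task <= 0:
        raise ValueError("bytes_per_task must be positive")
    if not 1 <= min_parallelism <= max_parallelism:
        raise ValueError("need 1 <= min_parallelism <= max_parallelism")

    # Integer ceiling division: number of subtasks needed so that each
    # one processes at most bytes_per_task of input.
    wanted = (input_bytes + bytes_per_task - 1) // bytes_per_task

    # Clamp into the allowed range so tiny inputs still get one subtask
    # and huge inputs do not exceed the configured maximum.
    return max(min_parallelism, min(max_parallelism, wanted))


if __name__ == "__main__":
    GIB = 1024 ** 3
    for size in (0, GIB // 2, 10 * GIB, 1000 * GIB):
        print(size, decide_parallelism(size, bytes_per_task=GIB))
```

With a 1 GiB per-task target, a vertex consuming 10 GiB would get a parallelism of 10 in this sketch, while an empty input falls back to the minimum and a very large input is capped at the maximum.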