Hi Vishal, thanks a lot for all your feedback on the new reactive mode. I'll try to answer your questions.
0. To avoid confusion, let me quickly explain some terminology: the reactive mode is the new feature that allows Flink to react to newly available resources and make use of them. To achieve this, it uses the newly introduced AdaptiveScheduler, which works by declaring the required resources and adapting the job if it loses slots, or if it receives slots while the job is running below its configured parallelism. The AdaptiveScheduler can also be used without the reactive mode; this gives you the capability that Flink can continue executing your job if your cluster loses TaskManagers (with the default scheduler, the job would fail with "not enough resources"). The difference between the reactive and non-reactive mode is that in reactive mode, Flink ignores the configured parallelism value and tries to run the job at the maxParallelism (that is, the maximum parallelism your job can be run with).

1. The AdaptiveScheduler, and thereby also the reactive mode, uses a simple slot distribution mechanism where every slot sharing group gets the same number of slots. The parallelism of an operator in a slot sharing group is then the minimum of the number of slots and the configured parallelism (when using the reactive mode, the configured maxParallelism). This is of course not ideal and can lead to unused slots. Moreover, it does not support scaling different slot sharing groups independently. This is a limitation of the current implementation.

2. With the reactive mode, external systems can control the parallelism with which a job is running by starting and stopping TaskManagers. Admittedly, this does not give you super fine-grained control over the running operators. Being able to specify ratios for operators could be a good extension.

3. Local recovery simply has not been implemented, for scoping reasons. There is nothing fundamental preventing it from working; it just hasn't been implemented yet.

4.
No, there are currently no separate metrics for restarts and rescaling operations. I do see the usefulness of such a metric. However, for the reactive mode, where scaling down means that we kill a TaskManager, I am not entirely sure how we would distinguish this from any other reason that can kill a TaskManager. The only way I could see this work is by telling Flink about the killing of a TaskManager.

5. No, Flink is not able to do this kind of operator-based optimization at the moment. I think it could become possible once we have implemented the next step after the reactive mode, which is proper auto-scaling.

6. From a high-level perspective, there is not much difference between the reactive mode and manually taking a savepoint, resuming the job from it, and changing the parallelism. That is effectively what Flink does internally as well. The difference is that this operation is now automated, and that Flink can also handle situations where you don't get all the resources back after a restart, where the manual approach would simply fail.

7. Setting the maxParallelism is only required for the reactive mode, or if you don't want to run an operator with the default maxParallelism value. By definition, the maxParallelism is the maximum parallelism you can run your operator with. Hence, if you set this value, you should be sure that you will never have to run your job with a higher parallelism than that. Note that the reactive mode will try to run the operator with this parallelism; if it has fewer resources, it will run the operators at a lower parallelism. So the maxParallelism defines the upper bound for the parallelism of your operator.

The reactive mode is intended as the first step towards more elastic streaming pipelines and simplified operations. We see it as the foundation for more advanced features such as true auto-scaling, where each operator can decide its own parallelism.
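To make the slot distribution from points 1 and 7 concrete, here is a minimal sketch of the arithmetic; the class and method names are illustrative only, not part of Flink's API:

```java
// Sketch of the slot distribution described in points 1 and 7 above.
// Names are illustrative, not Flink API.
public class SlotDistributionSketch {

    // Every slot sharing group gets the same number of slots.
    static int slotsPerGroup(int totalSlots, int slotSharingGroups) {
        return totalSlots / slotSharingGroups;
    }

    // An operator runs at the minimum of its group's slots and its
    // configured parallelism (maxParallelism in reactive mode).
    static int effectiveParallelism(int slotsInGroup, int maxParallelism) {
        return Math.min(slotsInGroup, maxParallelism);
    }

    public static void main(String[] args) {
        int slots = slotsPerGroup(10, 2); // 10 slots, 2 groups -> 5 each
        System.out.println(effectiveParallelism(slots, 8)); // capped by slots: 5
        System.out.println(effectiveParallelism(slots, 3)); // capped by maxParallelism: 3
    }
}
```

The second call also illustrates the unused-slot limitation from point 1: a group whose operators all have a maxParallelism below its slot share leaves slots idle.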
I hope this helps to understand the reactive mode a bit better.

Cheers,
Till

On Wed, May 5, 2021 at 7:50 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi Vishal,
>
> WRT “bring down our internal services” - a common pattern with making requests to external services is to measure latency, and throttle (delay) requests in response to increased latency.
>
> You’ll see this discussed frequently on web crawling forums as an auto-tuning approach.
>
> Typically there’s a steady increase in latency as load on the service increases.
>
> The trick is throttling soon enough before you hit the “elbow” where a service effectively falls over.
>
> — Ken
>
> > On May 5, 2021, at 9:08 AM, vishalovercome <vis...@moengage.com> wrote:
> > Yes. While back-pressure would eventually ensure high throughput, hand tuning parallelism became necessary because the job with high source parallelism would immediately bring down our internal services - not giving enough time to flink to adjust the in-rate. Plus running all operators at such a high scale would result in wastage of resources, even with operator chaining in place.
> >
> > That's why I think more toggles are needed to make current auto-scaling truly shine.
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
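As a footnote, the latency-based throttling pattern Ken describes in the quoted thread can be sketched roughly as follows; the class name, the linear backoff formula, and the constants are all assumptions for illustration, not an existing API:

```java
// Illustrative sketch of latency-based throttling: the delay before the
// next request grows as observed latency rises above a healthy baseline.
// The linear formula and all names are assumptions, not an existing API.
public class LatencyThrottle {

    private final long baselineMillis;  // latency of the service when healthy
    private final double backoffFactor; // how aggressively to slow down

    public LatencyThrottle(long baselineMillis, double backoffFactor) {
        this.baselineMillis = baselineMillis;
        this.backoffFactor = backoffFactor;
    }

    // Delay to apply before the next request, given the last observed latency.
    public long delayMillisFor(long observedLatencyMillis) {
        long excess = observedLatencyMillis - baselineMillis;
        return excess <= 0 ? 0 : (long) (excess * backoffFactor);
    }

    public static void main(String[] args) {
        LatencyThrottle throttle = new LatencyThrottle(100, 2.0);
        System.out.println(throttle.delayMillisFor(80));  // at or below baseline: 0
        System.out.println(throttle.delayMillisFor(150)); // 50 ms over baseline -> 100 ms delay
    }
}
```

The point of the linear ramp is to start slowing down well before the "elbow" Ken mentions, rather than reacting only once the service has already fallen over.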