Hi Vishal,

thanks a lot for all your feedback on the new reactive mode. I'll try to
answer your questions.

0. In order to avoid confusion let me quickly explain a bit of terminology:
The reactive mode is the new feature that allows Flink to react to newly
available resources and to make use of them. In order to achieve this, it
uses the newly introduced AdaptiveScheduler, which works by declaring the
required resources and adapting the job whenever it loses slots, or when it
receives new slots while running below its configured parallelism. The
AdaptiveScheduler can also be used without the reactive mode, which means
Flink can continue executing your job even if your cluster loses
TaskManagers (with the default scheduler, the job would then fail with not
enough resources). The difference between the reactive and non-reactive
mode is that in the reactive mode, Flink ignores the configured parallelism
value and tries to run the job at the maxParallelism (i.e., the maximum
parallelism your job can be run with).
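For reference, enabling the two modes is a matter of configuration; a
minimal flink-conf.yaml sketch (option names as of Flink 1.13, please
double-check against the configuration docs for your version):

```yaml
# Use the AdaptiveScheduler only (job survives losing TaskManagers,
# but the configured parallelism is still respected):
jobmanager.scheduler: adaptive

# Or enable the reactive mode, which implies the AdaptiveScheduler
# and scales the job to all slots available in the cluster:
scheduler-mode: reactive
```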

1. The AdaptiveScheduler, and thereby also the reactive mode, uses a simple
slot distribution mechanism in which every slot sharing group gets the same
number of slots. The parallelism of an operator in a slot sharing group is
then the minimum of the number of slots in that group and the operator's
configured parallelism (with the reactive mode, the configured
maxParallelism). This is of course not ideal and can leave slots unused.
Moreover, it does not support scaling different slot sharing groups
independently. This is a limitation of the current implementation.
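To make the distribution rule concrete, here is a small standalone sketch.
This is not Flink code, just the arithmetic described above; the helper
name is made up for illustration:

```java
public class SlotDistributionSketch {

    // Hypothetical helper illustrating the rule: every slot sharing group
    // gets an equal share of the available slots, and an operator in that
    // group runs at min(slotsPerGroup, configuredParallelism).
    static int effectiveParallelism(int totalSlots, int numSlotSharingGroups,
                                    int configuredParallelism) {
        int slotsPerGroup = totalSlots / numSlotSharingGroups;
        return Math.min(slotsPerGroup, configuredParallelism);
    }

    public static void main(String[] args) {
        // 10 slots, 2 slot sharing groups -> 5 slots per group.
        // An operator configured with parallelism 8 runs at 5;
        // one configured with parallelism 3 runs at 3, leaving slots unused.
        System.out.println(effectiveParallelism(10, 2, 8)); // 5
        System.out.println(effectiveParallelism(10, 2, 3)); // 3
    }
}
```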

2. With the reactive mode, external systems can control the parallelism
with which a job is running by starting and stopping TaskManagers.
Admittedly, this does not give you super fine-grained control over the
running operators. Being able to specify ratios for operators could be a
good extension.
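As an illustration, with a standalone Flink cluster on Kubernetes the
"external system" can simply be the deployment controller. A sketch, where
the deployment name is hypothetical:

```shell
# Add TaskManager pods; the reactive mode picks up the new slots
# automatically and rescales the job.
kubectl scale deployment/flink-taskmanager --replicas=4

# Scaling down works the same way; once the slots disappear,
# Flink restarts the job at a lower parallelism.
kubectl scale deployment/flink-taskmanager --replicas=2
```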

3. Local recovery simply has not been implemented because of scoping
reasons. There is nothing fundamentally preventing this from working, it
just hasn't been implemented yet.

4. No, there are currently no separate metrics for restarts and rescaling
operations. I do see the usefulness of such a metric. However, for the
reactive mode, where scaling down means that we kill a TaskManager, I am
not entirely sure how we would distinguish this from a TaskManager being
killed for any other reason. The only way I can see this working is by
explicitly telling Flink that a TaskManager is about to be killed.

5. No, Flink is not able to do this kind of operator-based optimizations at
the moment. I think this could be possible once we have implemented the
next step of the reactive mode which is proper auto-scaling.

6. From a high-level perspective, there is not much difference between the
reactive mode and manually taking a savepoint, resuming the job from it,
and changing the parallelism. That's effectively what Flink does internally
as well. The difference is that this operation is now automated, and that
Flink can also handle situations where you don't get all the resources back
after a restart, where the manual approach would simply fail.
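For comparison, the manual approach looks roughly like this with the Flink
CLI (job ID, savepoint paths, and jar name are placeholders):

```shell
# Stop the job, taking a savepoint:
flink stop --savepointPath s3://bucket/savepoints <job-id>

# Resume from the savepoint with a different parallelism:
flink run -s s3://bucket/savepoints/savepoint-xxxx -p 8 myJob.jar
```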

7. Setting the maxParallelism is only required for the reactive mode, and
only if you don't want to run an operator with the default maxParallelism
value. By definition, the maxParallelism is the maximum parallelism you can
run your operator with. Hence, if you set this value, you should be sure
that you will never have to run your job with a higher parallelism than
that. Note that the reactive mode will try to run the operator with this
parallelism. However, if it has fewer resources, it will run the operators
at a lower parallelism. So the maxParallelism defines the upper bound for
the parallelism of your operator.
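If you prefer setting this via configuration rather than calling
setMaxParallelism() in your job code, there is a config option for it
(a sketch; please verify the option against the docs for your version):

```yaml
# Upper bound for the parallelism the reactive mode will scale up to;
# note this also fixes the number of key groups for keyed state.
pipeline.max-parallelism: 24
```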

The reactive mode is intended as the first step towards more elastic
streaming pipelines and simplified operations. We see it as the foundation
for more advanced features such as true auto-scaling, where each operator
can decide its own parallelism. I hope this helps you understand the
reactive mode a bit better.

Cheers,
Till

On Wed, May 5, 2021 at 7:50 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Vishal,
>
> WRT “bring down our internal services” - a common pattern with making
> requests to external services is to measure latency, and throttle (delay)
> requests in response to increased latency.
>
> You’ll see this discussed frequently on web crawling forums as an
> auto-tuning approach.
>
> Typically there’s a steady increase in latency as load on the service
> increases.
>
> The trick is throttling soon enough before you hit the “elbow” where a
> service effectively falls over.
>
> — Ken
>
>
>
> On May 5, 2021, at 9:08 AM, vishalovercome <vis...@moengage.com> wrote:
>
> Yes. While back-pressure would eventually ensure high throughput, hand
> tuning
> parallelism became necessary because the job with high source parallelism
> would immediately bring down our internal services - not giving enough time
> to flink to adjust the in-rate. Plus running all operators at such a high
> scale would result in wastage of resources, even with operator chaining in
> place.
>
> That's why I think more toggles are needed to make current auto-scaling
> truly shine.
>
>
>
>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
