Thanks for the explanation, Gyula. Please see my replies inline.

BTW, has the proposed solution been deployed and evaluated with any
production workload? If so, could you share the experience, e.g. how
often enabling this feature led to a regression versus an improvement?


On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1: busyTimeMsPerSecond is not specific to CPU, it measures the time
> spent in the main record processing loop for an operator if I
> understand correctly. This includes IO operations too.
>

I took a look at the StreamTask::processInput(...)
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
My understanding of this code is that when KafkaSink (or any other sink)
cannot write data out to the network fast enough, recordWriter
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
becomes unavailable and the StreamTask is considered
soft-back-pressured rather than busy.

If this is the case, then the algorithm currently proposed in the FLIP
might under-provision the sink operator's parallelism when the sink
operator is the bottleneck. I am not an expert in this piece of code,
though. Could you or someone else double-check this?
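
To make the concern concrete, here is a rough sketch with made-up numbers.
It assumes the autoscaler estimates a subtask's maximum processing rate as
the observed rate divided by the fraction of each second reported as busy.
That is my reading of the proposal, not a formula confirmed in the FLIP, and
the class and method names are hypothetical.

```java
public class BusyTimeEstimate {

    // Hypothetical estimator (an assumption, not the FLIP's formula):
    // capacity = observed rate / fraction of each second reported as busy.
    static double estimatedMaxRate(double observedRecordsPerSec, double busyTimeMsPerSecond) {
        if (busyTimeMsPerSecond <= 0) {
            throw new IllegalArgumentException("busy time must be positive");
        }
        return observedRecordsPerSec / (busyTimeMsPerSecond / 1000.0);
    }

    public static void main(String[] args) {
        double observed = 1000.0; // records/s the sink subtask actually handles

        // Case A: the whole second is reported as busy.
        System.out.println("all busy:            " + estimatedMaxRate(observed, 1000.0));

        // Case B: 800 ms is reported as back-pressured, only 200 ms as busy.
        System.out.println("mostly backpressured: " + estimatedMaxRate(observed, 200.0));
    }
}
```

If the sink's blocked time is reported as back-pressured rather than busy,
the second estimate comes out several times higher than the first, which
would make the sink look over-provisioned even though it is the bottleneck.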


> 2: I agree, we should add this to the FLIP. It would be a Duration config
> with the expected catch up time after rescaling (let's say 5 minutes). It
> could be computed based on the current data rate and the calculated max
> processing rate after the rescale.
>

Great. I am looking forward to the formula :)
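
As a strawman while we wait for the actual formula: assuming the backlog
should be drained within the configured catch-up duration while new data
keeps arriving at the current rate, the processing rate needed after the
rescale would be the current input rate plus the backlog divided by the
catch-up time. The names below are hypothetical and nothing here is taken
from the FLIP.

```java
import java.time.Duration;

public class CatchUpRate {

    // Hypothetical helper: rate needed to keep up with incoming data
    // and drain the existing backlog within the catch-up duration.
    static double requiredRate(double inputRecordsPerSec, long backlogRecords, Duration catchUp) {
        if (catchUp.isZero() || catchUp.isNegative()) {
            throw new IllegalArgumentException("catch-up duration must be positive");
        }
        double catchUpSeconds = catchUp.toMillis() / 1000.0;
        return inputRecordsPerSec + backlogRecords / catchUpSeconds;
    }

    public static void main(String[] args) {
        double rate = requiredRate(1000.0, 300_000L, Duration.ofMinutes(5));
        System.out.println("required records/s: " + rate);
    }
}
```

For example, with 1000 records/s arriving, a backlog of 300,000 records, and
a 5-minute catch-up time, this asks for 1000 + 300000 / 300 = 2000 records/s.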


> 3: In the current proposal we don't have per operator configs. Target
> utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>
> 5,6: The names haven't been finalized but I think these are minor details.
> We could add concrete names to the FLIP :)
>

Sounds good.


> Cheers,
> Gyula
>
>
> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
>
>> Hi Max,
>>
>> Thank you for the proposal. The proposal tackles a very important issue
>> for Flink users and the design looks promising overall!
>>
>> I have some questions to better understand the proposed public interfaces
>> and the algorithm.
>>
>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
>> could reach 1 sec. I believe this is mostly true for CPU-bound operators.
>> Could you confirm that this can also be true for I/O-bound operators such as
>> sinks? For example, suppose a Kafka Sink subtask has reached an I/O
>> bottleneck when flushing data out to the Kafka cluster. Will
>> busyTimeMsPerSecond reach 1 sec?
>>
>> 2) It is said that "users can configure a maximum time to fully process
>> the backlog". The configuration section does not seem to provide this
>> config. Could you specify this? And any chance this proposal can provide
>> the formula for calculating the new processing rate?
>>
>> 3) How are users expected to specify the per-operator configs (e.g.
>> target utilization)? For example, should users specify it programmatically
>> in a DataStream/Table/SQL API?
>>
>> 4) How often will the Flink Kubernetes operator query metrics from
>> JobManager? Is this configurable?
>>
>> 5) Could you specify the config name and default value for the proposed
>> configs?
>>
>> 6) Could you add the name/mbean/type for the proposed metrics?
>>
>>
>> Cheers,
>> Dong
>>
>>
>>
