@Dong: Looking at the busyTime metrics in the TaskIOMetricGroup, it seems that busy time is actually defined as time spent neither idle nor (soft) back-pressured. So I think it would give us the correct reading, based on what you said about the Kafka sink.
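For reference, here is a minimal sketch of how those task time metrics relate under that definition. This is illustrative only, an assumption based on the metric descriptions rather than the actual Flink implementation:

/**
 * Illustrative sketch (not the actual Flink code): within each second a subtask
 * is either idle (waiting for input), (soft/hard) back-pressured (waiting for
 * output availability), or busy, so busy time is roughly the remainder.
 */
public class BusyTimeSketch {

    static long busyTimeMsPerSecond(long idleTimeMsPerSecond, long backPressuredTimeMsPerSecond) {
        // Busy = neither idle nor (soft) back-pressured.
        return Math.max(0, 1000 - idleTimeMsPerSecond - backPressuredTimeMsPerSecond);
    }

    public static void main(String[] args) {
        // The scenario from this thread: a sink subtask that spends 600 ms/s
        // soft back-pressured on an unavailable record writer would report
        // only ~400 ms/s of busy time, even though it is saturated on I/O.
        System.out.println(busyTimeMsPerSecond(0, 600)); // prints 400
    }
}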
In any case we have to test this, and if something is not as we expect we should fix the metric.

Gyula

On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:

> Thanks for the explanation Gyula. Please see my reply inline.
>
> BTW, has the proposed solution been deployed and evaluated with any
> production workload? If yes, I am wondering if you could share the
> experience, e.g. what is the likelihood of a regression or an improvement,
> respectively, after enabling this feature.
>
> On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi Dong!
>>
>> Let me try to answer the questions :)
>>
>> 1: busyTimeMsPerSecond is not specific to CPU; it measures the time
>> spent in the main record processing loop for an operator, if I
>> understand correctly. This includes IO operations too.
>
> I took a look at StreamTask::processInput(...)
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
> My understanding of this code is that when KafkaSink (same for other sinks)
> cannot write data out to the network fast enough, the recordWriter
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
> becomes unavailable and the StreamTask is considered to be
> soft-back-pressured instead of being considered busy.
>
> If this is the case, then the algorithm currently proposed in the FLIP
> might under-provision the sink operator's parallelism when the sink
> operator is the bottleneck. I am not an expert on this piece of code,
> though, so I am wondering if you or someone else could double-check this.
>
>> 2: We should add this to the FLIP, I agree. It would be a Duration config
>> with the expected catch-up time after rescaling (let's say 5 minutes). It
>> could be computed based on the current data rate and the calculated max
>> processing rate after the rescale.
>
> Great. I am looking forward to the formula :)
>
>> 3: In the current proposal we don't have per-operator configs. Target
>> utilization would apply to all operators uniformly.
>>
>> 4: It should be configurable, yes.
>>
>> 5, 6: The names haven't been finalized, but I think these are minor
>> details. We could add concrete names to the FLIP :)
>
> Sounds good.
>
>> Cheers,
>> Gyula
>>
>> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hi Max,
>>>
>>> Thank you for the proposal. It tackles a very important issue
>>> for Flink users, and the design looks promising overall!
>>>
>>> I have some questions to better understand the proposed public
>>> interfaces and the algorithm.
>>>
>>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
>>> could reach 1 sec. I believe this is mostly true for CPU-bound operators.
>>> Could you confirm that this can also be true for IO-bound operators such
>>> as sinks? For example, suppose a Kafka Sink subtask has hit an I/O
>>> bottleneck when flushing data out to the Kafka clusters, will
>>> busyTimeMsPerSecond reach 1 sec?
>>>
>>> 2) It is said that "users can configure a maximum time to fully process
>>> the backlog". The configuration section does not seem to provide this
>>> config. Could you specify this? And any chance this proposal can provide
>>> the formula for calculating the new processing rate?
>>>
>>> 3) How are users expected to specify the per-operator configs (e.g.
>>> target utilization)? For example, should users specify them
>>> programmatically in the DataStream/Table/SQL API?
>>>
>>> 4) How often will the Flink Kubernetes operator query metrics from the
>>> JobManager? Is this configurable?
>>>
>>> 5) Could you specify the config name and default value for each of the
>>> proposed configs?
>>>
>>> 6) Could you add the name/mbean/type for the proposed metrics?
>>>
>>> Cheers,
>>> Dong
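For illustration, here is a rough sketch of the kind of catch-up-time / processing-rate formula discussed above. All names, the utilization handling, and the exact formula are assumptions made for the example, not the FLIP's algorithm:

import java.time.Duration;

/**
 * Rough sketch (assumptions, not the FLIP's final algorithm) of how a target
 * processing rate and parallelism could be derived from the current data rate,
 * the backlog, and a user-configured maximum catch-up time.
 */
public class RescaleFormulaSketch {

    static int suggestedParallelism(
            double inputRatePerSec,          // observed incoming records/sec
            double backlogRecords,           // current backlog, e.g. Kafka consumer lag
            Duration maxCatchUpTime,         // configured catch-up window, e.g. 5 minutes
            double processedRatePerSubtask,  // measured records/sec one subtask can handle
            double targetUtilization) {      // e.g. 0.8

        // Target rate: keep up with the input and drain the backlog within the window.
        double targetRate = inputRatePerSec + backlogRecords / maxCatchUpTime.getSeconds();

        // Load each subtask only up to the target utilization.
        double effectiveRatePerSubtask = processedRatePerSubtask * targetUtilization;

        return (int) Math.ceil(targetRate / effectiveRatePerSubtask);
    }

    public static void main(String[] args) {
        // 10k records/s input, 3M records of lag, 5 minute catch-up window,
        // ~4k records/s per subtask, target utilization 0.8:
        // targetRate = 10000 + 3000000 / 300 = 20000, and 20000 / (4000 * 0.8) = 6.25 -> 7
        System.out.println(suggestedParallelism(10_000, 3_000_000, Duration.ofMinutes(5), 4_000, 0.8));
    }
}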