Thanks for the explanation, Gyula. Please see my replies inline. BTW, has the proposed solution been deployed and evaluated with any production workloads? If so, could you share the experience, e.g. how likely regressions and improvements, respectively, are after enabling this feature?
On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1: busyTimeMsPerSecond is not specific to CPU; it measures the time
> spent in the main record processing loop for an operator, if I
> understand correctly. This includes IO operations too.
>

I took a look at StreamTask::processInput(...) <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>. My understanding of this code is that when KafkaSink (and likewise other sinks) cannot write data out to the network fast enough, recordWriter <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575> becomes unavailable and the StreamTask is considered soft-back-pressured instead of busy. If this is the case, the algorithm currently proposed in the FLIP might under-provision the sink operator's parallelism when the sink operator is the bottleneck. I am not an expert on this piece of code, though. Could you or someone else double-check this?

> 2: We should add this to the FLIP, I agree. It would be a Duration config
> with the expected catch-up time after rescaling (let's say 5 minutes). It
> could be computed based on the current data rate and the calculated max
> processing rate after the rescale.
>

Great. I am looking forward to the formula :)

> 3: In the current proposal we don't have per-operator configs. Target
> utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>
> 5, 6: The names haven't been finalized, but I think these are minor details.
> We could add concrete names to the FLIP :)
>

Sounds good.

> Cheers,
> Gyula
>
>
> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
>
>> Hi Max,
>>
>> Thank you for the proposal.
>> The proposal tackles a very important issue
>> for Flink users and the design looks promising overall!
>>
>> I have some questions to better understand the proposed public interfaces
>> and the algorithm.
>>
>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
>> could reach 1 sec. I believe this is mostly true for CPU-bound operators.
>> Could you confirm that this can also be true for I/O-bound operators such as
>> sinks? For example, suppose a Kafka Sink subtask has reached an I/O bottleneck
>> when flushing data out to the Kafka cluster; will busyTimeMsPerSecond
>> reach 1 sec?
>>
>> 2) It is said that "users can configure a maximum time to fully process
>> the backlog". The configuration section does not seem to provide this
>> config. Could you specify it? And is there any chance this proposal can
>> provide the formula for calculating the new processing rate?
>>
>> 3) How are users expected to specify the per-operator configs (e.g.
>> target utilization)? For example, should users specify them programmatically
>> in the DataStream/Table/SQL API?
>>
>> 4) How often will the Flink Kubernetes operator query metrics from the
>> JobManager? Is this configurable?
>>
>> 5) Could you specify the config name and default value for the proposed
>> configs?
>>
>> 6) Could you add the name/mbean/type for the proposed metrics?
>>
>>
>> Cheers,
>> Dong
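The under-provisioning concern in the inline reply to point 1 can be made concrete with a small calculation. This is a minimal sketch that assumes, hypothetically, that the autoscaler estimates a subtask's maximum rate as the observed rate divided by the busy ratio (busyTimeMsPerSecond / 1000) and then sizes parallelism for a uniform target utilization. The class and method names and all numbers are illustrative; they are not taken from the FLIP or from Flink's code.

```java
/**
 * Hypothetical illustration of the concern in point 1 (not Flink code).
 * Assumes per-subtask capacity is estimated as observedRate / busyRatio.
 */
public final class BusyTimeEstimate {

    /** Estimated records/sec one subtask could sustain at 100% busy time. */
    static double estimatedTrueRate(double observedRecordsPerSec, double busyTimeMsPerSecond) {
        double busyRatio = busyTimeMsPerSecond / 1000.0;
        if (busyRatio <= 0.0) {
            // Never busy: capacity cannot be estimated from this metric.
            return Double.POSITIVE_INFINITY;
        }
        return observedRecordsPerSec / busyRatio;
    }

    /** Smallest parallelism (at least 1) that keeps each subtask at or below targetUtilization. */
    static int requiredParallelism(double targetRecordsPerSec, double trueRatePerSubtask,
                                   double targetUtilization) {
        double needed = targetRecordsPerSec / (trueRatePerSubtask * targetUtilization);
        return Math.max(1, (int) Math.ceil(needed));
    }

    public static void main(String[] args) {
        double observedPerSubtask = 1_000.0; // records/sec per sink subtask (hypothetical)
        double targetRate = 8_000.0;         // records/sec the job must sustain (hypothetical)
        double targetUtilization = 0.7;      // uniform target, as in the answer to point 3

        // Case A: time spent blocked on the external system is reported as busy.
        double capacityA = estimatedTrueRate(observedPerSubtask, 1000.0);
        // Case B: the same blocked time is reported as back-pressure, so busy time is low.
        double capacityB = estimatedTrueRate(observedPerSubtask, 200.0);

        System.out.println("case A parallelism: "
                + requiredParallelism(targetRate, capacityA, targetUtilization)); // 12
        System.out.println("case B parallelism: "
                + requiredParallelism(targetRate, capacityB, targetUtilization)); // 3
    }
}
```

With these made-up numbers, case A asks for 12 subtasks, while case B (the same sink, but with its blocked time not counted as busy) asks for only 3, so the estimate would point toward scaling the bottleneck sink down rather than up.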
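For point 2, one possible reading of the catch-up description is that the required processing rate equals the current input rate plus the backlog spread over the configured catch-up duration, r = λ + B / T. This is a minimal sketch of that arithmetic under this assumption only; it is not the FLIP's formula, and the class name, method names, and figures are hypothetical.

```java
import java.time.Duration;
import java.util.Optional;

/**
 * Hypothetical sketch for point 2 (not the FLIP's formula): the rate needed to
 * drain a backlog within a catch-up duration while new records keep arriving.
 */
public final class CatchUpRate {

    /** Required records/sec: r = inputRate + backlog / catchUpSeconds. */
    static double requiredRate(double inputRecordsPerSec, long backlogRecords, Duration catchUp) {
        double seconds = catchUp.toMillis() / 1000.0;
        if (seconds <= 0.0) {
            throw new IllegalArgumentException("catch-up duration must be positive");
        }
        return inputRecordsPerSec + backlogRecords / seconds;
    }

    /**
     * Time to drain the backlog if the job can process maxRecordsPerSec after the
     * rescale; empty if that rate does not exceed the input rate (backlog never shrinks).
     */
    static Optional<Duration> catchUpTime(double inputRecordsPerSec, long backlogRecords,
                                          double maxRecordsPerSec) {
        double surplus = maxRecordsPerSec - inputRecordsPerSec;
        if (surplus <= 0.0) {
            return Optional.empty();
        }
        return Optional.of(Duration.ofMillis(Math.round(backlogRecords / surplus * 1000.0)));
    }

    public static void main(String[] args) {
        Duration catchUp = Duration.ofMinutes(5); // the example value from the thread
        double inputRate = 10_000.0;              // records/sec (hypothetical)
        long backlog = 6_000_000L;                // records waiting (hypothetical)

        double required = requiredRate(inputRate, backlog, catchUp);
        System.out.println(required);                                  // 30000.0
        System.out.println(catchUpTime(inputRate, backlog, required)); // Optional[PT5M]
    }
}
```

The second method runs the calculation in the other direction: given the max processing rate computed for a candidate parallelism, it estimates how long catching up would take. How the FLIP actually combines these quantities may differ.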