Sorry for some ill-informed questions, and thanks for adding context here and to the FLIP.

About 1:
> I think this has been discussed in the FLIP-33 mailing list thread [2].

Unfortunately, I could not find the specific comment you are referring to in that thread. Do you have a particular point of the discussion in mind? That said, I understand that those metrics already exist and you are only making them available now :)

About 2: Good :)

About 3: I totally agree with you. I like this approach; it looks very consistent.

About 4: Yeah, it is already there :)

About 5: Thank you for providing the example, it is much clearer now. However, why reflection? Is it only a workaround so that the current Kafka connector can invoke `recordFetched` when running on a newer Flink version, without bumping the Flink version it depends on? Once the Flink version is bumped, the method will be available at compile time through `SourceReaderMetricGroup`. Can you comment on this?

About 6: Got it, thank you.

As a last comment, in the implementation I can see:

    lastFetchTime = clock.absoluteTimeMillis();

Is this pseudo-code or the actual implementation? It should invoke something like `System.nanoTime()` rather than `System.currentTimeMillis()`, for precision/accuracy reasons [1].

[1] https://stackoverflow.com/questions/351565/system-currenttimemillis-vs-system-nanotime

On Apr 24, 2024 at 14:00 +0200, jialiang tan <tanjialiang1...@gmail.com>, wrote:
> Hi Lorenzo, thanks for your feedback!
>
> > There can be major discrepancies between the absolute times obtained by
> > the TaskManagers (if clocks are not synchronized via NTP, for example), so
> > the values of the metric might differ considerably across TMs.
>
> I don't know much about this. Can more experienced contributors help? If
> it is a real problem, perhaps `currentEmitEventTimeLag` has the same issue?
>
> > Furthermore, comparing the time on the Flink cluster with the event time
> > of records might produce completely inaccurate results.
>
> I think this has been discussed in the FLIP-33 mailing list thread [2].
>
> > 2 - I don't think the name `processingLag` represents the processing time
> > spent; I would rather see `processingTime`, just for the semantics of the
> > name itself.
>
> +1, it makes sense to me.
>
> > 3 - Do you really think the `processingTime` should be a gauge? I
> > understand your justification for the fetch lag, but I think the
> > processing time should be a histogram. As for the inefficiency of this,
> > how about some sampling (e.g. only updating the histogram once every 1000
> > events)?
>
> I went back to the discussion in the FLIP-33 mailing list thread [2]. At
> first, `emitEventTimeLag` and `fetchEventTimeLag` were defined as
> histograms, but the participants felt that histograms were too expensive.
> So they introduced `currentFetchEventTimeLag` and `currentEmitEventTimeLag`
> instead, and moved `fetchEventTimeLag` and `emitEventTimeLag` to future
> work. I think we need another FLIP to discuss that. For now, I would like
> to rename `processingTime` to `currentProcessingTime`, still as a gauge,
> and leave `processingTime` as future work. WDYT?
>
> > 4 - At this point, if we have the processing time and number of records,
> > we could also add throughput as a metric, so that the user would know how
> > many records/second the source is able to produce.
>
> Do you mean `numRecordsInPerSecond` and `numRecordsOutPerSecond`? They were
> defined in FLIP-33 [1] and are used in Flink 1.14.
>
> > 5 - For the "Kafka Connector" section: can this be generalized to
> > connectors in general? Can you provide an example to better understand
> > your statement about reflection?
>
> Good idea, I have extended the FLIP accordingly.
>
> > 6 - Does this introduce any UI change for representing the metric?
>
> I'm only adding some new metrics. Users can get them from the REST API or
> collect them into Prometheus; there is no change to the Flink Web UI.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> [2] https://lists.apache.org/thread/r47zrqto4k8tsc3xvfg392zblfx6dczl
>
> Best,
> TanJiaLiang
>
> On Wed, Apr 24, 2024 at 15:02, <lorenzo.affe...@ververica.com.invalid> wrote:
>
> > Hello jialiang tan, and thank you for your contribution!
> >
> > Here are my questions:
> >
> > 1 - I am not sure whether exposing this as a metric to the user could
> > cause some harm. There can be major discrepancies between the absolute
> > times obtained by the TaskManagers (if clocks are not synchronized via
> > NTP, for example), so the values of the metric might differ considerably
> > across TMs. Furthermore, comparing the time on the Flink cluster with the
> > event time of records might produce completely inaccurate results. I
> > think providing this should come with many disclaimers to the user.
> > Maybe more experienced contributors can comment on this as well.
> >
> > 2 - I don't think the name `processingLag` represents the processing time
> > spent; I would rather see `processingTime`, just for the semantics of the
> > name itself.
> >
> > 3 - Do you really think the `processingTime` should be a gauge? I
> > understand your justification for the fetch lag, but I think the
> > processing time should be a histogram. As for the inefficiency of this,
> > how about some sampling (e.g. only updating the histogram once every 1000
> > events)?
> >
> > 4 - At this point, if we have the processing time and number of records,
> > we could also add throughput as a metric, so that the user would know how
> > many records/second the source is able to produce.
> >
> > 5 - For the "Kafka Connector" section: can this be generalized to
> > connectors in general? Can you provide an example to better understand
> > your statement about reflection?
> >
> > 6 - Does this introduce any UI change for representing the metric?
> >
> > Thank you!
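To make the sampling idea in question 3 concrete, here is a minimal, self-contained sketch. It assumes a fixed-size window of sampled values as the histogram backend; the class `SampledHistogram` and its methods are hypothetical and do not use Flink's own `Histogram` metric type. Only every `sampleEvery`-th value is stored, so skipped records cost a single counter increment.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not a Flink API): keeps only every N-th value in a
 * fixed-size ring buffer and computes quantiles over the retained samples.
 * Not thread-safe; a real metric would also have to handle reads from the
 * metric reporter thread.
 */
public class SampledHistogram {
    private final int sampleEvery; // e.g. 1000, as suggested in the thread
    private final long[] window;   // most recent sampled values
    private long seen;             // total values offered
    private long sampled;          // values actually stored

    public SampledHistogram(int sampleEvery, int windowSize) {
        if (sampleEvery <= 0 || windowSize <= 0) {
            throw new IllegalArgumentException("sampleEvery and windowSize must be positive");
        }
        this.sampleEvery = sampleEvery;
        this.window = new long[windowSize];
    }

    /** Called once per record; only the 1st, (N+1)-th, (2N+1)-th, ... values are stored. */
    public void update(long value) {
        if (seen++ % sampleEvery != 0) {
            return; // cheap path for the skipped records
        }
        window[(int) (sampled % window.length)] = value;
        sampled++;
    }

    public long sampleCount() {
        return sampled;
    }

    /** Returns the q-quantile (0..1) of the retained samples, or 0 if there are none. */
    public long quantile(double q) {
        int n = (int) Math.min(sampled, window.length);
        if (n == 0) {
            return 0L;
        }
        long[] copy = Arrays.copyOf(window, n);
        Arrays.sort(copy);
        int idx = (int) Math.min(n - 1, Math.max(0, Math.ceil(q * n) - 1));
        return copy[idx];
    }
}
```

The trade-off is the one discussed above: sampling keeps the per-record cost close to a gauge's, at the price of quantiles that only reflect a subset of records.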
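For question 5 and the later "why reflection?" follow-up, the sketch below shows the general shape of such a compatibility shim as we understand it: a connector compiled against an older Flink looks up `recordFetched` at runtime and skips the metric if the method is missing. The `recordFetched(long)` signature is an assumption (the thread does not show it), and `OldMetricGroup`/`NewMetricGroup` are hypothetical stand-ins for metric groups from older and newer Flink versions.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** Hypothetical shim: forwards recordFetched(long) only if the runtime supports it. */
public class RecordFetchedShim {

    /** Stand-in for a metric group from an older Flink version (no recordFetched). */
    public static class OldMetricGroup {}

    /** Stand-in for a metric group that exposes the proposed method (assumed signature). */
    public static class NewMetricGroup {
        public long lastFetchTime = -1L;

        public void recordFetched(long fetchTime) {
            this.lastFetchTime = fetchTime;
        }
    }

    private final Object metricGroup;
    private final Method recordFetched; // null if the running version lacks the method

    public RecordFetchedShim(Object metricGroup) {
        this.metricGroup = metricGroup;
        Method m;
        try {
            // Look the method up once, not per record.
            m = metricGroup.getClass().getMethod("recordFetched", long.class);
        } catch (NoSuchMethodException e) {
            m = null; // older runtime: the metric is simply not reported
        }
        this.recordFetched = m;
    }

    /** Returns true if the call was forwarded, false if the runtime does not support it. */
    public boolean recordFetched(long fetchTime) {
        if (recordFetched == null) {
            return false;
        }
        try {
            recordFetched.invoke(metricGroup, fetchTime);
            return true;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("recordFetched invocation failed", e);
        }
    }
}
```

In a real connector the lookup would more likely be done on the `SourceReaderMetricGroup` interface class loaded at runtime, since non-public implementation classes can cause `IllegalAccessException` on `invoke`. As noted in the thread, once the connector compiles against a Flink version that has the method, a direct call replaces all of this.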
> >
> > On Apr 22, 2024 at 12:26 +0200, jialiang tan <tanjialiang1...@gmail.com>,
> > wrote:
> >
> > > Sorry all, my previous email seems to have been badly formatted, so I am
> > > sending it again and hope it works this time.
> > >
> > > I would like to start a discussion about FLIP-XXX:
> > > Support currentFetchEventTimeLag and processingLag metrics [1].
> > >
> > > The main motivation for this change is that I had some difficulties
> > > in implementing the currentFetchEventTimeLag metric for KafkaSource [2].
> > >
> > > So I propose letting the SourceReaderMetricGroup provide an interface to
> > > capture the FetchTime, so that all FLIP-27 [3] sources can easily
> > > implement the currentFetchEventTimeLag metric.
> > >
> > > In addition, I propose to support the processingLag metric for the
> > > FLIP-27 sources to measure the current processing latency of the source.
> > >
> > > See the FLIP [1] and Jira [2] for more details.
> > >
> > > Looking forward to your comments and opinions!
> > >
> > > Thanks,
> > > TanJiaLiang.
> > >
> > > [1] https://docs.google.com/document/d/1nPhh1A-v-a7zyQyl1A5-K5DeUqbfxNXdjr2TVBT-QMs/edit?usp=sharing
> > > [2] https://issues.apache.org/jira/browse/FLINK-33173
> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
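The proposal (capture the fetch time so sources can report `currentFetchEventTimeLag`) and the `clock.absoluteTimeMillis()` vs `System.nanoTime()` question interact, so a rough sketch may help. `System.nanoTime()` is monotonic and well suited to elapsed-time measurement, but its origin is arbitrary, so it cannot be subtracted from a record's event timestamp (epoch millis). The sketch therefore uses a wall clock for the fetch-event-time lag (fetch time minus event time, as in FLIP-33) and a monotonic clock for the processing time. `FetchLagTracker` and its methods are hypothetical, not the FLIP's API, and defining processing time as "emit time minus fetch time" is an assumption.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of the two proposed gauges:
 *   currentFetchEventTimeLag = fetchTime - eventTime  (wall clock, epoch millis)
 *   currentProcessingTime    = emitTime - fetchTime   (assumed definition, monotonic clock)
 */
public class FetchLagTracker {
    private final LongSupplier wallClockMillis; // must share the epoch of event timestamps
    private final LongSupplier monotonicNanos;  // arbitrary origin, only valid for differences

    // Written by the reader thread, read by the reporter thread. Fields are
    // individually volatile, which is acceptable for gauge-style snapshots.
    private volatile long lastFetchMillis;
    private volatile long lastFetchNanos;
    private volatile boolean fetched;
    private volatile long fetchEventTimeLagMillis = -1L;
    private volatile long processingTimeNanos = -1L;

    public FetchLagTracker() {
        this(System::currentTimeMillis, System::nanoTime);
    }

    public FetchLagTracker(LongSupplier wallClockMillis, LongSupplier monotonicNanos) {
        this.wallClockMillis = wallClockMillis;
        this.monotonicNanos = monotonicNanos;
    }

    /** Call when a batch of records has been fetched from the external system. */
    public void onFetched() {
        lastFetchMillis = wallClockMillis.getAsLong();
        lastFetchNanos = monotonicNanos.getAsLong();
        fetched = true;
    }

    /** Call when a record from the last fetched batch is emitted downstream. */
    public void onEmitted(long eventTimeMillis) {
        if (!fetched) {
            return; // no fetch time to compare against yet
        }
        // May be negative if the producer's clock is ahead of this TaskManager's
        // (the NTP concern raised in question 1).
        fetchEventTimeLagMillis = lastFetchMillis - eventTimeMillis;
        processingTimeNanos = monotonicNanos.getAsLong() - lastFetchNanos;
    }

    /** Gauge value in milliseconds; -1 until the first record is emitted. */
    public long getCurrentFetchEventTimeLag() {
        return fetchEventTimeLagMillis;
    }

    /** Gauge value in milliseconds; -1 until the first record is emitted. */
    public long getCurrentProcessingTimeMillis() {
        long nanos = processingTimeNanos;
        return nanos < 0 ? -1L : TimeUnit.NANOSECONDS.toMillis(nanos);
    }
}
```

Injecting the clocks as `LongSupplier`s keeps the production default (`System::currentTimeMillis`, `System::nanoTime`) while allowing deterministic tests with fake clocks.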