Hi Jialiang,

Thanks for the FLIP! Here are some thoughts of mine.

- For currentFetchEventTimeLag: The problem with currentFetchEventTimeLag is that FetchTime is determined in the SplitReader, driven by the SplitFetcher thread, while EventTime is calculated at the output of the SourceOperator, driven by the task's main thread [1]. There is a barrier (the elementQueue) between the two, so it is hard to calculate FetchTime - EventTime accurately across two threads. I assume the new method "recordFetched()" in SourceReaderMetricGroup can only be invoked in the SplitReader, when records are being fetched from the external system, and this will introduce concurrency issues because the event time is determined in a different thread. One possible solution I have in mind is for records to carry their own FetchTime all the way until they reach the SourceOutput and their event time is extracted by the TimestampAssigner; then we can calculate an accurate FetchTime - EventTime. This requires some changes to the SourceReaderBase API.

- For currentProcessingTime: The name is a bit confusing to me, as it is quite similar to the "processing time" concept in stream processing [2]. I also have a concern about this new metric: I think it can be derived directly from two existing metrics (currentEmitEventTimeLag - currentFetchEventTimeLag), so is it necessary to introduce a new one? It should be very easy to perform this subtraction in an external metric monitoring system.

Best,
Qingsheng

[1] https://github.com/apache/flink/blob/b1544e4e513d2b75b350c20dbb1c17a8232c22fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java#L107
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#notions-of-time-event-time-and-processing-time

On Fri, Apr 26, 2024 at 8:05 PM jialiang tan <tanjialiang1...@gmail.com> wrote:

> Thanks and glad I saw your reply.
>
> > I could not find the precise comment in the thread you are referring to
> > unfortunately. Do you have some specific point in the discussion in mind?
>
> Sorry for the misdirection. Are you worried about the time zone and NTP
> problems? Here are some of my personal insights; someone please correct me
> if I'm wrong.
>
> Regarding NTP, I have consulted an ops colleague in my company: clock
> synchronization is guaranteed by the sysadmin, so developers usually do not
> need to consider this problem.
> Regarding the time zone issue, I think consuming data across time zones
> shouldn't happen (e.g. data produced in Singapore and consumed by Flink in
> China). Perhaps, as you said, providing this should come with disclaimers
> to the user.
>
> If we have to consider the above issues, I think it is hard for us to
> implement metrics like `currentEmitEventTimeLag` and
> `currentFetchEventTimeLag` at all.
> Assuming we ignore the NTP and time zone issues, `currentFetchEventTimeLag`
> and `currentEmitEventTimeLag` are actually a pretty good reflection of the
> current consumption latency from the external db/mq.
>
> > However, why reflection?
>
> Thanks for raising the doubt, good question! Yes, at first it was just a
> workaround. But I found the note "There is no connector (yet) available for
> Flink version 1.19" in the flink-connector-kafka documentation [1], and I
> kept thinking about it. It might be better to add support for this feature
> after upgrading the Flink version of flink-connector-kafka. WDYT?
>
> > Is this some pseudo-code or an actual implementation? This should invoke
> > something like `System.nanoTime()` and not `System.currentTimeMillis()`
> > because of precision/accuracy reasons [1]
>
> This is the actual implementation, and it has been running well in my
> company for months. Currently `currentEmitEventTimeLag` uses
> `System.currentTimeMillis()`, and `currentFetchEventTimeLag` should match
> it in my opinion.
>
> Best,
> TanJiaLiang.
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/#dependencies
>
> <lorenzo.affe...@ververica.com.invalid> wrote on Fri, Apr 26, 2024 at 15:50:
>
> > Sorry for some ill-informed questions, and thanks for adding context here
> > and to the FLIP.
> >
> > About 1:
> > > I think this has been discussed in the FLIP-33 lists thread[2].
> >
> > I could not find the precise comment in the thread you are referring to
> > unfortunately. Do you have some specific point in the discussion in mind?
> > However, I understand that those metrics already exist and you are making
> > them available now :)
> >
> > About 2:
> > Good :)
> >
> > About 3:
> > Totally agree with you, I like this approach, it looks very consistent.
> >
> > About 4:
> > Yeah, it is already there :)
> >
> > About 5:
> > Thank you for providing the example, now it is clearer.
> > However, why reflection? Is this only a workaround to make the current
> > Kafka connector invoke `recordFetched` with a newer version of Flink
> > without bumping the Flink version? Because when you bump the Flink version,
> > the method will be exposed at compile time by `SourceReaderMetricGroup`.
> > Can you comment on this?
> >
> > About 6:
> > Got it, thank you.
> >
> > As a last comment, in the implementation I can see:
> > lastFetchTime = clock.absoluteTimeMillis();
> >
> > Is this some pseudo-code or an actual implementation?
> > This should invoke something like `System.nanoTime()` and not
> > `System.currentTimeMillis()` because of precision/accuracy reasons [1]
> >
> > [1] https://stackoverflow.com/questions/351565/system-currenttimemillis-vs-system-nanotime
> > On Apr 24, 2024 at 14:00 +0200, jialiang tan <tanjialiang1...@gmail.com>, wrote:
> > > Hi lorenzo, thanks for your feedback!
> > >
> > > > There can be major discrepancies between the absolute time got by the
> > > > TaskManagers (if clocks are not synchronized via ntp for example), and
> > > > the results of the metric might be quite distant for different TMs.
> > >
> > > I don't know much about this. Can more experienced contributors help? If
> > > the problem exists, perhaps `currentEmitEventTimeLag` has it as well?
> > >
> > > > Furthermore, comparing the time on the Flink cluster with the event
> > > > time of records might introduce completely inaccurate results.
> > >
> > > I think this has been discussed in the FLIP-33 lists thread[2].
> > >
> > > > 2 - I don't think the name `processingLag` represents the processing
> > > > time spent, I would rather see `processingTime` just for the semantics
> > > > of the name itself.
> > >
> > > +1, it makes sense to me.
> > >
> > > > 3 - Do you really think the `processingTime` should be a gauge? I
> > > > understand your justification for the fetch lag, but I think the
> > > > processing time should be a histogram. For the inefficiency of this,
> > > > how about some sampling (e.g.: only update the histogram once every
> > > > 1000 events?)
> > >
> > > I went back to the discussion in the FLIP-33 lists thread[2]. At first,
> > > `emitEventTimeLag` and `fetchEventTimeLag` were defined as histograms,
> > > but the participants felt that histograms were too expensive. So they
> > > introduced `currentFetchEventTimeLag` and `currentEmitEventTimeLag`
> > > instead, and left `fetchEventTimeLag` and `emitEventTimeLag` as future
> > > work. I think we need to create another FLIP to discuss that. For now I
> > > want to rename `processingTime` to `currentProcessingTime`, still as a
> > > gauge, and leave `processingTime` for future work. WDYT?
> > >
> > > > 4 - At this point, if we have the processing time and number of
> > > > records, we could also add throughput as a metric, so that the user
> > > > would know how many records/second the source is able to produce.
> > >
> > > Do you mean `numRecordsInPerSecond` and `numRecordsOutPerSecond`? They
> > > were defined in FLIP-33[1] and used in Flink-1.14.
> > >
> > > > 5 - For the "Kafka Connector" section: can this be generalized for
> > > > connectors in general? Can you provide an example to better understand
> > > > your statement about reflection?
> > >
> > > Good idea, I have extended my FLIP.
> > >
> > > > 6 - Does this introduce any UI change for representing the metric?
> > >
> > > I'm just adding some new metrics. Users can get them from the REST API or
> > > collect them into Prometheus, and there is no change to the Flink Web UI.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > [2] https://lists.apache.org/thread/r47zrqto4k8tsc3xvfg392zblfx6dczl
> > >
> > > Best,
> > > TanJiaLiang
> > >
> > > <lorenzo.affe...@ververica.com.invalid> wrote on Wed, Apr 24, 2024 at 15:02:
> > >
> > > > Hello jialiang tan, and thank you for your contribution!
> > > >
> > > > Here are my questions:
> > > >
> > > > 1 - I don't know if exposing this as a metric to the user could create
> > > > some harm. There can be major discrepancies between the absolute time
> > > > got by the TaskManagers (if clocks are not synchronized via ntp for
> > > > example), and the results of the metric might be quite distant for
> > > > different TMs. Furthermore, comparing the time on the Flink cluster
> > > > with the event time of records might introduce completely inaccurate
> > > > results. I think providing this should come with many disclaimers to
> > > > the user. Maybe more experienced contributors can comment on this as
> > > > well.
> > > >
> > > > 2 - I don't think the name `processingLag` represents the processing
> > > > time spent, I would rather see `processingTime` just for the semantics
> > > > of the name itself.
> > > >
> > > > 3 - Do you really think the `processingTime` should be a gauge? I
> > > > understand your justification for the fetch lag, but I think the
> > > > processing time should be a histogram. For the inefficiency of this,
> > > > how about some sampling (e.g.: only update the histogram once every
> > > > 1000 events?)
> > > >
> > > > 4 - At this point, if we have the processing time and number of
> > > > records, we could also add throughput as a metric, so that the user
> > > > would know how many records/second the source is able to produce.
> > > >
> > > > 5 - For the "Kafka Connector" section: can this be generalized for
> > > > connectors in general? Can you provide an example to better understand
> > > > your statement about reflection?
> > > >
> > > > 6 - Does this introduce any UI change for representing the metric?
> > > >
> > > > Thank you!
> > > > On Apr 22, 2024 at 12:26 +0200, jialiang tan <tanjialiang1...@gmail.com>, wrote:
> > > > > > Sorry all, the formatting of my previous email seems to have been
> > > > > > broken, so I am sending it again and hope it works this time.
> > > > > >
> > > > > > I would like to start a discussion about FLIP-XXX: Support
> > > > > > currentFetchEventTimeLag and processingLag metrics [1].
> > > > > >
> > > > > > The main motivation for this change was that I had some
> > > > > > difficulties in implementing the currentFetchEventTimeLag metric
> > > > > > for KafkaSource [2].
> > > > > >
> > > > > > So I proposed to let the SourceReaderMetricGroup provide an
> > > > > > interface to capture the FetchTime so that all the FLIP-27 [3]
> > > > > > sources can easily implement the currentFetchEventTimeLag metric.
> > > > > >
> > > > > > In addition, I propose to support the processingLag metric for the
> > > > > > FLIP-27 sources to measure the current processing latency of the
> > > > > > source.
> > > > > >
> > > > > > See the FLIP [1] and Jira [2] for more details.
> > > > > >
> > > > > > Looking forward to your comments and opinions!
> > > > > >
> > > > > > Thanks,
> > > > > > TanJiaLiang.
> > > > > >
> > > > > > [1] https://docs.google.com/document/d/1nPhh1A-v-a7zyQyl1A5-K5DeUqbfxNXdjr2TVBT-QMs/edit?usp=sharing
> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-33173
> > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
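
To make the suggestion at the top of this thread a bit more concrete (records carrying their own FetchTime across the elementQueue), here is a minimal sketch in plain Java. It is not Flink code and not a proposed API: `FetchTimeLagSketch`, `Fetched` and `fetchEventTimeLag` are hypothetical names, the `ArrayBlockingQueue` only stands in for the elementQueue, and a `ToLongFunction` stands in for the TimestampAssigner. It uses `System.currentTimeMillis()` only because that is what the thread says `currentEmitEventTimeLag` uses today.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.ToLongFunction;

// Plain-Java illustration only; none of these types are Flink classes.
public class FetchTimeLagSketch {

    /** Hypothetical envelope: a record plus the wall-clock time at which it was fetched. */
    static final class Fetched<T> {
        final T record;
        final long fetchTimeMillis;

        Fetched(T record, long fetchTimeMillis) {
            this.record = record;
            this.fetchTimeMillis = fetchTimeMillis;
        }
    }

    /** FetchTime - EventTime for one record, both in epoch milliseconds. */
    static <T> long fetchEventTimeLag(Fetched<T> fetched, ToLongFunction<T> timestampAssigner) {
        return fetched.fetchTimeMillis - timestampAssigner.applyAsLong(fetched.record);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the elementQueue between the fetcher thread and the main thread.
        BlockingQueue<Fetched<Long>> elementQueue = new ArrayBlockingQueue<>(16);

        // "Fetcher" thread: stamps the record with its fetch time before handing it over.
        Thread fetcher = new Thread(() -> {
            // For this demo the record is simply its own event timestamp, 250 ms in the past.
            Long eventTime = System.currentTimeMillis() - 250;
            try {
                elementQueue.put(new Fetched<Long>(eventTime, System.currentTimeMillis()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        // "Main" thread: extracts the event time and uses the carried stamp,
        // so the two threads share no mutable fetch-time state.
        Fetched<Long> next = elementQueue.take();
        long lag = fetchEventTimeLag(next, Long::longValue);
        System.out.println("fetch event-time lag (ms): " + lag);
        fetcher.join();
    }
}
```

Because the fetch time travels with the record, the lag is computed entirely on the consuming thread from two values that belong to the same record, which is the property the shared `recordFetched()` field cannot guarantee. The cost is one extra `long` per queued record.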