Sorry for some mal-informed questions, and thanks for adding context here and 
to the FLIP.

About 1:
> I think this has been discussed in the FLIP-33 lists thread[2].

I could not find the precise comment in the thread you are referring to 
unfortunately. Do you have some specific point in the discussion in mind?
However I understand that those metrics already exist and you are making them 
available now :)

About 2:
Good :)

About 3:
Totally agree with you, I like this approach, looks very consistent.

About 4:
Yeah, it is already there :)

About 5:
Thank you for providing the example, now it is clearer.
However, why reflection? Is this only a workaround to make the current Kafka 
connector invoke `recordFetched` with newer version of Flink without bumping 
the Flink version? Because, when you bump the Flink version the method will be 
exposed at compile time by `SourceReaderMetricGroup`. Can you comment on this?

About 6:
Got it, thank you.

As last comment, in the implementation I can see:
       lastFetchTime = clock.absoluteTimeMillis();

Is this some pseudo-code or an actual implementation?
This should invoke something like `System.nanoTime()` and not 
`System.currentTimeMillis()` because of precision/accuracy reasons [1]

[1] 
https://stackoverflow.com/questions/351565/system-currenttimemillis-vs-system-nanotime
On Apr 24, 2024 at 14:00 +0200, jialiang tan <tanjialiang1...@gmail.com>, wrote:
> Hi lorenzo, thanks for your feedback!
>
>
> > There can be major discrepancies between the absolute time got by the
> > TaskManagers (if clocks are not synchronized via ntp for example), and the
> > results of the metric might be quite distant for different TMs.
>
>
> I don't know much about this. can more experienced contributors help? If it
> exists, perhaps `currentEmitEventTimeLag` has the same problem?
>
> Furthermore, comparing the time on the Flink cluster with the event time of
> > records might introduce completely inaccurate results.
>
>
> I think this has been discussed in the FLIP-33 lists thread[2].
>
> 2 - I don't think the name `processingLag` represents the processing time
> > spent, I would rather see `processingTime` just for the semantics of the
> > name itself.
>
>
> +1, it makes sense to me.
>
> 3 - Do you really think the `processingTime` should be a gauge? I
> > understand your justification for the fetch lag, but I think the processing
> > time should be an histogram. For the inefficiency of this, how about some
> > sampling (e.g.: only update the histogram 1 every 1000 events?)
>
>
> I went back to the discussion of the FLIP-33 lists thread[2]. At first,
> `emitEventTimeLag` and `fetchEventTimeLag` were defined as histogram, and
> they felt that histogram was too expensive. So they imported
> `currentFetchEventTimeLag` and `currentEmitEventTimeLag` instead, and put
> `fetchEventTimeLag` and `emitEventTimeLag` in the future work. I think we
> need to create another FLIP to discuss it. Now I want to change
> `processingTime` to `currentProcessingTime`, still using gauge, and remain
> `processingTime` in the future work, WDYT?
>
> 4 - At this point, if we have the processing time and number of records, we
> > could also add throughput as a metric, so that the user would know how many
> > records/second the source is able to produce.
>
>
> Do you mean `numRecordsInPerSecond` and `numRecordsOutPerSecond`? They were
> defined in FLIP-33[1] and used in Flink-1.14.
>
> 5 - For the "Kafka Connector" section: can this be generalized for
> > connectors in general? Can you provide an example to better understand your
> > statement about reflection?
>
>
> Good idea, I have extended my FLIP.
>
> 6 - Does this introduce any UI change for representing the metric?
>
>
> I'm just adding some new metrics, users can get them from REST API or
> collect them into Prometheus, and no change for Flink WEB UI.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> [2] https://lists.apache.org/thread/r47zrqto4k8tsc3xvfg392zblfx6dczl
>
> Best,
> TanJiaLiang
>
> <lorenzo.affe...@ververica.com.invalid> 于2024年4月24日周三 15:02写道:
>
> > Hello jialiang tan, and thank you for your contribution!
> >
> > Here are my questions:
> >
> > 1 - I don't know if exposing this as metric to the user could create some
> > harm. There can be major discrepancies between the absolute time got by the
> > TaskManagers (if clocks are not synchronized via ntp for example), and the
> > results of the metric might be quite distant for different TMs.
> > Furthermore, comparing the time on the Flink cluster with the event time of
> > records might introduce completely inaccurate results. I think providing
> > this should come with many disclaimers to the user. Maybe, more experienced
> > contributors can comment on this as well.
> >
> > 2 - I don't think the name `processingLag` represents the processing time
> > spent, I would rather see `processingTime` just for the semantics of the
> > name itself.
> >
> > 3 - Do you really think the `processingTime` should be a gauge? I
> > understand your justification for the fetch lag, but I think the processing
> > time should be an histogram. For the inefficiency of this, how about some
> > sampling (e.g.: only update the histogram 1 every 1000 events?)
> >
> > 4 - At this point, if we have the processing time and number of records,
> > we could also add throughput as a metric, so that the user would know how
> > many records/second the source is able to produce.
> >
> > 5 - For the "Kafka Connector" section: can this be generalized for
> > connectors in general? Can you provide an example to better understand your
> > statement about reflection?
> >
> > 6 - Does this introduce any UI change for representing the metric?
> >
> > Thank you!
> > On Apr 22, 2024 at 12:26 +0200, jialiang tan <tanjialiang1...@gmail.com>,
> > wrote:
> > > > Sorry all, it seems bad formatting in my email message, now I send it
> > again
> > > > gently and hope it work.
> > > >
> > > > I would like to start a discussion about FLIP-XXX:
> > > > SupportcurrentFetchEventTimeLag and processingLag metrics [1].
> > > >
> > > > The main motivation for this change was that I had some difficulties
> > > > inimplementing the currentFetchEventTimeLag metrics for KafkaSource [2].
> > > >
> > > > So I proposed to let the SourceReaderMetricGroup provide an interface to
> > > > capturethe FetchTime so that all the FLIP-27 [3] sources can easily
> > > > implement thecurrentFetchEventTimeLag metrics.
> > > >
> > > > In addition, I propose to support the processingLag metric for the
> > > > FLIP-27sources to measure the current processing latency of the source.
> > > >
> > > > See the FLIP [1] and Jira [2] for more details.
> > > >
> > > > Looking forward to your comments and opinions!
> > > >
> > > > Thanks,
> > > > TanJiaLiang.
> > > >
> > > > [1]
> > > >
> > https://docs.google.com/document/d/1nPhh1A-v-a7zyQyl1A5-K5DeUqbfxNXdjr2TVBT-QMs/edit?usp=sharing
> > > > [2] https://issues.apache.org/jira/browse/FLINK-33173
> > > > [3]
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > >
> > > > > >
> >

Reply via email to