Thanks, Arvid! +1 for SinkWriterMetricGroup. The sink side is a little trickier, because it can have a local committer (running on the TM) or a global committer (running on the JM). In the future, we could add SinkCommitterMetricGroup or SinkGlobalCommitterMetricGroup.
Regarding the "lastFetchTime" latency metric, I found Gauge to be less informative, as it only captures the last sampled value for each metric publish interval (e.g. 60s).

* Can we make it a histogram? Histograms are more expensive, though.
* A Timer [1, 2] is cheaper, as it just tracks min, max, avg, and count, but there is no such metric type in Flink.
* The Summary metric type [3] (from Prometheus) would be nice too.

[1] https://netflix.github.io/spectator/en/latest/intro/timer/#timer
[2] https://docs.spring.io/spring-metrics/docs/current/public/prometheus#timers
[3] https://prometheus.io/docs/concepts/metric_types/#summary

On Mon, Jul 19, 2021 at 12:22 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Steven,
>
> I extended the FLIP and its draft PR to have a SourceReaderMetricGroup and a SplitEnumeratorMetricGroup. I hope that it makes it clearer.
> I'd like to address FLINK-21000 as part of the implementation but I'd keep it out of the FLIP discussion.
>
> Question: should we rename SinkMetricGroup to SinkWriterMetricGroup? I can see the same confusion arising on sink side. I have added a commit to the draft PR (not updated FLIP yet).
>
> Btw I'd like to start the vote soonish. @Becket Qin <becket....@gmail.com> are you okay with the setLastFetchTimeGauge explanation or do you have alternative ideas?
>
> Best,
>
> Arvid
>
> On Fri, Jul 16, 2021 at 8:13 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> > To avoid confusion, can we either rename "SourceMetricGroup" to "SplitReaderMetricGroup" or add "Reader" to the setter method names?
> >
> > Yes, we should add the "unassigned/pending splits" enumerator metric. I tried to publish those metrics for IcebergSourceEnumerator and ran into an issue [1]. I don't want to distract the discussion with the jira ticket.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21000
> >
> > On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Steven,
> > >
> > > The semantics are unchanged compared to FLIP-33 [1] but I see your point.
> > >
> > > In reality, pending records would be mostly for event storage systems (Kafka, Kinesis, ...). Here, we would report the consumer lag effectively. If consumer lag is more prominent, we could also rename it.
> > >
> > > For pending bytes, this is mostly related to file source or any kind of byte streams. At this point, we can only capture the assigned splits on reader levels. I don't think it makes sense to add the same metric to the enumerator as that might induce too much I/O on the job master. I could rather envision another metric that captures how many unassigned splits there are. In general, I think it would be a good idea to add another type of top-level metric group for SplitEnumerator called SplitEnumeratorMetricGroup in SplitEnumeratorContext. There we could add unassigned/pending splits metric. WDYT?
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > >
> > > On Wed, Jul 14, 2021 at 9:00 AM Steven Wu <stevenz...@gmail.com> wrote:
> > >
> > > > I am trying to understand what those two metrics really capture
> > > >
> > > > <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
> > > >
> > > > - use file source as an example, it captures the remaining bytes for the current file split that the reader is processing? How would users interpret or use this metric? enumerator keeps track of the pending/unassigned splits, which is an indication of the size of the backlog.
> > > >   that would be very useful
> > > >
> > > > <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);
> > > >
> > > > - In the Kafka source case, this is intended to capture the consumer lag (log head offset from broker - current record offset)? that could be used to capture the size of the backlog
> > > >
> > > > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise <ar...@apache.org> wrote:
> > > >
> > > > > Hi Becket,
> > > > >
> > > > > I believe 1+2 has been answered by Chesnay already. Just to add to 2: I'm not the biggest fan of reusing task metrics but that's what FLIP-33 and different folks suggested. I'd probably keep task I/O metrics only for internal things and add a new metric for external calls. Then, we could even allow users to track I/O in AsyncIO (which would currently be a mess). However, with the current abstraction, it would be relatively easy to add separate metrics later.
> > > > >
> > > > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to the user to implement it in a way that fetch time always corresponds to the latest polled record. For SourceReaderBase, I have added a new RecordsWithSplitIds#lastFetchTime (with default return value null) that sets the last fetch time automatically whenever the next batch is selected. Tbh this metric is a bit more challenging to implement for non-SourceReaderBase sources but I have not found a better, thread-safe way. Of course, we could shift the complete calculation into user-land but I'm not sure that this is easier.
> > > > > For your scenarios:
> > > > > - in A, you assume SourceReaderBase. In that case, we could eagerly report the metric as sketched by you.
> > > > >   It depends on the definition of "last processed record" in FLIP-33, whether this eager reporting is more correct than the lazy reporting that I have proposed. The former case assumes "last processed record" = last fetched record, while the latter case assumes "last processed record" = "last polled record". For the proposed solution, the user would just need to implement RecordsWithSplitIds#lastFetchTime, which typically corresponds to the creation time of the RecordsWithSplitIds instance.
> > > > > - B is not assuming SourceReaderBase.
> > > > > If it's SourceReaderBase, the same proposed solution works out of the box: SourceOperator intercepts the emitted event time and uses the fetch time of the current batch.
> > > > > If it's not SourceReaderBase, the user would need to attach the timestamp to the handover protocol if multi-threaded and set the lastFetchTimeGauge when a value in the handover protocol is selected (typically a batch).
> > > > > If it's a single threaded source, the user could directly set the current timestamp after fetching the records in a sync fashion.
> > > > > The bad case is if the user is fetching individual records (either sync or async), then the fetch time would be updated with every record. However, I'm assuming that the required system call is dwarfed by involved I/O.
> > > > >
> > > > > [1] https://github.com/apache/flink/pull/15972
> > > > >
> > > > > On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <ches...@apache.org> wrote:
> > > > >
> > > > > > Re 1: We don't expose the reuse* methods, because the proposed OperatorIOMetricGroup is a separate interface from the existing implementations (which will be renamed and implement the new interface).
> > > > > >
> > > > > > Re 2: Currently the plan is to re-use the "new" numBytesIn/Out counters for tasks ("new" because all we are doing is exposing already existing metrics). We may however change this in the future if we want to report the byte metrics on an operator level, which is primarily interesting for async IO or other external connectivity outside of sinks/sources.
> > > > > >
> > > > > > On 13/07/2021 12:38, Becket Qin wrote:
> > > > > > > Hi Arvid,
> > > > > > >
> > > > > > > Thanks for the proposal. I like the idea of exposing concrete metric group class so that users can access the predefined metrics.
> > > > > > >
> > > > > > > A few questions are following:
> > > > > > >
> > > > > > > 1. When exposing the OperatorIOMetrics to the users, we are also exposing the reuseInputMetricsForTask to the users. Should we hide these two methods because users won't have enough information to decide whether the records IO metrics should be reused by the task or not?
> > > > > > >
> > > > > > > 2. Similar to question 1, in the OperatorIOMetricGroup, we are adding numBytesInCounter and numBytesOutCounter. Should these metrics be reusing the task level metrics by default?
> > > > > > >
> > > > > > > 3.
> > > > > > >    Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am not sure how it works with the FetchLag. Typically there are two cases when reporting the fetch lag.
> > > > > > >    A. The EventTime is known at the point when the record is fetched in the SplitFetcher, so the fetch lag can be derived and reported immediately.
> > > > > > >    B. The EventTime is known only after the fetched record was parsed in the RecordEmitter. In this case, the RecordEmitter needs to get the fetch time of that particular record.
> > > > > > > I am not sure when users set the LastFetchTime in the above two cases. Can you help elaborate on how users should use it?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <ar...@apache.org> wrote:
> > > > > > >
> > > > > > >> Dear devs,
> > > > > > >>
> > > > > > >> As a continuation and generalization of FLIP-33 (Standardize Connector Metrics) [1], we'd like to discuss how we actually expose the standardized operator metrics to users in terms of changes to the API.
> > > > > > >>
> > > > > > >> Please check out the FLIP [2] and provide feedback.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >>
> > > > > > >> Arvid
> > > > > > >>
> > > > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
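
PS: to make the Timer idea from the top of this mail more concrete (track only min, max, avg, and count instead of a full histogram), here is a rough sketch. SimpleTimer is a hypothetical standalone class, not an existing Flink metric type, and its method names are assumptions for illustration only; how such a type would be registered or reported in Flink is deliberately left out.

```java
/**
 * Hypothetical sketch of a lightweight timer metric.
 * NOT part of Flink; class and method names are made up for illustration.
 * It keeps four running values per interval instead of a sample reservoir.
 */
final class SimpleTimer {
    private long count;
    private long sum;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    /** Records one observed duration (e.g. a fetch latency in milliseconds). */
    synchronized void record(long durationMillis) {
        count++;
        sum += durationMillis;
        min = Math.min(min, durationMillis);
        max = Math.max(max, durationMillis);
    }

    synchronized long count() { return count; }

    /** Returns 0 when nothing has been recorded yet. */
    synchronized long min() { return count == 0 ? 0 : min; }

    /** Returns 0 when nothing has been recorded yet. */
    synchronized long max() { return count == 0 ? 0 : max; }

    /** Returns 0.0 when nothing has been recorded yet. */
    synchronized double avg() { return count == 0 ? 0.0 : (double) sum / count; }

    /** Clears all values, e.g. after each publish interval. */
    synchronized void reset() {
        count = 0;
        sum = 0;
        min = Long.MAX_VALUE;
        max = Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        SimpleTimer timer = new SimpleTimer();
        timer.record(10);
        timer.record(30);
        timer.record(20);
        System.out.println("count=" + timer.count() + " min=" + timer.min()
                + " max=" + timer.max() + " avg=" + timer.avg());
    }
}
```

Compared to a Gauge, every value recorded within a publish interval contributes to the reported numbers, while the cost per update stays at a few arithmetic operations under a lock.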