Hi Arvid and Chesnay,

Thanks for the explanation.

As outlined in the JavaDoc and in the draft PR [1], it's up to the user to
> implement it in a way that fetch time always corresponds to the latest
> polled record.

In that case, do we still need the metric here? It seems we are creating a
"global variable" which users may potentially use. I am wondering how much
additional convenience it provides because it seems easy for people to
simply pass the fetch time by themselves if they have decided to not use
SourceReaderBase. Also, it looks like we do not have an API pattern that
lets users get the value of a metric and derive another metric. So I think
it is easier for people to understand if LastFetchTimeGauge() is just an
independent metric by itself, instead of being a part of the
eventTimeFetchLag computation.

I have added a new RecordsWithSplitIds#lastFetchTime (with default return
> value null) that sets the last fetch time automatically whenever the next
> batch is selected.

Would it make sense to have a more generic metadata type <T> associated
with the records batch? In some cases, it may be useful to allow the Source
implementation to carry some additional information of the batch to the
RecordEmitter. For example, the split info of the batch, the sender of the
batch etc. Because the RecordEmitter only takes one record at.a time,
currently such information needs to be put into each record, which may
involve a lot of wrapper object creation.

Thanks,

Jiangjie (Becket) Qin

On Mon, Jul 19, 2021 at 11:29 PM Steven Wu <stevenz...@gmail.com> wrote:

>
> Thanks, Arvid!
>
> +1 for SinkWriterMetricGroup. Sink is a little more tricky, because it can
> have local committer (running on TM) or global committer (running on JM).
> In the future, it is possible to add SinkCommitterMetricGroup or
> SinkGlobalCommitterMetricGroup.
>
> Regarding "lastFetchTime" latency metric, I found Gauge to be less
> informative as it only captures the last sampling value for each metric
> publish interval (e.g. 60s).
> * Can we make it a histogram? Histograms are more expensive though.
> * Timer [1, 2] is cheaper as it just tracks min, max, avg, count. but
> there is no such metric type in Flink
> * Summary metric type [3] (from Prometheus) would be nice too
>
> [1] https://netflix.github.io/spectator/en/latest/intro/timer/#timer
> [2]
> https://docs.spring.io/spring-metrics/docs/current/public/prometheus#timers
> [3] https://prometheus.io/docs/concepts/metric_types/#summary
>
>
> On Mon, Jul 19, 2021 at 12:22 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Steven,
>>
>> I extended the FLIP and its draft PR to have a SourceReaderMetricGroup and
>> a SplitEnumeratorMetricGroup. I hope that it makes it clearer.
>> I'd like to address FLINK-21000 as part of the implementation but I'd keep
>> it out of the FLIP discussion.
>>
>> Question: should we rename SinkMetricGroup to SinkWriterMetricGroup? I can
>> see the same confusion arising on sink side. I have added a commit to the
>> draft PR (not updated FLIP yet).
>>
>> Btw I'd like to start the vote soonish. @Becket Qin <becket....@gmail.com
>> >
>> are you okay with the setLastFetchTimeGauge explanation or do you have
>> alternative ideas?
>>
>> Best,
>>
>> Arvid
>>
>> On Fri, Jul 16, 2021 at 8:13 PM Steven Wu <stevenz...@gmail.com> wrote:
>>
>> > To avoid confusion, can we either rename "SourceMetricGroup" to "
>> > SplitReaderMetricGroup" or add "Reader" to the setter method names?
>> >
>> > Yes, we should  add the "unassigned/pending splits" enumerator metric. I
>> > tried to publish those metrics for IcebergSourceEnumerator and ran into
>> an
>> > issue [1]. I don't want to distract the discussion with the jira ticket.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-21000
>> >
>> > On Thu, Jul 15, 2021 at 1:01 PM Arvid Heise <ar...@apache.org> wrote:
>> >
>> > > Hi Steven,
>> > >
>> > > The semantics are unchanged compared to FLIP-33 [1] but I see your
>> point.
>> > >
>> > > In reality, pending records would be mostly for event storage systems
>> > > (Kafka, Kinesis, ...). Here, we would report the consumer lag
>> > effectively.
>> > > If consumer lag is more prominent, we could also rename it.
>> > >
>> > > For pending bytes, this is mostly related to file source or any kind
>> of
>> > > byte streams. At this point, we can only capture the assigned splits
>> on
>> > > reader levels. I don't think it makes sense to add the same metric to
>> the
>> > > enumerator as that might induce too much I/O on the job master. I
>> could
>> > > rather envision another metric that captures how many unassigned
>> splits
>> > > there are. In general, I think it would be a good idea to add another
>> > type
>> > > of top-level metric group for SplitEnumerator called
>> > > SplitEnumeratorMetricGroup in SplitEnumeratorContext. There we could
>> add
>> > > unassigned/pending splits metric. WDYT?
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>> > >
>> > > On Wed, Jul 14, 2021 at 9:00 AM Steven Wu <stevenz...@gmail.com>
>> wrote:
>> > >
>> > > > I am trying to understand what those two metrics really capture
>> > > >
>> > > > <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);
>> > > >
>> > > >    -  use file source as an example, it captures the remaining bytes
>> > for
>> > > >    the current file split that the reader is processing? How would
>> > users
>> > > >    interpret or use this metric? enumerator keeps tracks of the
>> > > >    pending/unassigned splits, which is an indication of the size of
>> the
>> > > >    backlog. that would be very useful
>> > > >
>> > > >
>> > > > <G extends Gauge<Long>> G setPendingRecordsGauge(G
>> > pendingRecordsGauge);
>> > > >
>> > > >    - In the Kafka source case, this is intended to capture the
>> consumer
>> > > lag
>> > > >    (log head offset from broker - current record offset)? that
>> could be
>> > > > used
>> > > >    to capture the size of the backlog
>> > > >
>> > > >
>> > > >
>> > > > On Tue, Jul 13, 2021 at 3:01 PM Arvid Heise <ar...@apache.org>
>> wrote:
>> > > >
>> > > > > Hi Becket,
>> > > > >
>> > > > > I believe 1+2 has been answered by Chesnay already. Just to add
>> to 2:
>> > > I'm
>> > > > > not the biggest fan of reusing task metrics but that's what
>> FLIP-33
>> > and
>> > > > > different folks suggested. I'd probably keep task I/O metrics only
>> > for
>> > > > > internal things and add a new metric for external calls. Then, we
>> > could
>> > > > > even allow users to track I/O in AsyncIO (which would currently
>> be a
>> > > > mess).
>> > > > > However, with the current abstraction, it would be relatively
>> easy to
>> > > add
>> > > > > separate metrics later.
>> > > > >
>> > > > > 3. As outlined in the JavaDoc and in the draft PR [1], it's up to
>> the
>> > > > user
>> > > > > to implement it in a way that fetch time always corresponds to the
>> > > latest
>> > > > > polled record. For SourceReaderBase, I have added a new
>> > > > > RecordsWithSplitIds#lastFetchTime (with default return value null)
>> > that
>> > > > > sets the last fetch time automatically whenever the next batch is
>> > > > selected.
>> > > > > Tbh this metric is a bit more challenging to implement for
>> > > > > non-SourceReaderBase sources but I have not found a better,
>> > thread-safe
>> > > > > way. Of course, we could shift the complete calculation into
>> > user-land
>> > > > but
>> > > > > I'm not sure that this is easier.
>> > > > > For your scenarios:
>> > > > > - in A, you assume SourceReaderBase. In that case, we could
>> eagerly
>> > > > report
>> > > > > the metric as sketched by you. It depends on the definition of
>> "last
>> > > > > processed record" in FLIP-33, whether this eager reporting is more
>> > > > correct
>> > > > > than the lazy reporting that I have proposed. The former case
>> assumes
>> > > > "last
>> > > > > processed record" = last fetched record, while the latter case
>> > assumes
>> > > > > "last processed record" = "last polled record". For the proposed
>> > > > solution,
>> > > > > the user would just need to implement
>> > > RecordsWithSplitIds#lastFetchTime,
>> > > > > which typically corresponds to the creation time of the
>> > > > RecordsWithSplitIds
>> > > > > instance.
>> > > > > - B is not assuming SourceReaderBase.
>> > > > > If it's SourceReaderBase, the same proposed solution works out of
>> the
>> > > > box:
>> > > > > SourceOperator intercepts the emitted event time and uses the
>> fetch
>> > > time
>> > > > of
>> > > > > the current batch.
>> > > > > If it's not SourceReaderBase, the user would need to attach the
>> > > timestamp
>> > > > > to the handover protocol if multi-threaded and set the
>> > > lastFetchTimeGauge
>> > > > > when a value in the handover protocol is selected (typically a
>> > batch).
>> > > > > If it's a single threaded source, the user could directly set the
>> > > current
>> > > > > timestamp after fetching the records in a sync fashion.
>> > > > > The bad case is if the user is fetching individual records (either
>> > sync
>> > > > or
>> > > > > async), then the fetch time would be updated with every record.
>> > > However,
>> > > > > I'm assuming that the required system call is dwarfed by involved
>> > I/O.
>> > > > >
>> > > > > [1] https://github.com/apache/flink/pull/15972
>> > > > >
>> > > > > On Tue, Jul 13, 2021 at 12:58 PM Chesnay Schepler <
>> > ches...@apache.org>
>> > > > > wrote:
>> > > > >
>> > > > > > Re 1: We don't expose the reuse* methods, because the proposed
>> > > > > > OperatorIOMetricGroup is a separate interface from the existing
>> > > > > > implementations (which will be renamed and implement the new
>> > > > interface).
>> > > > > >
>> > > > > > Re 2: Currently the plan is to re-use the "new" numByesIn/Out
>> > > counters
>> > > > > > for tasks ("new" because all we are doing is exposing already
>> > > existing
>> > > > > > metrics). We may however change this in the future if we want to
>> > > report
>> > > > > > the byte metrics on an operator level, which is primarily
>> > interesting
>> > > > > > for async IO or other external connectivity outside of
>> > sinks/sources.
>> > > > > >
>> > > > > > On 13/07/2021 12:38, Becket Qin wrote:
>> > > > > > > Hi Arvid,
>> > > > > > >
>> > > > > > > Thanks for the proposal. I like the idea of exposing concrete
>> > > metric
>> > > > > > group
>> > > > > > > class so that users can access the predefined metrics.
>> > > > > > >
>> > > > > > > A few questions are following:
>> > > > > > >
>> > > > > > > 1. When exposing the OperatorIOMetrics to the users, we are
>> also
>> > > > > exposing
>> > > > > > > the reuseInputMetricsForTask to the users. Should we hide
>> these
>> > two
>> > > > > > methods
>> > > > > > > because users won't have enough information to decide whether
>> the
>> > > > > records
>> > > > > > > IO metrics should be reused by the task or not.
>> > > > > > >
>> > > > > > > 2. Similar to question 1, in the OperatorIOMetricGroup, we are
>> > > adding
>> > > > > > > numBytesInCounter and numBytesOutCounter. Should these
>> metrics be
>> > > > > reusing
>> > > > > > > the task level metrics by default?
>> > > > > > >
>> > > > > > > 3. Regarding SourceMetricGroup#setLastFetchTimeGauge(), I am
>> not
>> > > sure
>> > > > > how
>> > > > > > > it works with the FetchLag. Typically there are two cases when
>> > > > > reporting
>> > > > > > > the fetch lag.
>> > > > > > >      A. The EventTime is known at the point when the record is
>> > > > fetched
>> > > > > in
>> > > > > > > the SplitFetcher, so the fetch lag can be derived and reported
>> > > > > > immediately.
>> > > > > > >      B. The EventTime is known only after the fetched record
>> was
>> > > > parsed
>> > > > > > in
>> > > > > > > the RecordEmitter. In this case, the RecordEmitter needs to
>> get
>> > the
>> > > > > fetch
>> > > > > > > time of that particular record.
>> > > > > > > I am not sure when users set the LastFetchTime in the above
>> two
>> > > > cases.
>> > > > > > Can
>> > > > > > > you help elaborate on how users should use it?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Jiangjie (Becket) Qin
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Thu, Jul 8, 2021 at 10:25 PM Arvid Heise <ar...@apache.org
>> >
>> > > > wrote:
>> > > > > > >
>> > > > > > >> Dear devs,
>> > > > > > >>
>> > > > > > >> As a continuation and generalization of FLIP-33 (Standardize
>> > > > Connector
>> > > > > > >> Metrics) [1], we'd like to discuss how we actually expose the
>> > > > > > standardized
>> > > > > > >> operator metrics to users in terms of changes to the API.
>> > > > > > >>
>> > > > > > >> Please check out the FLIP [2] and provide feedback.
>> > > > > > >>
>> > > > > > >> Best,
>> > > > > > >>
>> > > > > > >> Arvid
>> > > > > > >>
>> > > > > > >> [1]
>> > > > > > >>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>> > > > > > >> [2]
>> > > > > > >>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-179%3A+Expose+Standardized+Operator+Metrics
>> > > > > > >>
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to