Hi Becket!

I am wondering if it makes sense to do the following small change:

  - Have "currentFetchEventTimeLag" be defined on event timestamps
(optionally, if the source system exposes it)  <== this is like in your
proposal
      this helps understand how long the records were in the source before

  - BUT change "currentEmitEventTimeLag" to be "currentSourceWatermarkLag"
instead.
    That way, users can see how far behind the wall-clock-time the source
progress is all in all.
    It is also a more well defined metric as it does not oscillate with
out-of-order events.

What do you think?

Best,
Stephan



On Fri, Sep 18, 2020 at 12:02 PM Becket Qin <becket....@gmail.com> wrote:

> Hi folks,
>
> Thanks for all the great feedback. I have just updated FLIP-33 wiki with
> the following changes:
>
> 1. Renaming. "currentFetchLatency" to "currentFetchEventTimeLag",
> "currentLatency" to "currentEmitEventTimeLag".
> 2. Added the public interface code change required for the new metrics.
> 3. Added description of whether a metric is predefined or optional, and
> which component is expected to update the metric.
>
> Please let me know if you have any questions. I'll start a vote in two
> days if there are no further concerns.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 9, 2020 at 9:56 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> Thanks for the input. Just a few more clarifications / questions.
>>
>> *Num Bytes / Records Metrics*
>>
>> 1. At this point, the *numRecordsIn(Rate)* metrics exist in both
>> OperatorIOMetricGroup and TaskIOMetricGroup. I did not find
>> *numRecordsIn(Rate)* in the TaskIOMetricGroup updated anywhere other
>> than in the unit tests. Am I missing something?
>>
>> 2. *numBytesIn(Rate)* metrics only exist in TaskIOMetricGroup. At this
>> point, the SourceReaders only has access to a SourceReaderContext which
>> provides an OperatorMetricGroup. So it seems that the connector developers
>> are not able to update the *numBytesIn(Rate). *With the multiple Source
>> chaining support, it is possible that there are multiple Sources are in the
>> same task. So it looks that we need to add *numBytesIn(Rate)* to the
>> operator metrics as well.
>>
>>
>> *Current (Fetch) Latency*
>>
>> *currentFetchLatency* helps clearly tell whether the latency is caused
>> by Flink or not. Backpressure is not the only reason that we see fetch
>> latency. Even if there is no back pressure, the records may have passed a
>> long pipeline before they entered Flink. For example, say the *currentLatency
>> *is 10 seconds and there is no backpressure. Does that mean the record
>> spent 10 seconds in the Source operator? If not, how much did Flink
>> contribute to that 10 seconds of latency? These questions are frequently
>> asked and hard to tell without the fetch latency.
>>
>> For "currentFetchLatency", we would need to understand timestamps before
>>> the records are decoded. That is only possible for some sources, where the
>>> client gives us the records in a (partially) decoded from already (like
>>> Kafka). Then, some work has been done between the fetch time and the time
>>> we update the metric already, so it is already a bit closer to the
>>> "currentFetchLatency". I think following this train of thought, there is
>>> diminished benefit from that specific metric.
>>
>>
>> We may not have to report the fetch latency before records are decoded.
>> One solution is to remember the* FetchTime* when the encoded records are
>> fetched, and report the fetch latency after the records are decoded by
>> computing (*FetchTime - EventTime*). An approximate implementation would
>> be adding a *FetchTime *field to the *RecordsWithSplitIds* assuming that
>> all the records in that data structure are fetched at the same time.
>>
>> Thoughts?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Sep 9, 2020 at 12:42 AM Stephan Ewen <step...@ververica.com>
>> wrote:
>>
>>> Thanks for reviving this, Becket!
>>>
>>> I think Konstantin's comments are great. I'd add these points:
>>>
>>> *Num Bytes / Records Metrics*
>>>
>>> For "numBytesIn" and "numRecordsIn", we should reuse the
>>> OperatorIOMetric group, then it also gets reported to the overview page in
>>> the Web UI.
>>>
>>> The "numBytesInPerSecond" and "numRecordsInPerSecond" are automatically
>>> derived metrics, no need to do anything once we populate the above two
>>> metrics
>>>
>>>
>>> *Current (Fetch) Latency*
>>>
>>> I would really go for "eventTimeLag" rather than "fetchLatency". I think
>>> "eventTimeLag" is a term that has some adoption in the Flink community and
>>> beyond.
>>>
>>> I am not so sure that I see the benefit between "currentLatency" and
>>> "currentFetchLatency", (or event time lag before/after) as this only is
>>> different by the time it takes to emit a batch.
>>>      - In a non-backpressured case, these should be virtually identical
>>> (and both dominated by watermark lag, not the actual time it takes the
>>> fetch to be emitted)
>>>      - In a backpressured case, why do you care about when data was
>>> fetched, as opposed to emitted? Emitted time is relevant for application
>>> semantics and checkpoints. Fetch time seems to be an implementation detail
>>> (how much does the source buffer).
>>>
>>> The "currentLatency" (eventTimeLagAfter) can be computed out-of-the-box,
>>> independent of a source implementation, so that is also a good argument to
>>> make it the main metric.
>>> We know timestamps and watermarks in the source. Except for cases where
>>> no watermarks have been defined at all (batch jobs or pure processing time
>>> jobs), in which case this metric should probably be "Infinite".
>>>
>>> For "currentFetchLatency", we would need to understand timestamps before
>>> the records are decoded. That is only possible for some sources, where the
>>> client gives us the records in a (partially) decoded from already (like
>>> Kafka). Then, some work has been done between the fetch time and the time
>>> we update the metric already, so it is already a bit closer to the
>>> "currentFetchLatency". I think following this train of thought, there is
>>> diminished benefit from that specific metric.
>>>
>>>
>>> *Idle Time*
>>>
>>> I agree, it would be great to rename this. Maybe to "sourceWaitTime" or
>>> "sourceIdleTime" so to make clear that this is not exactly the time that
>>> Flink's processing pipeline is idle, but the time where the source does not
>>> have new data.
>>>
>>> This is not an easy metric to collect, though (except maybe for the
>>> sources that are only idle while they have no split assigned, like
>>> continuous file source).
>>>
>>> *Source Specific Metrics*
>>>
>>> I believe source-specific would only be "sourceIdleTime",
>>> "numRecordsInErrors", "pendingBytes", and "pendingRecords".
>>>
>>>
>>> *Conclusion*
>>>
>>> We can probably add "numBytesIn" and "numRecordsIn" and "eventTimeLag"
>>> right away, with little complexity.
>>> I'd suggest to start with these right away.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Tue, Sep 8, 2020 at 3:25 PM Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> Hey Konstantin,
>>>>
>>>> Thanks for the feedback and suggestions. Please see the reply below.
>>>>
>>>> * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
>>>>> been introduced in https://issues.apache.org/jira/browse/FLINK-16864.
>>>>> They
>>>>> have a similar name, but different definitions of idleness,
>>>>> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
>>>>> backpressured. Can we make it clearer that these two metrics mean
>>>>> different
>>>>> things?
>>>>
>>>>
>>>> That is a good point. I did not notice this metric earlier. It seems
>>>> that both metrics are useful to the users. One tells them how busy the
>>>> source is and how much more throughput the source can handle. The other
>>>> tells the users how long since the source has seen the last record, which
>>>> is useful for debugging. I'll update the FLIP to make it clear.
>>>>
>>>>   * "current(Fetch)Latency" I am wondering if
>>>>> "eventTimeLag(Before|After)"
>>>>> is more descriptive/clear. What do others think?
>>>>
>>>>
>>>> I am quite open to the ideas on these names. In fact I also feel
>>>> "current(Fetch)Latency" are not super intuitive. So it would be great if we
>>>> can have better names.
>>>>
>>>>   * Current(Fetch)Latency implies that the timestamps are directly
>>>>> extracted in the source connector, right? Will this be the default for
>>>>> FLIP-27 sources anyway?
>>>>
>>>>
>>>> The "currentFetchLatency" will probably be reported by each source
>>>> implementation, because the data fetching is done by SplitReaders and there
>>>> is no base implementation. The "currentLatency", on the other hand, can be
>>>> reported by the SourceReader base implementation. However, since developers
>>>> can actually implement their own source connector without using our base
>>>> implementation, these metric guidance are still useful for the connector
>>>> developers in that case.
>>>>
>>>> * Does FLIP-33 also include the implementation of these metrics (to the
>>>>> extent possible) for all connectors currently available in Apache
>>>>> Flink or
>>>>> is the "per-connector implementation" out-of-scope?
>>>>
>>>>
>>>> FLIP-33 itself does not specify any implementation of those metrics.
>>>> But the connectors we provide in Apache Flink will follow the guidance of
>>>> FLIP-33 to emit those metrics when applicable. Maybe We can have some
>>>> public static Strings defined for the metric names to help other connector
>>>> developers follow the same guidance. I can also add that to the public
>>>> interface section of the FLIP if we decide to do that.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Tue, Sep 8, 2020 at 9:02 PM Becket Qin <becket....@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 8, 2020 at 6:55 PM Konstantin Knauf <kna...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>>
>>>>>> Thank you for picking up this FLIP. I have a few questions:
>>>>>>
>>>>>> * two thoughts on naming:
>>>>>>    * idleTime: In the meantime, a similar metric
>>>>>> "idleTimeMsPerSecond" has
>>>>>> been introduced in https://issues.apache.org/jira/browse/FLINK-16864.
>>>>>> They
>>>>>> have a similar name, but different definitions of idleness,
>>>>>> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
>>>>>> backpressured. Can we make it clearer that these two metrics mean
>>>>>> different
>>>>>> things?
>>>>>>
>>>>>>   * "current(Fetch)Latency" I am wondering if
>>>>>> "eventTimeLag(Before|After)"
>>>>>> is more descriptive/clear. What do others think?
>>>>>>
>>>>>>   * Current(Fetch)Latency implies that the timestamps are directly
>>>>>> extracted in the source connector, right? Will this be the default for
>>>>>> FLIP-27 sources anyway?
>>>>>>
>>>>>> * Does FLIP-33 also include the implementation of these metrics (to
>>>>>> the
>>>>>> extent possible) for all connectors currently available in Apache
>>>>>> Flink or
>>>>>> is the "per-connector implementation" out-of-scope?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 4, 2020 at 4:56 PM Becket Qin <becket....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi all,
>>>>>> >
>>>>>> > To complete the Source refactoring work, I'd like to revive this
>>>>>> > discussion. Since the mail thread has been dormant for more than a
>>>>>> year,
>>>>>> > just to recap the motivation of the FLIP:
>>>>>> >
>>>>>> > 1. The FLIP proposes to standardize the connector metrics by giving
>>>>>> > guidance on the metric specifications, including the name, type and
>>>>>> meaning
>>>>>> > of the metrics.
>>>>>> > 2. It is OK for a connector to not emit some of the metrics in the
>>>>>> metric
>>>>>> > guidance, but if a metric of the same semantic is emitted, the
>>>>>> > implementation should follow the guidance.
>>>>>> > 3. It is OK for a connector to emit more metrics than what are
>>>>>> listed in
>>>>>> > the FLIP. This includes having an alias for a metric specified in
>>>>>> the FLIP.
>>>>>> > 4. We will implement some of the metrics out of the box in the
>>>>>> default
>>>>>> > implementation of FLIP-27, as long as it is applicable.
>>>>>> >
>>>>>> > The FLIP wiki is following:
>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33
>>>>>> > %3A+Standardize+Connector+Metrics
>>>>>> >
>>>>>> > Any thoughts?
>>>>>> >
>>>>>> > Thanks,
>>>>>> >
>>>>>> > Jiangjie (Becket) Qin
>>>>>> >
>>>>>> >
>>>>>> > On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski <pi...@ververica.com
>>>>>> >
>>>>>> > wrote:
>>>>>> >
>>>>>> > > > we will need to revisit the convention list and adjust them
>>>>>> accordingly
>>>>>> > > when FLIP-27 is ready
>>>>>> > >
>>>>>> > >
>>>>>> > > Yes, this sounds good :)
>>>>>> > >
>>>>>> > > Piotrek
>>>>>> > >
>>>>>> > > > On 13 Jun 2019, at 13:35, Becket Qin <becket....@gmail.com>
>>>>>> wrote:
>>>>>> > > >
>>>>>> > > > Hi Piotr,
>>>>>> > > >
>>>>>> > > > That's great to know. Chances are that we will need to revisit
>>>>>> the
>>>>>> > > > convention list and adjust them accordingly when FLIP-27 is
>>>>>> ready, At
>>>>>> > > that
>>>>>> > > > point we can mark some of the metrics as available by default
>>>>>> for
>>>>>> > > > connectors implementing the new interface.
>>>>>> > > >
>>>>>> > > > Thanks,
>>>>>> > > >
>>>>>> > > > Jiangjie (Becket) Qin
>>>>>> > > >
>>>>>> > > > On Thu, Jun 13, 2019 at 3:51 PM Piotr Nowojski <
>>>>>> pi...@ververica.com>
>>>>>> > > wrote:
>>>>>> > > >
>>>>>> > > >> Thanks for driving this. I’ve just noticed one small thing.
>>>>>> With new
>>>>>> > > >> SourceReader interface Flink will be able to provide
>>>>>> `idleTime` metric
>>>>>> > > >> automatically.
>>>>>> > > >>
>>>>>> > > >> Piotrek
>>>>>> > > >>
>>>>>> > > >>> On 13 Jun 2019, at 03:30, Becket Qin <becket....@gmail.com>
>>>>>> wrote:
>>>>>> > > >>>
>>>>>> > > >>> Thanks all for the feedback and discussion.
>>>>>> > > >>>
>>>>>> > > >>> Since there wasn't any concern raised, I've started the
>>>>>> voting thread
>>>>>> > > for
>>>>>> > > >>> this FLIP, but please feel free to continue the discussion
>>>>>> here if
>>>>>> > you
>>>>>> > > >>> think something still needs to be addressed.
>>>>>> > > >>>
>>>>>> > > >>> Thanks,
>>>>>> > > >>>
>>>>>> > > >>> Jiangjie (Becket) Qin
>>>>>> > > >>>
>>>>>> > > >>>
>>>>>> > > >>>
>>>>>> > > >>> On Mon, Jun 10, 2019 at 9:10 AM Becket Qin <
>>>>>> becket....@gmail.com>
>>>>>> > > wrote:
>>>>>> > > >>>
>>>>>> > > >>>> Hi Piotr,
>>>>>> > > >>>>
>>>>>> > > >>>> Thanks for the comments. Yes, you are right. Users will have
>>>>>> to look
>>>>>> > > at
>>>>>> > > >>>> other metrics to decide whether the pipeline is healthy or
>>>>>> not in
>>>>>> > the
>>>>>> > > >> first
>>>>>> > > >>>> place before they can use the time-based metric to fix the
>>>>>> > bottleneck.
>>>>>> > > >>>>
>>>>>> > > >>>> I agree that once we have FLIP-27 ready, some of the metrics
>>>>>> can
>>>>>> > just
>>>>>> > > be
>>>>>> > > >>>> reported by the abstract implementation.
>>>>>> > > >>>>
>>>>>> > > >>>> I've updated FLIP-33 wiki page to add the pendingBytes and
>>>>>> > > >> pendingRecords
>>>>>> > > >>>> metric. Please let me know if you have any concern over the
>>>>>> updated
>>>>>> > > >> metric
>>>>>> > > >>>> convention proposal.
>>>>>> > > >>>>
>>>>>> > > >>>> @Chesnay Schepler <ches...@apache.org> @Stephan Ewen
>>>>>> > > >>>> <step...@ververica.com> will you also have time to take a
>>>>>> look at
>>>>>> > the
>>>>>> > > >>>> proposed metric convention? If there is no further concern
>>>>>> I'll
>>>>>> > start
>>>>>> > > a
>>>>>> > > >>>> voting thread for this FLIP in two days.
>>>>>> > > >>>>
>>>>>> > > >>>> Thanks,
>>>>>> > > >>>>
>>>>>> > > >>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>
>>>>>> > > >>>>
>>>>>> > > >>>>
>>>>>> > > >>>> On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski <
>>>>>> pi...@ververica.com>
>>>>>> > > >> wrote:
>>>>>> > > >>>>
>>>>>> > > >>>>> Hi Becket,
>>>>>> > > >>>>>
>>>>>> > > >>>>> Thanks for the answer :)
>>>>>> > > >>>>>
>>>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on
>>>>>> > producing
>>>>>> > > >> the
>>>>>> > > >>>>>> record to downstream. For example, a source connector can
>>>>>> report
>>>>>> > > that
>>>>>> > > >>>>> it's
>>>>>> > > >>>>>> spending 80% of time to emit record to downstream
>>>>>> processing
>>>>>> > > pipeline.
>>>>>> > > >>>>> In
>>>>>> > > >>>>>> another case, a sink connector may report that its
>>>>>> spending 30% of
>>>>>> > > >> time
>>>>>> > > >>>>>> producing the records to the external system.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage
>>>>>> metric:
>>>>>> > > >>>>>
>>>>>> > > >>>>>> - 80% of time spent on emitting records to downstream --->
>>>>>> > > downstream
>>>>>> > > >>>>>> node is bottleneck ---> output buffer is probably full.
>>>>>> > > >>>>>> - 30% of time spent on emitting records to downstream --->
>>>>>> > > downstream
>>>>>> > > >>>>>> node is not bottleneck ---> output buffer is probably not
>>>>>> full.
>>>>>> > > >>>>>
>>>>>> > > >>>>> If by “time spent on emitting records to downstream” you
>>>>>> understand
>>>>>> > > >>>>> “waiting on back pressure”, then I see your point. And I
>>>>>> agree that
>>>>>> > > >> some
>>>>>> > > >>>>> kind of ratio/time based metric gives you more information.
>>>>>> However
>>>>>> > > >> under
>>>>>> > > >>>>> “time spent on emitting records to downstream” might be
>>>>>> hidden the
>>>>>> > > >>>>> following (extreme) situation:
>>>>>> > > >>>>>
>>>>>> > > >>>>> 1. Job is barely able to handle influx of records, there is
>>>>>> 99%
>>>>>> > > >>>>> CPU/resource usage in the cluster, but nobody is
>>>>>> > > >>>>> bottlenecked/backpressured, all output buffers are empty,
>>>>>> everybody
>>>>>> > > is
>>>>>> > > >>>>> waiting in 1% of it’s time for more records to process.
>>>>>> > > >>>>> 2. 80% time can still be spent on "down stream operators”,
>>>>>> because
>>>>>> > > they
>>>>>> > > >>>>> are the CPU intensive operations, but this doesn’t mean that
>>>>>> > > >> increasing the
>>>>>> > > >>>>> parallelism down the stream will help with anything there.
>>>>>> To the
>>>>>> > > >> contrary,
>>>>>> > > >>>>> increasing parallelism of the source operator might help to
>>>>>> > increase
>>>>>> > > >>>>> resource utilisation up to 100%.
>>>>>> > > >>>>>
>>>>>> > > >>>>> However, this “time based/ratio” approach can be extended to
>>>>>> > > in/output
>>>>>> > > >>>>> buffer usage. Besides collecting an information that
>>>>>> input/output
>>>>>> > > >> buffer is
>>>>>> > > >>>>> full/empty, we can probe profile how often are buffer
>>>>>> empty/full.
>>>>>> > If
>>>>>> > > >> output
>>>>>> > > >>>>> buffer is full 1% of times, there is almost no back
>>>>>> pressure. If
>>>>>> > it’s
>>>>>> > > >> full
>>>>>> > > >>>>> 80% of times, there is some back pressure, if it’s full
>>>>>> 99.9% of
>>>>>> > > times,
>>>>>> > > >>>>> there is huge back pressure.
>>>>>> > > >>>>>
>>>>>> > > >>>>> Now for autoscaling you could compare the input & output
>>>>>> buffers
>>>>>> > fill
>>>>>> > > >>>>> ratio:
>>>>>> > > >>>>>
>>>>>> > > >>>>> 1. Both are high, the source of bottleneck is down the
>>>>>> stream
>>>>>> > > >>>>> 2. Output is low, input is high, this is the bottleneck and
>>>>>> the
>>>>>> > > higher
>>>>>> > > >>>>> the difference, the bigger source of bottleneck is this is
>>>>>> > > >> operator/task
>>>>>> > > >>>>> 3. Output is high, input is low - there was some load spike
>>>>>> that we
>>>>>> > > are
>>>>>> > > >>>>> currently finishing to process
>>>>>> > > >>>>>
>>>>>> > > >>>>>
>>>>>> > > >>>>>
>>>>>> > > >>>>> But long story short, we are probably diverging from the
>>>>>> topic of
>>>>>> > > this
>>>>>> > > >>>>> discussion, and we can discuss this at some later point.
>>>>>> > > >>>>>
>>>>>> > > >>>>> For now, for sources:
>>>>>> > > >>>>>
>>>>>> > > >>>>> as I wrote before, +1 for:
>>>>>> > > >>>>> - pending.bytes, Gauge
>>>>>> > > >>>>> - pending.messages, Gauge
>>>>>> > > >>>>>
>>>>>> > > >>>>> When we will be developing/discussing SourceReader from
>>>>>> FLIP-27 we
>>>>>> > > >> might
>>>>>> > > >>>>> then add:
>>>>>> > > >>>>>
>>>>>> > > >>>>> - in-memory.buffer.usage (0 - 100%)
>>>>>> > > >>>>>
>>>>>> > > >>>>> Which will be estimated automatically by Flink while user
>>>>>> will be
>>>>>> > > able
>>>>>> > > >> to
>>>>>> > > >>>>> override/provide better estimation.
>>>>>> > > >>>>>
>>>>>> > > >>>>> Piotrek
>>>>>> > > >>>>>
>>>>>> > > >>>>>> On 5 Jun 2019, at 05:42, Becket Qin <becket....@gmail.com>
>>>>>> wrote:
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Hi Piotr,
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Thanks for the explanation. Please see some clarifications
>>>>>> below.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on
>>>>>> > producing
>>>>>> > > >> the
>>>>>> > > >>>>>> record to downstream. For example, a source connector can
>>>>>> report
>>>>>> > > that
>>>>>> > > >>>>> it's
>>>>>> > > >>>>>> spending 80% of time to emit record to downstream
>>>>>> processing
>>>>>> > > pipeline.
>>>>>> > > >>>>> In
>>>>>> > > >>>>>> another case, a sink connector may report that its
>>>>>> spending 30% of
>>>>>> > > >> time
>>>>>> > > >>>>>> producing the records to the external system.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage
>>>>>> metric:
>>>>>> > > >>>>>> - 80% of time spent on emitting records to downstream --->
>>>>>> > > downstream
>>>>>> > > >>>>>> node is bottleneck ---> output buffer is probably full.
>>>>>> > > >>>>>> - 30% of time spent on emitting records to downstream --->
>>>>>> > > downstream
>>>>>> > > >>>>>> node is not bottleneck ---> output buffer is probably not
>>>>>> full.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> However, the time-based metric has a few advantages that
>>>>>> the
>>>>>> > buffer
>>>>>> > > >>>>> usage
>>>>>> > > >>>>>> metric may not have.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> 1.  Buffer usage metric may not be applicable to all the
>>>>>> connector
>>>>>> > > >>>>>> implementations, while reporting time-based metric are
>>>>>> always
>>>>>> > > doable.
>>>>>> > > >>>>>> Some source connectors may not have any input buffer, or
>>>>>> they may
>>>>>> > > use
>>>>>> > > >>>>> some
>>>>>> > > >>>>>> third party library that does not expose the input buffer
>>>>>> at all.
>>>>>> > > >>>>>> Similarly, for sink connectors, the implementation may not
>>>>>> have
>>>>>> > any
>>>>>> > > >>>>> output
>>>>>> > > >>>>>> buffer, or the third party library does not expose such
>>>>>> buffer.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> 2. Although both type of metrics can detect bottleneck,
>>>>>> time-based
>>>>>> > > >>>>> metrics
>>>>>> > > >>>>>> can be used to generate a more informed action to remove
>>>>>> the
>>>>>> > > >> bottleneck.
>>>>>> > > >>>>>> For example, when the downstream is bottleneck, the output
>>>>>> buffer
>>>>>> > > >> usage
>>>>>> > > >>>>>> metric is likely to be 100%, and the input buffer usage
>>>>>> might be
>>>>>> > 0%.
>>>>>> > > >>>>> That
>>>>>> > > >>>>>> means we don't know what is the suitable parallelism to
>>>>>> lift the
>>>>>> > > >>>>>> bottleneck. The time-based metric, on the other hand,
>>>>>> would give
>>>>>> > > >> useful
>>>>>> > > >>>>>> information, e.g. if 80% of time was spent on emitting
>>>>>> records, we
>>>>>> > > can
>>>>>> > > >>>>>> roughly increase the downstream node parallelism by 4
>>>>>> times.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Admittedly, the time-based metrics are more expensive than
>>>>>> buffer
>>>>>> > > >>>>> usage. So
>>>>>> > > >>>>>> we may have to do some sampling to reduce the cost. But in
>>>>>> > general,
>>>>>> > > >>>>> using
>>>>>> > > >>>>>> time-based metrics seems worth adding.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> That being said, I don't think buffer usage metric and
>>>>>> time-based
>>>>>> > > >>>>> metrics
>>>>>> > > >>>>>> are mutually exclusive. We can probably have both. It is
>>>>>> just that
>>>>>> > > in
>>>>>> > > >>>>>> practice, features like auto-scaling might prefer
>>>>>> time-based
>>>>>> > metrics
>>>>>> > > >> for
>>>>>> > > >>>>>> the reason stated above.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually
>>>>>> detect
>>>>>> > > >>>>> bottlenecks.
>>>>>> > > >>>>>> As I wrote, we already have them in most of the places,
>>>>>> except of
>>>>>> > > >>>>>> sources/sinks.
>>>>>> > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks.
>>>>>> > > Currently
>>>>>> > > >> we
>>>>>> > > >>>>>> are only automatically detecting back pressure and
>>>>>> reporting it to
>>>>>> > > the
>>>>>> > > >>>>> user
>>>>>> > > >>>>>> in web UI (is it exposed as a metric at all?). Detecting
>>>>>> the root
>>>>>> > > >> cause
>>>>>> > > >>>>> of
>>>>>> > > >>>>>> the back pressure (bottleneck) is one step further.
>>>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is
>>>>>> > located,
>>>>>> > > >> to
>>>>>> > > >>>>>> try to do something with it.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> As explained above, I think time-based metric also
>>>>>> addresses item
>>>>>> > 1
>>>>>> > > >> and
>>>>>> > > >>>>>> item 2.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Any thoughts?
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Thanks,
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>
>>>>>> > > >>>>>>
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski <
>>>>>> > pi...@ververica.com>
>>>>>> > > >>>>> wrote:
>>>>>> > > >>>>>>
>>>>>> > > >>>>>>> Hi again :)
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> - pending.bytes, Gauge
>>>>>> > > >>>>>>>> - pending.messages, Gauge
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> +1
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> And true, instead of overloading one of the metric it is
>>>>>> better
>>>>>> > > when
>>>>>> > > >>>>> user
>>>>>> > > >>>>>>> can choose to provide only one of them.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Re 2:
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> If I understand correctly, this metric along with the
>>>>>> pending
>>>>>> > > >> mesages
>>>>>> > > >>>>> /
>>>>>> > > >>>>>>>> bytes would answer the questions of:
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging behind
>>>>>> + empty
>>>>>> > > >>>>> buffer
>>>>>> > > >>>>>>> =
>>>>>> > > >>>>>>>> cannot consume fast enough.
>>>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind +
>>>>>> full
>>>>>> > > buffer
>>>>>> > > >> =
>>>>>> > > >>>>>>>> cannot emit fast enough, i.e. the Flink pipeline is slow.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Yes, exactly. This can also be used to support decisions
>>>>>> like
>>>>>> > > >> changing
>>>>>> > > >>>>> the
>>>>>> > > >>>>>>> parallelism of the sources and/or down stream operators.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> I’m not sure if I understand your proposal with time based
>>>>>> > > >>>>> measurements.
>>>>>> > > >>>>>>> Maybe I’m missing something, but I do not see how
>>>>>> measuring time
>>>>>> > > >> alone
>>>>>> > > >>>>>>> could answer the problem: where is the bottleneck. Time
>>>>>> spent on
>>>>>> > > the
>>>>>> > > >>>>>>> next/emit might be short or long (depending on how heavy
>>>>>> to
>>>>>> > process
>>>>>> > > >> the
>>>>>> > > >>>>>>> record is) and the source can still be bottlenecked/back
>>>>>> > pressured
>>>>>> > > or
>>>>>> > > >>>>> not.
>>>>>> > > >>>>>>> Usually the easiest and the most reliable way how to
>>>>>> detect
>>>>>> > > >>>>> bottlenecks is
>>>>>> > > >>>>>>> by checking usage of input & output buffers, since when
>>>>>> input
>>>>>> > > buffer
>>>>>> > > >> is
>>>>>> > > >>>>>>> full while output buffer is empty, that’s the definition
>>>>>> of a
>>>>>> > > >>>>> bottleneck.
>>>>>> > > >>>>>>> Also this is usually very easy and cheap to measure (it
>>>>>> works
>>>>>> > > >>>>> effectively
>>>>>> > > >>>>>>> the same way as current’s Flink back pressure monitoring,
>>>>>> but
>>>>>> > more
>>>>>> > > >>>>> cleanly,
>>>>>> > > >>>>>>> without probing thread’s stack traces).
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Also keep in mind that we are already using the buffer
>>>>>> usage
>>>>>> > > metrics
>>>>>> > > >>>>> for
>>>>>> > > >>>>>>> detecting the bottlenecks in Flink’s internal network
>>>>>> exchanges
>>>>>> > > >> (manual
>>>>>> > > >>>>>>> work). That’s the reason why I wanted to extend this to
>>>>>> > > >> sources/sinks,
>>>>>> > > >>>>>>> since they are currently our blind spot.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink
>>>>>> > > automatically
>>>>>> > > >>>>>>> relies
>>>>>> > > >>>>>>>> on some metrics answering the same question
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> That would be very helpful feature. I think in order to
>>>>>> achieve
>>>>>> > > that
>>>>>> > > >> we
>>>>>> > > >>>>>>> would need to:
>>>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually
>>>>>> detect
>>>>>> > > >>>>> bottlenecks.
>>>>>> > > >>>>>>> As I wrote, we already have them in most of the places,
>>>>>> except of
>>>>>> > > >>>>>>> sources/sinks.
>>>>>> > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks.
>>>>>> > > Currently
>>>>>> > > >> we
>>>>>> > > >>>>>>> are only automatically detecting back pressure and
>>>>>> reporting it
>>>>>> > to
>>>>>> > > >> the
>>>>>> > > >>>>> user
>>>>>> > > >>>>>>> in web UI (is it exposed as a metric at all?). Detecting
>>>>>> the root
>>>>>> > > >>>>> cause of
>>>>>> > > >>>>>>> the back pressure (bottleneck) is one step further.
>>>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is
>>>>>> > located,
>>>>>> > > >> to
>>>>>> > > >>>>> try
>>>>>> > > >>>>>>> to do something with it.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> I think you are aiming for point 3., but before we reach
>>>>>> it, we
>>>>>> > are
>>>>>> > > >>>>> still
>>>>>> > > >>>>>>> missing 1. & 2. Also even if we have 3., there is a value
>>>>>> in 1 &
>>>>>> > 2
>>>>>> > > >> for
>>>>>> > > >>>>>>> manual analysis/dashboards.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> However, having the knowledge of where the bottleneck is,
>>>>>> doesn’t
>>>>>> > > >>>>>>> necessarily mean that we know what we can do about it. For
>>>>>> > example
>>>>>> > > >>>>>>> increasing parallelism might or might not help with
>>>>>> anything
>>>>>> > (data
>>>>>> > > >>>>> skew,
>>>>>> > > >>>>>>> bottleneck on some resource that does not scale), but
>>>>>> this remark
>>>>>> > > >>>>> applies
>>>>>> > > >>>>>>> always, regardless of the way how did we detect the
>>>>>> bottleneck.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Piotrek
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> On 3 Jun 2019, at 06:16, Becket Qin <
>>>>>> becket....@gmail.com>
>>>>>> > wrote:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Hi Piotr,
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Thanks for the suggestion. Some thoughts below:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Re 1: The pending messages / bytes.
>>>>>> > > >>>>>>>> I completely agree these are very useful metrics and we
>>>>>> should
>>>>>> > > >> expect
>>>>>> > > >>>>> the
>>>>>> > > >>>>>>>> connector to report. WRT the way to expose them, it
>>>>>> seems more
>>>>>> > > >>>>> consistent
>>>>>> > > >>>>>>>> to add two metrics instead of adding a method (unless
>>>>>> there are
>>>>>> > > >> other
>>>>>> > > >>>>> use
>>>>>> > > >>>>>>>> cases other than metric reporting). So we can add the
>>>>>> following
>>>>>> > > two
>>>>>> > > >>>>>>> metrics.
>>>>>> > > >>>>>>>> - pending.bytes, Gauge
>>>>>> > > >>>>>>>> - pending.messages, Gauge
>>>>>> > > >>>>>>>> Applicable connectors can choose to report them. These
>>>>>> two
>>>>>> > metrics
>>>>>> > > >>>>> along
>>>>>> > > >>>>>>>> with latency should be sufficient for users to
>>>>>> understand the
>>>>>> > > >> progress
>>>>>> > > >>>>>>> of a
>>>>>> > > >>>>>>>> connector.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Re 2: Number of buffered data in-memory of the connector
>>>>>> > > >>>>>>>> If I understand correctly, this metric along with the
>>>>>> pending
>>>>>> > > >> mesages
>>>>>> > > >>>>> /
>>>>>> > > >>>>>>>> bytes would answer the questions of:
>>>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging behind
>>>>>> + empty
>>>>>> > > >>>>> buffer
>>>>>> > > >>>>>>> =
>>>>>> > > >>>>>>>> cannot consume fast enough.
>>>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind +
>>>>>> full
>>>>>> > > buffer
>>>>>> > > >> =
>>>>>> > > >>>>>>>> cannot emit fast enough, i.e. the Flink pipeline is slow.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink
>>>>>> > > automatically
>>>>>> > > >>>>>>> relies
>>>>>> > > >>>>>>>> on some metrics answering the same question, more
>>>>>> specifically,
>>>>>> > we
>>>>>> > > >> are
>>>>>> > > >>>>>>>> profiling the time spent on .next() method (time to
>>>>>> consume) and
>>>>>> > > the
>>>>>> > > >>>>> time
>>>>>> > > >>>>>>>> spent on .collect() method (time to emit / process). One
>>>>>> > advantage
>>>>>> > > >> of
>>>>>> > > >>>>>>> such
>>>>>> > > >>>>>>>> method level time cost allows us to calculate the
>>>>>> parallelism
>>>>>> > > >>>>> required to
>>>>>> > > >>>>>>>> keep up in case their is a lag.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> However, one concern I have regarding such metric is
>>>>>> that they
>>>>>> > are
>>>>>> > > >>>>>>>> implementation specific. Either profiling on the method
>>>>>> time, or
>>>>>> > > >>>>>>> reporting
>>>>>> > > >>>>>>>> buffer usage assumes the connector are implemented in
>>>>>> such a
>>>>>> > way.
>>>>>> > > A
>>>>>> > > >>>>>>>> slightly better solution might be have the following
>>>>>> metric:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>>  - EmitTimeRatio (or FetchTimeRatio): The time spent on
>>>>>> emitting
>>>>>> > > >>>>>>>> records / Total time elapsed.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> This assumes that the source connectors have to emit the
>>>>>> records
>>>>>> > > to
>>>>>> > > >>>>> the
>>>>>> > > >>>>>>>> downstream at some point. The emission may take some
>>>>>> time ( e.g.
>>>>>> > > go
>>>>>> > > >>>>>>> through
>>>>>> > > >>>>>>>> chained operators). And the rest of the time are spent to
>>>>>> > prepare
>>>>>> > > >> the
>>>>>> > > >>>>>>>> record to emit, including time for consuming and format
>>>>>> > > conversion,
>>>>>> > > >>>>> etc.
>>>>>> > > >>>>>>>> Ideally, we'd like to see the time spent on record fetch
>>>>>> and
>>>>>> > emit
>>>>>> > > to
>>>>>> > > >>>>> be
>>>>>> > > >>>>>>>> about the same, so no one is bottleneck for the other.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> The downside of these time based metrics is additional
>>>>>> overhead
>>>>>> > to
>>>>>> > > >> get
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>> time, therefore sampling might be needed. But in
>>>>>> practice I feel
>>>>>> > > >> such
>>>>>> > > >>>>>>> time
>>>>>> > > >>>>>>>> based metric might be more useful if we want to take
>>>>>> action.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> I think we should absolutely add metrics in (1) to the
>>>>>> metric
>>>>>> > > >>>>> convention.
>>>>>> > > >>>>>>>> We could also add the metrics mentioned in (2) if we
>>>>>> reach
>>>>>> > > consensus
>>>>>> > > >>>>> on
>>>>>> > > >>>>>>>> that. What do you think?
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Thanks,
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski <
>>>>>> > > pi...@ververica.com
>>>>>> > > >>>
>>>>>> > > >>>>>>> wrote:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>>> Hey Becket,
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Re 1a) and 1b) +1 from my side.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> I’ve discussed this issue:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us
>>>>>> to check
>>>>>> > > the
>>>>>> > > >>>>> cause
>>>>>> > > >>>>>>>>> of
>>>>>> > > >>>>>>>>>>>> back pressure:
>>>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or
>>>>>> boolean
>>>>>> > > >>>>>>>>>>>> hasSomethingl/isEmpty)
>>>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or
>>>>>> boolean
>>>>>> > > >>>>>>>>>>>> hasSomething/isEmpty)
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> With Nico at some lengths and he also saw the benefits
>>>>>> of them.
>>>>>> > > We
>>>>>> > > >>>>> also
>>>>>> > > >>>>>>>>> have more concrete proposal for that.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Actually there are two really useful metrics, that we
>>>>>> are
>>>>>> > missing
>>>>>> > > >>>>>>>>> currently:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> 1. Number of data/records/bytes in the backlog to
>>>>>> process. For
>>>>>> > > >>>>> example
>>>>>> > > >>>>>>>>> remaining number of bytes in unread files. Or pending
>>>>>> data in
>>>>>> > > Kafka
>>>>>> > > >>>>>>> topics.
>>>>>> > > >>>>>>>>> 2. Number of buffered data in-memory of the connector,
>>>>>> that are
>>>>>> > > >>>>> waiting
>>>>>> > > >>>>>>> to
>>>>>> > > >>>>>>>>> be processed pushed to Flink pipeline.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Re 1:
>>>>>> > > >>>>>>>>> This would have to be a metric provided directly by a
>>>>>> > connector.
>>>>>> > > It
>>>>>> > > >>>>>>> could
>>>>>> > > >>>>>>>>> be an undefined `int`:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> `int backlog` - estimate of pending work.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> “Undefined” meaning that it would be up to a connector
>>>>>> to
>>>>>> > decided
>>>>>> > > >>>>>>> whether
>>>>>> > > >>>>>>>>> it’s measured in bytes, records, pending files or
>>>>>> whatever it
>>>>>> > is
>>>>>> > > >>>>>>> possible
>>>>>> > > >>>>>>>>> to provide by the connector. This is because I assume
>>>>>> not every
>>>>>> > > >>>>>>> connector
>>>>>> > > >>>>>>>>> can provide exact number and for some of them it might
>>>>>> be
>>>>>> > > >> impossible
>>>>>> > > >>>>> to
>>>>>> > > >>>>>>>>> provide records number of bytes count.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Re 2:
>>>>>> > > >>>>>>>>> This metric could be either provided by a connector, or
>>>>>> > > calculated
>>>>>> > > >>>>>>> crudely
>>>>>> > > >>>>>>>>> by Flink:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> `float bufferUsage` - value from [0.0, 1.0] range
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Percentage of used in memory buffers, like in Kafka’s
>>>>>> handover.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> It could be crudely implemented by Flink with FLIP-27
>>>>>> > > >>>>>>>>> SourceReader#isAvailable. If SourceReader is not
>>>>>> available
>>>>>> > > reported
>>>>>> > > >>>>>>>>> `bufferUsage` could be 0.0. If it is available, it
>>>>>> could be
>>>>>> > 1.0.
>>>>>> > > I
>>>>>> > > >>>>> think
>>>>>> > > >>>>>>>>> this would be a good enough estimation for most of the
>>>>>> use
>>>>>> > cases
>>>>>> > > >>>>> (that
>>>>>> > > >>>>>>>>> could be overloaded and implemented better if desired).
>>>>>> > > Especially
>>>>>> > > >>>>>>> since we
>>>>>> > > >>>>>>>>> are reporting only probed values: if probed values are
>>>>>> almost
>>>>>> > > >> always
>>>>>> > > >>>>>>> “1.0”,
>>>>>> > > >>>>>>>>> it would mean that we have a back pressure. If they are
>>>>>> almost
>>>>>> > > >> always
>>>>>> > > >>>>>>>>> “0.0”, there is probably no back pressure at the
>>>>>> sources.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> What do you think about this?
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Piotrek
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>>> On 30 May 2019, at 11:41, Becket Qin <
>>>>>> becket....@gmail.com>
>>>>>> > > >> wrote:
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Hi all,
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Thanks a lot for all the feedback and comments. I'd
>>>>>> like to
>>>>>> > > >> continue
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>>>> discussion on this FLIP.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> I updated the FLIP-33 wiki to remove all the histogram
>>>>>> metrics
>>>>>> > > >> from
>>>>>> > > >>>>> the
>>>>>> > > >>>>>>>>>> first version of metric convention due to the
>>>>>> performance
>>>>>> > > concern.
>>>>>> > > >>>>> The
>>>>>> > > >>>>>>>>> plan
>>>>>> > > >>>>>>>>>> is to introduce them later when we have a mechanism to
>>>>>> opt
>>>>>> > > in/out
>>>>>> > > >>>>>>>>> metrics.
>>>>>> > > >>>>>>>>>> At that point, users can decide whether they want to
>>>>>> pay the
>>>>>> > > cost
>>>>>> > > >> to
>>>>>> > > >>>>>>> get
>>>>>> > > >>>>>>>>>> the metric or not.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> As Stephan suggested, for this FLIP, let's first try
>>>>>> to agree
>>>>>> > on
>>>>>> > > >> the
>>>>>> > > >>>>>>>>> small
>>>>>> > > >>>>>>>>>> list of conventional metrics that connectors should
>>>>>> follow.
>>>>>> > > >>>>>>>>>> Just to be clear, the purpose of the convention is not
>>>>>> to
>>>>>> > > enforce
>>>>>> > > >>>>> every
>>>>>> > > >>>>>>>>>> connector to report all these metrics, but to provide a
>>>>>> > guidance
>>>>>> > > >> in
>>>>>> > > >>>>>>> case
>>>>>> > > >>>>>>>>>> these metrics are reported by some connectors.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> @ Stephan & Chesnay,
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Regarding the duplication of `RecordsIn` metric in
>>>>>> operator /
>>>>>> > > task
>>>>>> > > >>>>>>>>>> IOMetricGroups, from what I understand, for source
>>>>>> operator,
>>>>>> > it
>>>>>> > > is
>>>>>> > > >>>>>>>>> actually
>>>>>> > > >>>>>>>>>> the SourceFunction that reports the operator level
>>>>>> > > >>>>>>>>>> RecordsIn/RecordsInPerSecond metric. So they are
>>>>>> essentially
>>>>>> > the
>>>>>> > > >>>>> same
>>>>>> > > >>>>>>>>>> metric in the operator level IOMetricGroup. Similarly
>>>>>> for the
>>>>>> > > Sink
>>>>>> > > >>>>>>>>>> operator, the operator level
>>>>>> RecordsOut/RecordsOutPerSecond
>>>>>> > > >> metrics
>>>>>> > > >>>>> are
>>>>>> > > >>>>>>>>>> also reported by the Sink function. I marked them as
>>>>>> existing
>>>>>> > in
>>>>>> > > >> the
>>>>>> > > >>>>>>>>>> FLIP-33 wiki page. Please let me know if I
>>>>>> misunderstood.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> On Thu, May 30, 2019 at 5:16 PM Becket Qin <
>>>>>> > > becket....@gmail.com>
>>>>>> > > >>>>>>> wrote:
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>>> Hi Piotr,
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> Thanks a lot for the feedback.
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> 1a) I guess you are referring to the part that
>>>>>> "original
>>>>>> > system
>>>>>> > > >>>>>>> specific
>>>>>> > > >>>>>>>>>>> metrics should also be reported". The performance
>>>>>> impact
>>>>>> > > depends
>>>>>> > > >> on
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>>>>> implementation. An efficient implementation would
>>>>>> only record
>>>>>> > > the
>>>>>> > > >>>>>>> metric
>>>>>> > > >>>>>>>>>>> once, but report them with two different metric
>>>>>> names. This
>>>>>> > is
>>>>>> > > >>>>>>> unlikely
>>>>>> > > >>>>>>>>> to
>>>>>> > > >>>>>>>>>>> hurt performance.
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> 1b) Yes, I agree that we should avoid adding overhead
>>>>>> to the
>>>>>> > > >>>>> critical
>>>>>> > > >>>>>>>>> path
>>>>>> > > >>>>>>>>>>> by all means. This is sometimes a tradeoff, running
>>>>>> blindly
>>>>>> > > >> without
>>>>>> > > >>>>>>> any
>>>>>> > > >>>>>>>>>>> metric gives best performance, but sometimes might be
>>>>>> > > frustrating
>>>>>> > > >>>>> when
>>>>>> > > >>>>>>>>> we
>>>>>> > > >>>>>>>>>>> debug some issues.
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> 2. The metrics are indeed very useful. Are they
>>>>>> supposed to
>>>>>> > be
>>>>>> > > >>>>>>> reported
>>>>>> > > >>>>>>>>> by
>>>>>> > > >>>>>>>>>>> the connectors or Flink itself? At this point FLIP-33
>>>>>> is more
>>>>>> > > >>>>> focused
>>>>>> > > >>>>>>> on
>>>>>> > > >>>>>>>>>>> provide a guidance to the connector authors on the
>>>>>> metrics
>>>>>> > > >>>>> reporting.
>>>>>> > > >>>>>>>>> That
>>>>>> > > >>>>>>>>>>> said, after FLIP-27, I think we should absolutely
>>>>>> report
>>>>>> > these
>>>>>> > > >>>>> metrics
>>>>>> > > >>>>>>>>> in
>>>>>> > > >>>>>>>>>>> the abstract implementation. In any case, the metric
>>>>>> > convention
>>>>>> > > >> in
>>>>>> > > >>>>>>> this
>>>>>> > > >>>>>>>>>>> list are expected to evolve over time.
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski <
>>>>>> > > >>>>> pi...@ververica.com>
>>>>>> > > >>>>>>>>>>> wrote:
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Hi,
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Thanks for the proposal and driving the effort here
>>>>>> Becket
>>>>>> > :)
>>>>>> > > >> I’ve
>>>>>> > > >>>>>>> read
>>>>>> > > >>>>>>>>>>>> through the FLIP-33 [1], and here are couple of my
>>>>>> thoughts.
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Big +1 for standardising the metric names between
>>>>>> > connectors,
>>>>>> > > it
>>>>>> > > >>>>> will
>>>>>> > > >>>>>>>>>>>> definitely help us and users a lot.
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Issues/questions/things to discuss that I’ve thought
>>>>>> of:
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> 1a. If we are about to duplicate some metrics, can
>>>>>> this
>>>>>> > > become a
>>>>>> > > >>>>>>>>>>>> performance issue?
>>>>>> > > >>>>>>>>>>>> 1b. Generally speaking, we should make sure that
>>>>>> collecting
>>>>>> > > >> those
>>>>>> > > >>>>>>>>> metrics
>>>>>> > > >>>>>>>>>>>> is as non intrusive as possible, especially that
>>>>>> they will
>>>>>> > > need
>>>>>> > > >>>>> to be
>>>>>> > > >>>>>>>>>>>> updated once per record. (They might be collected
>>>>>> more
>>>>>> > rarely
>>>>>> > > >> with
>>>>>> > > >>>>>>> some
>>>>>> > > >>>>>>>>>>>> overhead, but the hot path of updating it per record
>>>>>> will
>>>>>> > need
>>>>>> > > >> to
>>>>>> > > >>>>> be
>>>>>> > > >>>>>>> as
>>>>>> > > >>>>>>>>>>>> quick as possible). That includes both avoiding heavy
>>>>>> > > >> computation
>>>>>> > > >>>>> on
>>>>>> > > >>>>>>>>> per
>>>>>> > > >>>>>>>>>>>> record path: histograms?, measuring time for time
>>>>>> based
>>>>>> > > metrics
>>>>>> > > >>>>> (per
>>>>>> > > >>>>>>>>>>>> second) (System.currentTimeMillis() depending on the
>>>>>> > > >>>>> implementation
>>>>>> > > >>>>>>> can
>>>>>> > > >>>>>>>>>>>> invoke a system call)
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us
>>>>>> to check
>>>>>> > > the
>>>>>> > > >>>>> cause
>>>>>> > > >>>>>>>>> of
>>>>>> > > >>>>>>>>>>>> back pressure:
>>>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or
>>>>>> boolean
>>>>>> > > >>>>>>>>>>>> hasSomethingl/isEmpty)
>>>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or
>>>>>> boolean
>>>>>> > > >>>>>>>>>>>> hasSomething/isEmpty)
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> a) is useful in a scenario when we are processing
>>>>>> backlog of
>>>>>> > > >>>>> records,
>>>>>> > > >>>>>>>>> all
>>>>>> > > >>>>>>>>>>>> of the internal Flink’s input/output network buffers
>>>>>> are
>>>>>> > > empty,
>>>>>> > > >>>>> and
>>>>>> > > >>>>>>> we
>>>>>> > > >>>>>>>>> want
>>>>>> > > >>>>>>>>>>>> to check whether the external source system is the
>>>>>> > bottleneck
>>>>>> > > >>>>>>> (source’s
>>>>>> > > >>>>>>>>>>>> input queue will be empty), or if the Flink’s
>>>>>> connector is
>>>>>> > the
>>>>>> > > >>>>>>>>> bottleneck
>>>>>> > > >>>>>>>>>>>> (source’s input queues will be full).
>>>>>> > > >>>>>>>>>>>> b) similar story. Backlog of records, but this time
>>>>>> all of
>>>>>> > the
>>>>>> > > >>>>>>> internal
>>>>>> > > >>>>>>>>>>>> Flink’s input/ouput network buffers are full, and we
>>>>>> want o
>>>>>> > > >> check
>>>>>> > > >>>>>>>>> whether
>>>>>> > > >>>>>>>>>>>> the external sink system is the bottleneck (sink
>>>>>> output
>>>>>> > queues
>>>>>> > > >> are
>>>>>> > > >>>>>>>>> full),
>>>>>> > > >>>>>>>>>>>> or if the Flink’s connector is the bottleneck
>>>>>> (sink’s output
>>>>>> > > >>>>> queues
>>>>>> > > >>>>>>>>> will be
>>>>>> > > >>>>>>>>>>>> empty)
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> It might be sometimes difficult to provide those
>>>>>> metrics, so
>>>>>> > > >> they
>>>>>> > > >>>>>>> could
>>>>>> > > >>>>>>>>>>>> be optional, but if we could provide them, it would
>>>>>> be
>>>>>> > really
>>>>>> > > >>>>>>> helpful.
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Piotrek
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> [1]
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>
>>>>>> > > >>
>>>>>> > >
>>>>>> >
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics
>>>>>> > > >>>>>>>>>>>> <
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>
>>>>>> > > >>
>>>>>> > >
>>>>>> >
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> On 24 Apr 2019, at 13:28, Stephan Ewen <
>>>>>> se...@apache.org>
>>>>>> > > >> wrote:
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> I think this sounds reasonable.
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> Let's keep the "reconfiguration without stopping
>>>>>> the job"
>>>>>> > out
>>>>>> > > >> of
>>>>>> > > >>>>>>> this,
>>>>>> > > >>>>>>>>>>>>> because that would be a super big effort and if we
>>>>>> approach
>>>>>> > > >> that,
>>>>>> > > >>>>>>> then
>>>>>> > > >>>>>>>>>>>> in
>>>>>> > > >>>>>>>>>>>>> more generic way rather than specific to connector
>>>>>> metrics.
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> I would suggest to look at the following things
>>>>>> before
>>>>>> > > starting
>>>>>> > > >>>>> with
>>>>>> > > >>>>>>>>> any
>>>>>> > > >>>>>>>>>>>>> implementation work:
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> - Try and find a committer to support this,
>>>>>> otherwise it
>>>>>> > will
>>>>>> > > >> be
>>>>>> > > >>>>>>> hard
>>>>>> > > >>>>>>>>>>>> to
>>>>>> > > >>>>>>>>>>>>> make progress
>>>>>> > > >>>>>>>>>>>>> - Start with defining a smaller set of "core
>>>>>> metrics" and
>>>>>> > > >> extend
>>>>>> > > >>>>> the
>>>>>> > > >>>>>>>>>>>> set
>>>>>> > > >>>>>>>>>>>>> later. I think that is easier than now blocking on
>>>>>> reaching
>>>>>> > > >>>>>>> consensus
>>>>>> > > >>>>>>>>>>>> on a
>>>>>> > > >>>>>>>>>>>>> large group of metrics.
>>>>>> > > >>>>>>>>>>>>> - Find a solution to the problem Chesnay mentioned,
>>>>>> that
>>>>>> > the
>>>>>> > > >>>>>>> "records
>>>>>> > > >>>>>>>>>>>> in"
>>>>>> > > >>>>>>>>>>>>> metric is somehow overloaded and exists already in
>>>>>> the IO
>>>>>> > > >> Metric
>>>>>> > > >>>>>>>>> group.
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin <
>>>>>> > > >> becket....@gmail.com
>>>>>> > > >>>>>>
>>>>>> > > >>>>>>>>>>>> wrote:
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Hi Stephan,
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Thanks a lot for the feedback. All makes sense.
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> It is a good suggestion to simply have an
>>>>>> > onRecord(numBytes,
>>>>>> > > >>>>>>>>> eventTime)
>>>>>> > > >>>>>>>>>>>>>> method for connector writers. It should meet most
>>>>>> of the
>>>>>> > > >>>>>>>>> requirements,
>>>>>> > > >>>>>>>>>>>>>> individual
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> The configurable metrics feature is something
>>>>>> really
>>>>>> > useful,
>>>>>> > > >>>>>>>>>>>> especially if
>>>>>> > > >>>>>>>>>>>>>> we can somehow make it dynamically configurable
>>>>>> without
>>>>>> > > >> stopping
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>>>>>> jobs.
>>>>>> > > >>>>>>>>>>>>>> It might be better to make it a separate discussion
>>>>>> > because
>>>>>> > > it
>>>>>> > > >>>>> is a
>>>>>> > > >>>>>>>>>>>> more
>>>>>> > > >>>>>>>>>>>>>> generic feature instead of only for connectors.
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> So in order to make some progress, in this FLIP we
>>>>>> can
>>>>>> > limit
>>>>>> > > >> the
>>>>>> > > >>>>>>>>>>>> discussion
>>>>>> > > >>>>>>>>>>>>>> scope to the connector related items:
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> - the standard connector metric names and types.
>>>>>> > > >>>>>>>>>>>>>> - the abstract ConnectorMetricHandler interface
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> I'll start a separate thread to discuss other
>>>>>> general
>>>>>> > metric
>>>>>> > > >>>>>>> related
>>>>>> > > >>>>>>>>>>>>>> enhancement items including:
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> - optional metrics
>>>>>> > > >>>>>>>>>>>>>> - dynamic metric configuration
>>>>>> > > >>>>>>>>>>>>>> - potential combination with rate limiter
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Does this plan sound reasonable?
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen <
>>>>>> > > >> se...@apache.org>
>>>>>> > > >>>>>>>>> wrote:
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> Ignoring for a moment implementation details, this
>>>>>> > > connector
>>>>>> > > >>>>>>> metrics
>>>>>> > > >>>>>>>>>>>> work
>>>>>> > > >>>>>>>>>>>>>>> is a really good thing to do, in my opinion
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> The questions "oh, my job seems to be doing
>>>>>> nothing, I am
>>>>>> > > >>>>> looking
>>>>>> > > >>>>>>> at
>>>>>> > > >>>>>>>>>>>> the
>>>>>> > > >>>>>>>>>>>>>> UI
>>>>>> > > >>>>>>>>>>>>>>> and the 'records in' value is still zero" is in
>>>>>> the top
>>>>>> > > three
>>>>>> > > >>>>>>>>> support
>>>>>> > > >>>>>>>>>>>>>>> questions I have been asked personally.
>>>>>> > > >>>>>>>>>>>>>>> Introspection into "how far is the consumer
>>>>>> lagging
>>>>>> > behind"
>>>>>> > > >>>>> (event
>>>>>> > > >>>>>>>>>>>> time
>>>>>> > > >>>>>>>>>>>>>>> fetch latency) came up many times as well.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> So big +1 to solving this problem.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> About the exact design - I would try to go for the
>>>>>> > > following
>>>>>> > > >>>>>>>>>>>> properties:
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> - keep complexity of of connectors. Ideally the
>>>>>> metrics
>>>>>> > > >> handler
>>>>>> > > >>>>>>> has
>>>>>> > > >>>>>>>>> a
>>>>>> > > >>>>>>>>>>>>>>> single onRecord(numBytes, eventTime) method or
>>>>>> so, and
>>>>>> > > >>>>> everything
>>>>>> > > >>>>>>>>>>>> else is
>>>>>> > > >>>>>>>>>>>>>>> internal to the handler. That makes it dead
>>>>>> simple for
>>>>>> > the
>>>>>> > > >>>>>>>>> connector.
>>>>>> > > >>>>>>>>>>>> We
>>>>>> > > >>>>>>>>>>>>>>> can also think of an extensive scheme for
>>>>>> connector
>>>>>> > > specific
>>>>>> > > >>>>>>>>> metrics.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> - make it configurable on the job it cluster
>>>>>> level which
>>>>>> > > >>>>> metrics
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>> handler internally creates when that method is
>>>>>> invoked.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> What do you think?
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> Best,
>>>>>> > > >>>>>>>>>>>>>>> Stephan
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay Schepler
>>>>>> <
>>>>>> > > >>>>>>>>> ches...@apache.org
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> wrote:
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>> As I said before, I believe this to be
>>>>>> over-engineered
>>>>>> > and
>>>>>> > > >>>>> have
>>>>>> > > >>>>>>> no
>>>>>> > > >>>>>>>>>>>>>>>> interest in this implementation.
>>>>>> > > >>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>> There are conceptual issues like defining a
>>>>>> duplicate
>>>>>> > > >>>>>>>>>>>>>> numBytesIn(PerSec)
>>>>>> > > >>>>>>>>>>>>>>>> metric that already exists for each operator.
>>>>>> > > >>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>> On 21.03.2019 06:13, Becket Qin wrote:
>>>>>> > > >>>>>>>>>>>>>>>>> A few updates to the thread. I uploaded a
>>>>>> patch[1] as a
>>>>>> > > >>>>> complete
>>>>>> > > >>>>>>>>>>>>>>>>> example of how users can use the metrics in the
>>>>>> future.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Some thoughts below after taking a look at the
>>>>>> > > >>>>>>> AbstractMetricGroup
>>>>>> > > >>>>>>>>>>>>>> and
>>>>>> > > >>>>>>>>>>>>>>>>> its subclasses.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> This patch intends to provide convenience for
>>>>>> Flink
>>>>>> > > >> connector
>>>>>> > > >>>>>>>>>>>>>>>>> implementations to follow metrics standards
>>>>>> proposed in
>>>>>> > > >>>>> FLIP-33.
>>>>>> > > >>>>>>>>> It
>>>>>> > > >>>>>>>>>>>>>>>>> also try to enhance the metric management in
>>>>>> general
>>>>>> > way
>>>>>> > > to
>>>>>> > > >>>>> help
>>>>>> > > >>>>>>>>>>>>>> users
>>>>>> > > >>>>>>>>>>>>>>>>> with:
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> 1. metric definition
>>>>>> > > >>>>>>>>>>>>>>>>> 2. metric dependencies check
>>>>>> > > >>>>>>>>>>>>>>>>> 3. metric validation
>>>>>> > > >>>>>>>>>>>>>>>>> 4. metric control (turn on / off particular
>>>>>> metrics)
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> This patch wraps |MetricGroup| to extend the
>>>>>> > > functionality
>>>>>> > > >> of
>>>>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| and its subclasses. The
>>>>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| mainly focus on the
>>>>>> metric group
>>>>>> > > >>>>>>> hierarchy,
>>>>>> > > >>>>>>>>>>>> but
>>>>>> > > >>>>>>>>>>>>>>>>> does not really manage the metrics other than
>>>>>> keeping
>>>>>> > > them
>>>>>> > > >>>>> in a
>>>>>> > > >>>>>>>>> Map.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Ideally we should only have one entry point for
>>>>>> the
>>>>>> > > >> metrics.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Right now the entry point is
>>>>>> |AbstractMetricGroup|.
>>>>>> > > >> However,
>>>>>> > > >>>>>>>>> besides
>>>>>> > > >>>>>>>>>>>>>>>>> the missing functionality mentioned above,
>>>>>> > > >>>>> |AbstractMetricGroup|
>>>>>> > > >>>>>>>>>>>>>> seems
>>>>>> > > >>>>>>>>>>>>>>>>> deeply rooted in Flink runtime. We could
>>>>>> extract it out
>>>>>> > > to
>>>>>> > > >>>>>>>>>>>>>>>>> flink-metrics in order to use it for generic
>>>>>> purpose.
>>>>>> > > There
>>>>>> > > >>>>> will
>>>>>> > > >>>>>>>>> be
>>>>>> > > >>>>>>>>>>>>>>>>> some work, though.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Another approach is to make |AbstractMetrics|
>>>>>> in this
>>>>>> > > patch
>>>>>> > > >>>>> as
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>>>> metric entry point. It wraps metric group and
>>>>>> provides
>>>>>> > > the
>>>>>> > > >>>>>>> missing
>>>>>> > > >>>>>>>>>>>>>>>>> functionalities. Then we can roll out this
>>>>>> pattern to
>>>>>> > > >> runtime
>>>>>> > > >>>>>>>>>>>>>>>>> components gradually as well.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> My first thought is that the latter approach
>>>>>> gives a
>>>>>> > more
>>>>>> > > >>>>> smooth
>>>>>> > > >>>>>>>>>>>>>>>>> migration. But I am also OK with doing a
>>>>>> refactoring on
>>>>>> > > the
>>>>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| family.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> [1] https://github.com/becketqin/flink/pull/1
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> On Mon, Feb 25, 2019 at 2:32 PM Becket Qin <
>>>>>> > > >>>>>>> becket....@gmail.com
>>>>>> > > >>>>>>>>>>>>>>>>> <mailto:becket....@gmail.com>> wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Hi Chesnay,
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> It might be easier to discuss some
>>>>>> implementation
>>>>>> > details
>>>>>> > > >> in
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>>>> PR review instead of in the FLIP discussion
>>>>>> thread. I
>>>>>> > > have
>>>>>> > > >> a
>>>>>> > > >>>>>>>>>>>>>> patch
>>>>>> > > >>>>>>>>>>>>>>>>> for Kafka connectors ready but haven't
>>>>>> submitted the PR
>>>>>> > > >> yet.
>>>>>> > > >>>>>>>>>>>>>>>>> Hopefully that will help explain a bit more.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> ** Re: metric type binding
>>>>>> > > >>>>>>>>>>>>>>>>> This is a valid point that worths discussing.
>>>>>> If I
>>>>>> > > >> understand
>>>>>> > > >>>>>>>>>>>>>>>>> correctly, there are two points:
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> 1. Metric type / interface does not matter as
>>>>>> long as
>>>>>> > the
>>>>>> > > >>>>>>> metric
>>>>>> > > >>>>>>>>>>>>>>>>> semantic is clearly defined.
>>>>>> > > >>>>>>>>>>>>>>>>> Conceptually speaking, I agree that as long as
>>>>>> the
>>>>>> > metric
>>>>>> > > >>>>>>>>>>>>>> semantic
>>>>>> > > >>>>>>>>>>>>>>>>> is defined, metric type does not matter. To some
>>>>>> > extent,
>>>>>> > > >>>>> Gauge
>>>>>> > > >>>>>>> /
>>>>>> > > >>>>>>>>>>>>>>>>> Counter / Meter / Histogram themselves can be
>>>>>> think of
>>>>>> > as
>>>>>> > > >>>>> some
>>>>>> > > >>>>>>>>>>>>>>>>> well-recognized semantics, if you wish. In
>>>>>> Flink, these
>>>>>> > > >>>>> metric
>>>>>> > > >>>>>>>>>>>>>>>>> semantics have their associated interface
>>>>>> classes. In
>>>>>> > > >>>>> practice,
>>>>>> > > >>>>>>>>>>>>>>>>> such semantic to interface binding seems
>>>>>> necessary for
>>>>>> > > >>>>>>> different
>>>>>> > > >>>>>>>>>>>>>>>>> components to communicate.  Simply standardize
>>>>>> the
>>>>>> > > semantic
>>>>>> > > >>>>> of
>>>>>> > > >>>>>>>>>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>>>> connector metrics seems not sufficient for
>>>>>> people to
>>>>>> > > build
>>>>>> > > >>>>>>>>>>>>>>>>> ecosystem on top of. At the end of the day, we
>>>>>> still
>>>>>> > need
>>>>>> > > >> to
>>>>>> > > >>>>>>>>> have
>>>>>> > > >>>>>>>>>>>>>>>>> some embodiment of the metric semantics that
>>>>>> people can
>>>>>> > > >>>>> program
>>>>>> > > >>>>>>>>>>>>>>>>> against.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> 2. Sometimes the same metric semantic can be
>>>>>> exposed
>>>>>> > > using
>>>>>> > > >>>>>>>>>>>>>>>>> different metric types / interfaces.
>>>>>> > > >>>>>>>>>>>>>>>>> This is a good point. Counter and
>>>>>> Gauge-as-a-Counter
>>>>>> > are
>>>>>> > > >>>>> pretty
>>>>>> > > >>>>>>>>>>>>>>>>> much interchangeable. This is more of a
>>>>>> trade-off
>>>>>> > between
>>>>>> > > >> the
>>>>>> > > >>>>>>>>>>>>>> user
>>>>>> > > >>>>>>>>>>>>>>>>> experience of metric producers and consumers.
>>>>>> The
>>>>>> > metric
>>>>>> > > >>>>>>>>>>>>>> producers
>>>>>> > > >>>>>>>>>>>>>>>>> want to use Counter or Gauge depending on
>>>>>> whether the
>>>>>> > > >> counter
>>>>>> > > >>>>>>> is
>>>>>> > > >>>>>>>>>>>>>>>>> already tracked in code, while ideally the
>>>>>> metric
>>>>>> > > consumers
>>>>>> > > >>>>>>> only
>>>>>> > > >>>>>>>>>>>>>>>>> want to see a single metric type for each
>>>>>> metric. I am
>>>>>> > > >>>>> leaning
>>>>>> > > >>>>>>>>>>>>>>>>> towards to make the metric producers happy,
>>>>>> i.e. allow
>>>>>> > > >> Gauge
>>>>>> > > >>>>> /
>>>>>> > > >>>>>>>>>>>>>>>>> Counter metric type, and the the metric
>>>>>> consumers
>>>>>> > handle
>>>>>> > > >> the
>>>>>> > > >>>>>>>>> type
>>>>>> > > >>>>>>>>>>>>>>>>> variation. The reason is that in practice,
>>>>>> there might
>>>>>> > be
>>>>>> > > >>>>> more
>>>>>> > > >>>>>>>>>>>>>>>>> connector implementations than metric reporter
>>>>>> > > >>>>> implementations.
>>>>>> > > >>>>>>>>>>>>>> We
>>>>>> > > >>>>>>>>>>>>>>>>> could also provide some helper method to
>>>>>> facilitate
>>>>>> > > reading
>>>>>> > > >>>>>>> from
>>>>>> > > >>>>>>>>>>>>>>>>> such variable metric type.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Just some quick replies to the comments around
>>>>>> > > >> implementation
>>>>>> > > >>>>>>>>>>>>>>>> details.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   4) single place where metrics are registered
>>>>>> except
>>>>>> > > >>>>>>>>>>>>>>>>>   connector-specific
>>>>>> > > >>>>>>>>>>>>>>>>>   ones (which we can't really avoid).
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Register connector specific ones in a single
>>>>>> place is
>>>>>> > > >>>>> actually
>>>>>> > > >>>>>>>>>>>>>>>>> something that I want to achieve.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   2) I'm talking about time-series databases
>>>>>> like
>>>>>> > > >>>>> Prometheus.
>>>>>> > > >>>>>>>>>>>>>> We
>>>>>> > > >>>>>>>>>>>>>>>>>   would
>>>>>> > > >>>>>>>>>>>>>>>>>   only have a gauge metric exposing the last
>>>>>> > > >>>>>>>>> fetchTime/emitTime
>>>>>> > > >>>>>>>>>>>>>>>>>   that is
>>>>>> > > >>>>>>>>>>>>>>>>>   regularly reported to the backend
>>>>>> (Prometheus),
>>>>>> > where a
>>>>>> > > >>>>>>> user
>>>>>> > > >>>>>>>>>>>>>>>>>   could build
>>>>>> > > >>>>>>>>>>>>>>>>>   a histogram of his choosing when/if he wants
>>>>>> it.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Not sure if such downsampling works. As an
>>>>>> example, if
>>>>>> > a
>>>>>> > > >> user
>>>>>> > > >>>>>>>>>>>>>>>>> complains that there are some intermittent
>>>>>> latency
>>>>>> > spikes
>>>>>> > > >>>>>>> (maybe
>>>>>> > > >>>>>>>>>>>>>> a
>>>>>> > > >>>>>>>>>>>>>>>>> few records in 10 seconds) in their processing
>>>>>> system.
>>>>>> > > >>>>> Having a
>>>>>> > > >>>>>>>>>>>>>>>>> Gauge sampling instantaneous latency seems
>>>>>> unlikely
>>>>>> > > useful.
>>>>>> > > >>>>>>>>>>>>>>>>> However by looking at actual 99.9 percentile
>>>>>> latency
>>>>>> > > might
>>>>>> > > >>>>>>> help.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 9:30 PM Chesnay Schepler
>>>>>> > > >>>>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org
>>>>>> >>
>>>>>> > wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   Re: over complication of implementation.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   I think I get understand better know what
>>>>>> you're
>>>>>> > > >> shooting
>>>>>> > > >>>>>>>>>>>>>> for,
>>>>>> > > >>>>>>>>>>>>>>>>>   effectively something like the
>>>>>> OperatorIOMetricGroup.
>>>>>> > > >>>>>>>>>>>>>>>>>   But still, re-define setupConnectorMetrics()
>>>>>> to
>>>>>> > accept
>>>>>> > > a
>>>>>> > > >>>>>>> set
>>>>>> > > >>>>>>>>>>>>>>>>>   of flags
>>>>>> > > >>>>>>>>>>>>>>>>>   for counters/meters(ans _possibly_
>>>>>> histograms) along
>>>>>> > > >>>>> with a
>>>>>> > > >>>>>>>>>>>>>>>>>   set of
>>>>>> > > >>>>>>>>>>>>>>>>>   well-defined Optional<Gauge<?>>, and return
>>>>>> the
>>>>>> > group.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   Solves all issues as far as i can tell:
>>>>>> > > >>>>>>>>>>>>>>>>>   1) no metrics must be created manually (except
>>>>>> > Gauges,
>>>>>> > > >>>>>>> which
>>>>>> > > >>>>>>>>>>>>>>> are
>>>>>> > > >>>>>>>>>>>>>>>>>   effectively just Suppliers and you can't get
>>>>>> around
>>>>>> > > >>>>> this),
>>>>>> > > >>>>>>>>>>>>>>>>>   2) additional metrics can be registered on the
>>>>>> > returned
>>>>>> > > >>>>>>>>>>>>>> group,
>>>>>> > > >>>>>>>>>>>>>>>>>   3) see 1),
>>>>>> > > >>>>>>>>>>>>>>>>>   4) single place where metrics are registered
>>>>>> except
>>>>>> > > >>>>>>>>>>>>>>>>>   connector-specific
>>>>>> > > >>>>>>>>>>>>>>>>>   ones (which we can't really avoid).
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   Re: Histogram
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   1) As an example, whether "numRecordsIn" is
>>>>>> exposed
>>>>>> > as
>>>>>> > > a
>>>>>> > > >>>>>>>>>>>>>>>>>   Counter or a
>>>>>> > > >>>>>>>>>>>>>>>>>   Gauge should be irrelevant. So far we're
>>>>>> using the
>>>>>> > > >> metric
>>>>>> > > >>>>>>>>>>>>>> type
>>>>>> > > >>>>>>>>>>>>>>>>>   that is
>>>>>> > > >>>>>>>>>>>>>>>>>   the most convenient at exposing a given
>>>>>> value. If
>>>>>> > there
>>>>>> > > >>>>> is
>>>>>> > > >>>>>>>>>>>>>>>>>   some backing
>>>>>> > > >>>>>>>>>>>>>>>>>   data-structure that we want to expose some
>>>>>> data from
>>>>>> > we
>>>>>> > > >>>>>>>>>>>>>>>>>   typically opt
>>>>>> > > >>>>>>>>>>>>>>>>>   for a Gauge, as otherwise we're just mucking
>>>>>> around
>>>>>> > > with
>>>>>> > > >>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>>>>   Meter/Counter API to get it to match.
>>>>>> Similarly, if
>>>>>> > we
>>>>>> > > >>>>> want
>>>>>> > > >>>>>>>>>>>>>> to
>>>>>> > > >>>>>>>>>>>>>>>>>   count
>>>>>> > > >>>>>>>>>>>>>>>>>   something but no current count exists we
>>>>>> typically
>>>>>> > > added
>>>>>> > > >>>>> a
>>>>>> > > >>>>>>>>>>>>>>>>>   Counter.
>>>>>> > > >>>>>>>>>>>>>>>>>   That's why attaching semantics to metric
>>>>>> types makes
>>>>>> > > >>>>> little
>>>>>> > > >>>>>>>>>>>>>>>>>   sense (but
>>>>>> > > >>>>>>>>>>>>>>>>>   unfortunately several reporters already do
>>>>>> it); for
>>>>>> > > >>>>>>>>>>>>>>>>>   counters/meters
>>>>>> > > >>>>>>>>>>>>>>>>>   certainly, but the majority of metrics are
>>>>>> gauges.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   2) I'm talking about time-series databases
>>>>>> like
>>>>>> > > >>>>> Prometheus.
>>>>>> > > >>>>>>>>>>>>>> We
>>>>>> > > >>>>>>>>>>>>>>>>>   would
>>>>>> > > >>>>>>>>>>>>>>>>>   only have a gauge metric exposing the last
>>>>>> > > >>>>>>>>> fetchTime/emitTime
>>>>>> > > >>>>>>>>>>>>>>>>>   that is
>>>>>> > > >>>>>>>>>>>>>>>>>   regularly reported to the backend
>>>>>> (Prometheus),
>>>>>> > where a
>>>>>> > > >>>>>>> user
>>>>>> > > >>>>>>>>>>>>>>>>>   could build
>>>>>> > > >>>>>>>>>>>>>>>>>   a histogram of his choosing when/if he wants
>>>>>> it.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>   On 22.02.2019 13:57, Becket Qin wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>> Hi Chesnay,
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks for the explanation.
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> ** Re: FLIP
>>>>>> > > >>>>>>>>>>>>>>>>>> I might have misunderstood this, but it seems
>>>>>> that
>>>>>> > > "major
>>>>>> > > >>>>>>>>>>>>>>>>>   changes" are well
>>>>>> > > >>>>>>>>>>>>>>>>>> defined in FLIP. The full contents is
>>>>>> following:
>>>>>> > > >>>>>>>>>>>>>>>>>> What is considered a "major change" that needs
>>>>>> a FLIP?
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> Any of the following should be considered a
>>>>>> major
>>>>>> > > change:
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> - Any major new feature, subsystem, or piece of
>>>>>> > > >>>>>>>>>>>>>>>>>   functionality
>>>>>> > > >>>>>>>>>>>>>>>>>> - *Any change that impacts the public
>>>>>> interfaces of
>>>>>> > the
>>>>>> > > >>>>>>>>>>>>>>>>>   project*
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> What are the "public interfaces" of the
>>>>>> project?
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> *All of the following are public interfaces
>>>>>> *that
>>>>>> > people
>>>>>> > > >>>>>>>>>>>>>>>>>   build around:
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> - DataStream and DataSet API, including classes
>>>>>> > related
>>>>>> > > >>>>>>>>>>>>>>>>>   to that, such as
>>>>>> > > >>>>>>>>>>>>>>>>>> StreamExecutionEnvironment
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> - Classes marked with the @Public annotation
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> - On-disk binary formats, such as
>>>>>> > > >>>>>>>>>>>>>> checkpoints/savepoints
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> - User-facing scripts/command-line tools, i.e.
>>>>>> > > >>>>>>>>>>>>>>>>>   bin/flink, Yarn scripts,
>>>>>> > > >>>>>>>>>>>>>>>>>> Mesos scripts
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> - Configuration settings
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> - *Exposed monitoring information*
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> So any monitoring information change is
>>>>>> considered as
>>>>>> > > >>>>>>>>>>>>>> public
>>>>>> > > >>>>>>>>>>>>>>>>>   interface, and
>>>>>> > > >>>>>>>>>>>>>>>>>> any public interface change is considered as a
>>>>>> "major
>>>>>> > > >>>>>>>>>>>>>>> change".
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> ** Re: over complication of implementation.
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> Although this is more of implementation
>>>>>> details that
>>>>>> > is
>>>>>> > > >> not
>>>>>> > > >>>>>>>>>>>>>>>>>   covered by the
>>>>>> > > >>>>>>>>>>>>>>>>>> FLIP. But it may be worth discussing.
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> First of all, I completely agree that we
>>>>>> should use
>>>>>> > the
>>>>>> > > >>>>>>>>>>>>>>>>>   simplest way to
>>>>>> > > >>>>>>>>>>>>>>>>>> achieve our goal. To me the goal is the
>>>>>> following:
>>>>>> > > >>>>>>>>>>>>>>>>>> 1. Clear connector conventions and interfaces.
>>>>>> > > >>>>>>>>>>>>>>>>>> 2. The easiness of creating a connector.
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> Both of them are important to the prosperity
>>>>>> of the
>>>>>> > > >>>>>>>>>>>>>>>>>   connector ecosystem. So
>>>>>> > > >>>>>>>>>>>>>>>>>> I'd rather abstract as much as possible on our
>>>>>> side to
>>>>>> > > >> make
>>>>>> > > >>>>>>>>>>>>>>>>>   the connector
>>>>>> > > >>>>>>>>>>>>>>>>>> developer's work lighter. Given this goal, a
>>>>>> static
>>>>>> > util
>>>>>> > > >>>>>>>>>>>>>>>>>   method approach
>>>>>> > > >>>>>>>>>>>>>>>>>> might have a few drawbacks:
>>>>>> > > >>>>>>>>>>>>>>>>>> 1. Users still have to construct the metrics by
>>>>>> > > >> themselves.
>>>>>> > > >>>>>>>>>>>>>>>>>   (And note that
>>>>>> > > >>>>>>>>>>>>>>>>>> this might be erroneous by itself. For
>>>>>> example, a
>>>>>> > > customer
>>>>>> > > >>>>>>>>>>>>>>>>>   wrapper around
>>>>>> > > >>>>>>>>>>>>>>>>>> dropwizard meter maybe used instead of
>>>>>> MeterView).
>>>>>> > > >>>>>>>>>>>>>>>>>> 2. When connector specific metrics are added,
>>>>>> it is
>>>>>> > > >>>>>>>>>>>>>>>>>   difficult to enforce
>>>>>> > > >>>>>>>>>>>>>>>>>> the scope to be the same as standard metrics.
>>>>>> > > >>>>>>>>>>>>>>>>>> 3. It seems that a method proliferation is
>>>>>> inevitable
>>>>>> > if
>>>>>> > > >> we
>>>>>> > > >>>>>>>>>>>>>>>>>   want to apply
>>>>>> > > >>>>>>>>>>>>>>>>>> sanity checks. e.g. The metric of numBytesIn
>>>>>> was not
>>>>>> > > >>>>>>>>>>>>>>>>>   registered for a meter.
>>>>>> > > >>>>>>>>>>>>>>>>>> 4. Metrics are still defined in random places
>>>>>> and hard
>>>>>> > > to
>>>>>> > > >>>>>>>>>>>>>>>> track.
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> The current PR I had was inspired by the
>>>>>> Config system
>>>>>> > > in
>>>>>> > > >>>>>>>>>>>>>>>>>   Kafka, which I
>>>>>> > > >>>>>>>>>>>>>>>>>> found pretty handy. In fact it is not only
>>>>>> used by
>>>>>> > Kafka
>>>>>> > > >>>>>>>>>>>>>>>>>   itself but even
>>>>>> > > >>>>>>>>>>>>>>>>>> some other projects that depend on Kafka. I am
>>>>>> not
>>>>>> > > saying
>>>>>> > > >>>>>>>>>>>>>>>>>   this approach is
>>>>>> > > >>>>>>>>>>>>>>>>>> perfect. But I think it worths to save the
>>>>>> work for
>>>>>> > > >>>>>>>>>>>>>>>>>   connector writers and
>>>>>> > > >>>>>>>>>>>>>>>>>> encourage more systematic implementation. That
>>>>>> being
>>>>>> > > said,
>>>>>> > > >>>>>>>>>>>>>> I
>>>>>> > > >>>>>>>>>>>>>>>>>   am fully open
>>>>>> > > >>>>>>>>>>>>>>>>>> to suggestions.
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> Re: Histogram
>>>>>> > > >>>>>>>>>>>>>>>>>> I think there are two orthogonal questions
>>>>>> around
>>>>>> > those
>>>>>> > > >>>>>>>>>>>>>>>> metrics:
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> 1. Regardless of the metric type, by just
>>>>>> looking at
>>>>>> > the
>>>>>> > > >>>>>>>>>>>>>>>>>   meaning of a
>>>>>> > > >>>>>>>>>>>>>>>>>> metric, is generic to all connectors? If the
>>>>>> answer is
>>>>>> > > >> yes,
>>>>>> > > >>>>>>>>>>>>>>>>>   we should
>>>>>> > > >>>>>>>>>>>>>>>>>> include the metric into the convention. No
>>>>>> matter
>>>>>> > > whether
>>>>>> > > >>>>>>>>>>>>>> we
>>>>>> > > >>>>>>>>>>>>>>>>>   include it
>>>>>> > > >>>>>>>>>>>>>>>>>> into the convention or not, some connector
>>>>>> > > implementations
>>>>>> > > >>>>>>>>>>>>>>>>>   will emit such
>>>>>> > > >>>>>>>>>>>>>>>>>> metric. It is better to have a convention than
>>>>>> letting
>>>>>> > > >> each
>>>>>> > > >>>>>>>>>>>>>>>>>   connector do
>>>>>> > > >>>>>>>>>>>>>>>>>> random things.
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> 2. If a standard metric is a histogram, what
>>>>>> should we
>>>>>> > > do?
>>>>>> > > >>>>>>>>>>>>>>>>>> I agree that we should make it clear that using
>>>>>> > > histograms
>>>>>> > > >>>>>>>>>>>>>>>>>   will have
>>>>>> > > >>>>>>>>>>>>>>>>>> performance risk. But I do see histogram is
>>>>>> useful in
>>>>>> > > some
>>>>>> > > >>>>>>>>>>>>>>>>>   fine-granularity
>>>>>> > > >>>>>>>>>>>>>>>>>> debugging where one do not have the luxury to
>>>>>> stop the
>>>>>> > > >>>>>>>>>>>>>>>>>   system and inject
>>>>>> > > >>>>>>>>>>>>>>>>>> more inspection code. So the workaround I am
>>>>>> thinking
>>>>>> > is
>>>>>> > > >> to
>>>>>> > > >>>>>>>>>>>>>>>>>   provide some
>>>>>> > > >>>>>>>>>>>>>>>>>> implementation suggestions. Assume later on we
>>>>>> have a
>>>>>> > > >>>>>>>>>>>>>>>>>   mechanism of
>>>>>> > > >>>>>>>>>>>>>>>>>> selective metrics. In the abstract metrics
>>>>>> class we
>>>>>> > can
>>>>>> > > >>>>>>>>>>>>>>>>>   disable those
>>>>>> > > >>>>>>>>>>>>>>>>>> metrics by default individual connector
>>>>>> writers does
>>>>>> > not
>>>>>> > > >>>>>>>>>>>>>>>>>   have to do
>>>>>> > > >>>>>>>>>>>>>>>>>> anything (this is another advantage of having
>>>>>> an
>>>>>> > > >>>>>>>>>>>>>>>>>   AbstractMetrics instead of
>>>>>> > > >>>>>>>>>>>>>>>>>> static util methods.)
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> I am not sure I fully understand the histogram
>>>>>> in the
>>>>>> > > >>>>>>>>>>>>>>>>>   backend approach. Can
>>>>>> > > >>>>>>>>>>>>>>>>>> you explain a bit more? Do you mean emitting
>>>>>> the raw
>>>>>> > > data,
>>>>>> > > >>>>>>>>>>>>>>>>>   e.g. fetchTime
>>>>>> > > >>>>>>>>>>>>>>>>>> and emitTime with each record and let the
>>>>>> histogram
>>>>>> > > >>>>>>>>>>>>>>>>>   computation happen in
>>>>>> > > >>>>>>>>>>>>>>>>>> the background? Or let the processing thread
>>>>>> putting
>>>>>> > the
>>>>>> > > >>>>>>>>>>>>>>>>>   values into a
>>>>>> > > >>>>>>>>>>>>>>>>>> queue and have a separate thread polling from
>>>>>> the
>>>>>> > queue
>>>>>> > > >> and
>>>>>> > > >>>>>>>>>>>>>>>>>   add them into
>>>>>> > > >>>>>>>>>>>>>>>>>> the histogram?
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 4:34 PM Chesnay
>>>>>> Schepler
>>>>>> > > >>>>>>>>>>>>>>>>>   <ches...@apache.org <mailto:
>>>>>> ches...@apache.org>>
>>>>>> > > wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: Flip
>>>>>> > > >>>>>>>>>>>>>>>>>>> The very first line under both the main
>>>>>> header and
>>>>>> > > >> Purpose
>>>>>> > > >>>>>>>>>>>>>>>>>   section
>>>>>> > > >>>>>>>>>>>>>>>>>>> describe Flips as "major changes", which this
>>>>>> isn't.
>>>>>> > > >>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: complication
>>>>>> > > >>>>>>>>>>>>>>>>>>> I'm not arguing against standardization, but
>>>>>> again an
>>>>>> > > >>>>>>>>>>>>>>>>>   over-complicated
>>>>>> > > >>>>>>>>>>>>>>>>>>> implementation when a static utility method
>>>>>> would be
>>>>>> > > >>>>>>>>>>>>>>>>>   sufficient.
>>>>>> > > >>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>> public static void setupConnectorMetrics(
>>>>>> > > >>>>>>>>>>>>>>>>>>> MetricGroup operatorMetricGroup,
>>>>>> > > >>>>>>>>>>>>>>>>>>> String connectorName,
>>>>>> > > >>>>>>>>>>>>>>>>>>> Optional<Gauge<Long>> numRecordsIn,
>>>>>> > > >>>>>>>>>>>>>>>>>>> ...)
>>>>>> > > >>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>> This gives you all you need:
>>>>>> > > >>>>>>>>>>>>>>>>>>> * a well-defined set of metrics for a
>>>>>> connector to
>>>>>> > > opt-in
>>>>>> > > >>>>>>>>>>>>>>>>>>> * standardized naming schemes for scope and
>>>>>> > individual
>>>>>> > > >>>>>>>>>>>>>>> metrics
>>>>>> > > >>>>>>>>>>>>>>>>>>> * standardize metric types (although
>>>>>> personally I'm
>>>>>> > not
>>>>>> > > >>>>>>>>>>>>>>>>>   interested in that
>>>>>> > > >>>>>>>>>>>>>>>>>>> since metric types should be considered
>>>>>> syntactic
>>>>>> > > sugar)
>>>>>> > > >>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: Configurable Histogram
>>>>>> > > >>>>>>>>>>>>>>>>>>> If anything they _must_ be turned off by
>>>>>> default, but
>>>>>> > > the
>>>>>> > > >>>>>>>>>>>>>>>>>   metric system is
>>>>>> > > >>>>>>>>>>>>>>>>>>> already exposing so many options that I'm not
>>>>>> too
>>>>>> > keen
>>>>>> > > on
>>>>>> > > >>>>>>>>>>>>>>>>>   adding even more.
>>>>>> > > >>>>>>>>>>>>>>>>>>> You have also only addressed my first argument
>>>>>> > against
>>>>>> > > >>>>>>>>>>>>>>>>>   histograms
>>>>>> > > >>>>>>>>>>>>>>>>>>> (performance), the second one still stands
>>>>>> (calculate
>>>>>> > > >>>>>>>>>>>>>>>>>   histogram in metric
>>>>>> > > >>>>>>>>>>>>>>>>>>> backends instead).
>>>>>> > > >>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>> On 21.02.2019 16:27, Becket Qin wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Hi Chesnay,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks for the comments. I think this is
>>>>>> worthy of a
>>>>>> > > >> FLIP
>>>>>> > > >>>>>>>>>>>>>>>>>   because it is
>>>>>> > > >>>>>>>>>>>>>>>>>>>> public API. According to the FLIP
>>>>>> description a FlIP
>>>>>> > > is
>>>>>> > > >>>>>>>>>>>>>>>>>   required in case
>>>>>> > > >>>>>>>>>>>>>>>>>>> of:
>>>>>> > > >>>>>>>>>>>>>>>>>>>> - Any change that impacts the public
>>>>>> interfaces of
>>>>>> > > >>>>>>>>>>>>>>>>>   the project
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> and the following entry is found in the
>>>>>> definition
>>>>>> > of
>>>>>> > > >>>>>>>>>>>>>>>>>   "public interface".
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> - Exposed monitoring information
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Metrics are critical to any production
>>>>>> system. So a
>>>>>> > > >> clear
>>>>>> > > >>>>>>>>>>>>>>>>>   metric
>>>>>> > > >>>>>>>>>>>>>>>>>>> definition
>>>>>> > > >>>>>>>>>>>>>>>>>>>> is important for any serious users. For an
>>>>>> > > organization
>>>>>> > > >>>>>>>>>>>>>>>>>   with large Flink
>>>>>> > > >>>>>>>>>>>>>>>>>>>> installation, change in metrics means great
>>>>>> amount
>>>>>> > of
>>>>>> > > >>>>>>>>>>>>>>>>>   work. So such
>>>>>> > > >>>>>>>>>>>>>>>>>>> changes
>>>>>> > > >>>>>>>>>>>>>>>>>>>> do need to be fully discussed and documented.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: Histogram.
>>>>>> > > >>>>>>>>>>>>>>>>>>>> We can discuss whether there is a better way
>>>>>> to
>>>>>> > expose
>>>>>> > > >>>>>>>>>>>>>>>>>   metrics that are
>>>>>> > > >>>>>>>>>>>>>>>>>>>> suitable for histograms. My micro-benchmark
>>>>>> on
>>>>>> > various
>>>>>> > > >>>>>>>>>>>>>>>>>   histogram
>>>>>> > > >>>>>>>>>>>>>>>>>>>> implementations also indicates that they are
>>>>>> > > >>>>>>>>>>>>>> significantly
>>>>>> > > >>>>>>>>>>>>>>>>>   slower than
>>>>>> > > >>>>>>>>>>>>>>>>>>>> other metric types. But I don't think that
>>>>>> means
>>>>>> > never
>>>>>> > > >>>>>>>>>>>>>> use
>>>>>> > > >>>>>>>>>>>>>>>>>   histogram, but
>>>>>> > > >>>>>>>>>>>>>>>>>>>> means use it with caution. For example, we
>>>>>> can
>>>>>> > suggest
>>>>>> > > >>>>>>>>>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>>>>>> implementations
>>>>>> > > >>>>>>>>>>>>>>>>>>>> to turn them off by default and only turn it
>>>>>> on for
>>>>>> > a
>>>>>> > > >>>>>>>>>>>>>>>>>   small amount of
>>>>>> > > >>>>>>>>>>>>>>>>>>> time
>>>>>> > > >>>>>>>>>>>>>>>>>>>> when performing some micro-debugging.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: complication:
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Connector conventions are essential for Flink
>>>>>> > > ecosystem.
>>>>>> > > >>>>>>>>>>>>>>>>>   Flink connectors
>>>>>> > > >>>>>>>>>>>>>>>>>>>> pool is probably the most important part of
>>>>>> Flink,
>>>>>> > > just
>>>>>> > > >>>>>>>>>>>>>>>>>   like any other
>>>>>> > > >>>>>>>>>>>>>>>>>>> data
>>>>>> > > >>>>>>>>>>>>>>>>>>>> system. Clear conventions of connectors will
>>>>>> help
>>>>>> > > build
>>>>>> > > >>>>>>>>>>>>>>>>>   Flink ecosystem
>>>>>> > > >>>>>>>>>>>>>>>>>>> in
>>>>>> > > >>>>>>>>>>>>>>>>>>>> a more organic way.
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Take the metrics convention as an example,
>>>>>> imagine
>>>>>> > > >>>>>>>>>>>>>> someone
>>>>>> > > >>>>>>>>>>>>>>>>>   has developed
>>>>>> > > >>>>>>>>>>>>>>>>>>> a
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Flink connector for System foo, and another
>>>>>> > developer
>>>>>> > > >> may
>>>>>> > > >>>>>>>>>>>>>>>>>   have developed
>>>>>> > > >>>>>>>>>>>>>>>>>>> a
>>>>>> > > >>>>>>>>>>>>>>>>>>>> monitoring and diagnostic framework for
>>>>>> Flink which
>>>>>> > > >>>>>>>>>>>>>>>>>   analyzes the Flink
>>>>>> > > >>>>>>>>>>>>>>>>>>> job
>>>>>> > > >>>>>>>>>>>>>>>>>>>> performance based on metrics. With a clear
>>>>>> metric
>>>>>> > > >>>>>>>>>>>>>>>>>   convention, those two
>>>>>> > > >>>>>>>>>>>>>>>>>>>> projects could be developed independently.
>>>>>> Once
>>>>>> > users
>>>>>> > > >> put
>>>>>> > > >>>>>>>>>>>>>>>>>   them together,
>>>>>> > > >>>>>>>>>>>>>>>>>>>> it would work without additional
>>>>>> modifications. This
>>>>>> > > >>>>>>>>>>>>>>>>>   cannot be easily
>>>>>> > > >>>>>>>>>>>>>>>>>>>> achieved by just defining a few constants.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: selective metrics:
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Sure, we can discuss that in a separate
>>>>>> thread.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> @Dawid
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: latency / fetchedLatency
>>>>>> > > >>>>>>>>>>>>>>>>>>>> The primary purpose of establish such a
>>>>>> convention
>>>>>> > is
>>>>>> > > to
>>>>>> > > >>>>>>>>>>>>>>>>>   help developers
>>>>>> > > >>>>>>>>>>>>>>>>>>>> write connectors in a more compatible way.
>>>>>> The
>>>>>> > > >> convention
>>>>>> > > >>>>>>>>>>>>>>>>>   is supposed to
>>>>>> > > >>>>>>>>>>>>>>>>>>> be
>>>>>> > > >>>>>>>>>>>>>>>>>>>> defined more proactively. So when look at the
>>>>>> > > >> convention,
>>>>>> > > >>>>>>>>>>>>>>>>>   it seems more
>>>>>> > > >>>>>>>>>>>>>>>>>>>> important to see if the concept is
>>>>>> applicable to
>>>>>> > > >>>>>>>>>>>>>>>>>   connectors in general.
>>>>>> > > >>>>>>>>>>>>>>>>>>> It
>>>>>> > > >>>>>>>>>>>>>>>>>>>> might be true so far only Kafka connector
>>>>>> reports
>>>>>> > > >>>>>>>>>>>>>> latency.
>>>>>> > > >>>>>>>>>>>>>>>>>   But there
>>>>>> > > >>>>>>>>>>>>>>>>>>> might
>>>>>> > > >>>>>>>>>>>>>>>>>>>> be hundreds of other connector
>>>>>> implementations in
>>>>>> > the
>>>>>> > > >>>>>>>>>>>>>>>>>   Flink ecosystem,
>>>>>> > > >>>>>>>>>>>>>>>>>>>> though not in the Flink repo, and some of
>>>>>> them also
>>>>>> > > >> emits
>>>>>> > > >>>>>>>>>>>>>>>>>   latency. I
>>>>>> > > >>>>>>>>>>>>>>>>>>> think
>>>>>> > > >>>>>>>>>>>>>>>>>>>> a lot of other sources actually also has an
>>>>>> append
>>>>>> > > >>>>>>>>>>>>>>>>>   timestamp. e.g.
>>>>>> > > >>>>>>>>>>>>>>>>>>> database
>>>>>> > > >>>>>>>>>>>>>>>>>>>> bin logs and some K-V stores. So I wouldn't
>>>>>> be
>>>>>> > > surprised
>>>>>> > > >>>>>>>>>>>>>>>>>   if some database
>>>>>> > > >>>>>>>>>>>>>>>>>>>> connector can also emit latency metrics.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 10:14 PM Chesnay
>>>>>> Schepler
>>>>>> > > >>>>>>>>>>>>>>>>>   <ches...@apache.org <mailto:
>>>>>> ches...@apache.org>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>> wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Regarding 2) It doesn't make sense to
>>>>>> investigate
>>>>>> > > this
>>>>>> > > >>>>>>>>>>>>>> as
>>>>>> > > >>>>>>>>>>>>>>>>>   part of this
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> FLIP. This is something that could be of
>>>>>> interest
>>>>>> > for
>>>>>> > > >>>>>>>>>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>>>>   entire metric
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> system, and should be designed for as such.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Regarding the proposal as a whole:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Histogram metrics shall not be added to the
>>>>>> core of
>>>>>> > > >>>>>>>>>>>>>>>>>   Flink. They are
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> significantly more expensive than other
>>>>>> metrics,
>>>>>> > and
>>>>>> > > >>>>>>>>>>>>>>>>>   calculating
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> histograms in the application is regarded
>>>>>> as an
>>>>>> > > >>>>>>>>>>>>>>>>>   anti-pattern by several
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> metric backends, who instead recommend to
>>>>>> expose
>>>>>> > the
>>>>>> > > >> raw
>>>>>> > > >>>>>>>>>>>>>>>>>   data and
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> calculate the histogram in the backend.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Second, this seems overly complicated.
>>>>>> Given that
>>>>>> > we
>>>>>> > > >>>>>>>>>>>>>>>>>   already established
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> that not all connectors will export all
>>>>>> metrics we
>>>>>> > > are
>>>>>> > > >>>>>>>>>>>>>>>>>   effectively
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> reducing this down to a consistent naming
>>>>>> scheme.
>>>>>> > We
>>>>>> > > >>>>>>>>>>>>>>>>>   don't need anything
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> sophisticated for that; basically just a few
>>>>>> > > constants
>>>>>> > > >>>>>>>>>>>>>>>>>   that all
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> connectors use.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> I'm not convinced that this is worthy of a
>>>>>> FLIP.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> On 21.02.2019 14:26, Dawid Wysakowicz wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Ad 1. In general I undestand and I agree.
>>>>>> But
>>>>>> > those
>>>>>> > > >>>>>>>>>>>>>>>>>   particular metrics
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> (latency, fetchLatency), right now would
>>>>>> only be
>>>>>> > > >>>>>>>>>>>>>>>>>   reported if user uses
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> KafkaConsumer with internal
>>>>>> timestampAssigner with
>>>>>> > > >>>>>>>>>>>>>>>>>   StreamCharacteristic
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> set to EventTime, right? That sounds like
>>>>>> a very
>>>>>> > > >>>>>>>>>>>>>>>>>   specific case. I am
>>>>>> > > >>>>>>>>>>>>>>>>>>> not
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> sure if we should introduce a generic
>>>>>> metric that
>>>>>> > > will
>>>>>> > > >>>>>>>>>>>>>> be
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> disabled/absent for most of
>>>>>> implementations.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Ad.2 That sounds like an orthogonal issue,
>>>>>> that
>>>>>> > > might
>>>>>> > > >>>>>>>>>>>>>>>>>   make sense to
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> investigate in the future.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> On 21/02/2019 13:20, Becket Qin wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Hi Dawid,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. That makes sense
>>>>>> to me.
>>>>>> > > >> There
>>>>>> > > >>>>>>>>>>>>>>>>>   are two cases
>>>>>> > > >>>>>>>>>>>>>>>>>>> to
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> be
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> addressed.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> 1. The metrics are supposed to be a
>>>>>> guidance. It
>>>>>> > is
>>>>>> > > >>>>>>>>>>>>>>>>>   likely that a
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> connector
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> only supports some but not all of the
>>>>>> metrics. In
>>>>>> > > >> that
>>>>>> > > >>>>>>>>>>>>>>>>>   case, each
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> connector
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> implementation should have the freedom to
>>>>>> decide
>>>>>> > > >> which
>>>>>> > > >>>>>>>>>>>>>>>>>   metrics are
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> reported. For the metrics that are
>>>>>> supported, the
>>>>>> > > >>>>>>>>>>>>>>>>>   guidance should be
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> followed.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> 2. Sometimes users may want to disable
>>>>>> certain
>>>>>> > > >> metrics
>>>>>> > > >>>>>>>>>>>>>>>>>   for some reason
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> (e.g. performance / reprocessing of
>>>>>> data). A
>>>>>> > > generic
>>>>>> > > >>>>>>>>>>>>>>>>>   mechanism should
>>>>>> > > >>>>>>>>>>>>>>>>>>> be
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> provided to allow user choose which
>>>>>> metrics are
>>>>>> > > >>>>>>>>>>>>>>>>>   reported. This
>>>>>> > > >>>>>>>>>>>>>>>>>>> mechanism
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> should also be honored by the connector
>>>>>> > > >>>>>>>>>>>>>> implementations.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Does this sound reasonable to you?
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 4:22 PM Dawid
>>>>>> Wysakowicz
>>>>>> > <
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org <mailto:
>>>>>> > > dwysakow...@apache.org
>>>>>> > > >>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Generally I like the idea of having a
>>>>>> unified,
>>>>>> > > >>>>>>>>>>>>>>>>>   standard set of
>>>>>> > > >>>>>>>>>>>>>>>>>>> metrics
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> for
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> all connectors. I have some slight
>>>>>> concerns
>>>>>> > about
>>>>>> > > >>>>>>>>>>>>>>>>>   fetchLatency and
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> latency though. They are computed based
>>>>>> on
>>>>>> > > EventTime
>>>>>> > > >>>>>>>>>>>>>>>>>   which is not a
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> purely
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> technical feature. It depends often on
>>>>>> some
>>>>>> > > business
>>>>>> > > >>>>>>>>>>>>>>>>>   logic, might be
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> absent
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> or defined after source. Those metrics
>>>>>> could
>>>>>> > also
>>>>>> > > >>>>>>>>>>>>>>>>>   behave in a weird
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> way in
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> case of replaying backlog. Therefore I
>>>>>> am not
>>>>>> > sure
>>>>>> > > >> if
>>>>>> > > >>>>>>>>>>>>>>>>>   we should
>>>>>> > > >>>>>>>>>>>>>>>>>>> include
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> those metrics by default. Maybe we could
>>>>>> at
>>>>>> > least
>>>>>> > > >>>>>>>>>>>>>>>>>   introduce a feature
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> switch for them? What do you think?
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> On 21/02/2019 03:13, Becket Qin wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Bump. If there is no objections to the
>>>>>> proposed
>>>>>> > > >>>>>>>>>>>>>>>>>   metrics. I'll start a
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> voting thread later toady.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> On Mon, Feb 11, 2019 at 8:17 PM Becket
>>>>>> Qin
>>>>>> > > >>>>>>>>>>>>>>>>>   <becket....@gmail.com <mailto:
>>>>>> becket....@gmail.com>>
>>>>>> > <
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> becket....@gmail.com <mailto:
>>>>>> becket....@gmail.com
>>>>>> > >>
>>>>>> > > >>>>>>>>>>>>>>> wrote:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Hi folks,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> I would like to start the FLIP
>>>>>> discussion thread
>>>>>> > > >>>>>>>>>>>>>> about
>>>>>> > > >>>>>>>>>>>>>>>>>   standardize
>>>>>> > > >>>>>>>>>>>>>>>>>>> the
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> connector metrics.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> In short, we would like to provide a
>>>>>> convention
>>>>>> > of
>>>>>> > > >>>>>>>>>>>>>>>>>   Flink connector
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> metrics. It will help simplify the
>>>>>> monitoring
>>>>>> > and
>>>>>> > > >>>>>>>>>>>>>>>>>   alerting on Flink
>>>>>> > > >>>>>>>>>>>>>>>>>>>>> jobs.
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> The FLIP link is following:
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>
>>>>>> > > >>
>>>>>> > >
>>>>>> >
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>
>>>>>> > > >>>>>
>>>>>> > > >>
>>>>>> > > >>
>>>>>> > >
>>>>>> > >
>>>>>> >
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Konstantin Knauf
>>>>>>
>>>>>> https://twitter.com/snntrable
>>>>>>
>>>>>> https://github.com/knaufk
>>>>>>
>>>>>

Reply via email to