Hi Becket, Thank you for picking up this FLIP. I have a few questions:
* Two thoughts on naming:
  * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has been introduced in https://issues.apache.org/jira/browse/FLINK-16864. They have a similar name, but different definitions of idleness, e.g. "idleTimeMsPerSecond" considers the SourceTask idle when it is backpressured. Can we make it clearer that these two metrics mean different things?
  * "current(Fetch)Latency": I am wondering if "eventTimeLag(Before|After)" is more descriptive/clear. What do others think?
* Current(Fetch)Latency implies that the timestamps are directly extracted in the source connector, right? Will this be the default for FLIP-27 sources anyway?
* Does FLIP-33 also include the implementation of these metrics (to the extent possible) for all connectors currently available in Apache Flink, or is the "per-connector implementation" out of scope?

Thanks, Konstantin

On Fri, Sep 4, 2020 at 4:56 PM Becket Qin <becket....@gmail.com> wrote: > Hi all, > > To complete the Source refactoring work, I'd like to revive this > discussion. Since the mail thread has been dormant for more than a year, > just to recap the motivation of the FLIP: > > 1. The FLIP proposes to standardize the connector metrics by giving > guidance on the metric specifications, including the name, type and meaning > of the metrics. > 2. It is OK for a connector to not emit some of the metrics in the metric > guidance, but if a metric of the same semantic is emitted, the > implementation should follow the guidance. > 3. It is OK for a connector to emit more metrics than what are listed in > the FLIP. This includes having an alias for a metric specified in the FLIP. > 4. We will implement some of the metrics out of the box in the default > implementation of FLIP-27, as long as it is applicable. > > The FLIP wiki is the following: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics > > Any thoughts? > > Thanks, > > Jiangjie (Becket) Qin > > > On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski <pi...@ververica.com> > wrote: > > > > we will need to revisit the convention list and adjust them accordingly > > when FLIP-27 is ready > > > > > > Yes, this sounds good :) > > > > Piotrek > > > > > On 13 Jun 2019, at 13:35, Becket Qin <becket....@gmail.com> wrote: > > > > > > Hi Piotr, > > > > > > That's great to know. Chances are that we will need to revisit the > > > convention list and adjust them accordingly when FLIP-27 is ready. At > > that > > > point we can mark some of the metrics as available by default for > > > connectors implementing the new interface. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Thu, Jun 13, 2019 at 3:51 PM Piotr Nowojski <pi...@ververica.com> > > wrote: > > > > > >> Thanks for driving this. I've just noticed one small thing. With the new > > >> SourceReader interface Flink will be able to provide the `idleTime` metric > > >> automatically. > > >> > > >> Piotrek > > >> > > >>> On 13 Jun 2019, at 03:30, Becket Qin <becket....@gmail.com> wrote: > > >>> > > >>> Thanks all for the feedback and discussion. > > >>> > > >>> Since there wasn't any concern raised, I've started the voting thread > > for > > >>> this FLIP, but please feel free to continue the discussion here if you > > >>> think something still needs to be addressed.
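As a concrete illustration of points 2 and 3 of the recap above, here is a minimal sketch of a source connector registering a standardized metric plus a system-specific alias. It assumes Flink's MetricGroup/Gauge API; "pendingRecords" is one of the names discussed for this FLIP, while ExternalClient and fetchBacklog() are hypothetical stand-ins for a real client:

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    public class ExampleSourceMetrics {

        /** Hypothetical handle to the external system's client. */
        public interface ExternalClient {
            long fetchBacklog();
        }

        public static void register(MetricGroup sourceGroup, ExternalClient client) {
            // Point 2: if a metric with this semantic is emitted at all, it
            // follows the standardized name and type (here: a Gauge).
            sourceGroup.gauge("pendingRecords", (Gauge<Long>) client::fetchBacklog);

            // Point 3: an additional, connector-specific alias is allowed on
            // top of the standardized one.
            sourceGroup.gauge("records-lag", (Gauge<Long>) client::fetchBacklog);
        }
    }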
> > >>> > > >>> Thanks, > > >>> > > >>> Jiangjie (Becket) Qin > > >>> > > >>> > > >>> > > >>> On Mon, Jun 10, 2019 at 9:10 AM Becket Qin <becket....@gmail.com> > > wrote: > > >>> > > >>>> Hi Piotr, > > >>>> > > >>>> Thanks for the comments. Yes, you are right. Users will have to look > > at > > >>>> other metrics to decide whether the pipeline is healthy or not in > the > > >> first > > >>>> place before they can use the time-based metric to fix the > bottleneck. > > >>>> > > >>>> I agree that once we have FLIP-27 ready, some of the metrics can > just > > be > > >>>> reported by the abstract implementation. > > >>>> > > >>>> I've updated FLIP-33 wiki page to add the pendingBytes and > > >> pendingRecords > > >>>> metric. Please let me know if you have any concern over the updated > > >> metric > > >>>> convention proposal. > > >>>> > > >>>> @Chesnay Schepler <ches...@apache.org> @Stephan Ewen > > >>>> <step...@ververica.com> will you also have time to take a look at > the > > >>>> proposed metric convention? If there is no further concern I'll > start > > a > > >>>> voting thread for this FLIP in two days. > > >>>> > > >>>> Thanks, > > >>>> > > >>>> Jiangjie (Becket) Qin > > >>>> > > >>>> > > >>>> > > >>>> On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski <pi...@ververica.com> > > >> wrote: > > >>>> > > >>>>> Hi Becket, > > >>>>> > > >>>>> Thanks for the answer :) > > >>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on > producing > > >> the > > >>>>>> record to downstream. For example, a source connector can report > > that > > >>>>> it's > > >>>>>> spending 80% of time to emit record to downstream processing > > pipeline. > > >>>>> In > > >>>>>> another case, a sink connector may report that its spending 30% of > > >> time > > >>>>>> producing the records to the external system. > > >>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage metric: > > >>>>> > > >>>>>> - 80% of time spent on emitting records to downstream ---> > > downstream > > >>>>>> node is bottleneck ---> output buffer is probably full. > > >>>>>> - 30% of time spent on emitting records to downstream ---> > > downstream > > >>>>>> node is not bottleneck ---> output buffer is probably not full. > > >>>>> > > >>>>> If by “time spent on emitting records to downstream” you understand > > >>>>> “waiting on back pressure”, then I see your point. And I agree that > > >> some > > >>>>> kind of ratio/time based metric gives you more information. However > > >> under > > >>>>> “time spent on emitting records to downstream” might be hidden the > > >>>>> following (extreme) situation: > > >>>>> > > >>>>> 1. Job is barely able to handle influx of records, there is 99% > > >>>>> CPU/resource usage in the cluster, but nobody is > > >>>>> bottlenecked/backpressured, all output buffers are empty, everybody > > is > > >>>>> waiting in 1% of it’s time for more records to process. > > >>>>> 2. 80% time can still be spent on "down stream operators”, because > > they > > >>>>> are the CPU intensive operations, but this doesn’t mean that > > >> increasing the > > >>>>> parallelism down the stream will help with anything there. To the > > >> contrary, > > >>>>> increasing parallelism of the source operator might help to > increase > > >>>>> resource utilisation up to 100%. > > >>>>> > > >>>>> However, this “time based/ratio” approach can be extended to > > in/output > > >>>>> buffer usage. 
Besides collecting the information that the input/output > > >> buffer is > > >>>>> full/empty, we can profile how often the buffers are empty/full. If the > > >> output > > >>>>> buffer is full 1% of the time, there is almost no back pressure. If it’s > > >> full > > >>>>> 80% of the time, there is some back pressure; if it’s full 99.9% of the > > time, > > >>>>> there is huge back pressure. > > >>>>> > > >>>>> Now for autoscaling you could compare the input & output buffers' > fill > > >>>>> ratio: > > >>>>> > > >>>>> 1. Both are high: the source of the bottleneck is down the stream. > > >>>>> 2. Output is low, input is high: this is the bottleneck, and the > > higher > > >>>>> the difference, the more this > > >> operator/task is the source of the bottleneck. > > >>>>> 3. Output is high, input is low: there was some load spike that we > > are > > >>>>> currently finishing processing. > > >>>>> > > >>>>> > > >>>>> > > >>>>> But long story short, we are probably diverging from the topic of > > this > > >>>>> discussion, and we can discuss this at some later point. > > >>>>> > > >>>>> For now, for sources: > > >>>>> > > >>>>> as I wrote before, +1 for: > > >>>>> - pending.bytes, Gauge > > >>>>> - pending.messages, Gauge > > >>>>> > > >>>>> When we will be developing/discussing the SourceReader from FLIP-27 we > > >> might > > >>>>> then add: > > >>>>> > > >>>>> - in-memory.buffer.usage (0 - 100%) > > >>>>> > > >>>>> which will be estimated automatically by Flink, while the user will be > > able > > >> to > > >>>>> override it / provide a better estimation. > > >>>>> > > >>>>> Piotrek > > >>>>> > > >>>>>> On 5 Jun 2019, at 05:42, Becket Qin <becket....@gmail.com> wrote: > > >>>>>> > > >>>>>> Hi Piotr, > > >>>>>> > > >>>>>> Thanks for the explanation. Please see some clarifications below. > > >>>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on > producing > > >> the > > >>>>>> record to downstream. For example, a source connector can report > > that > > >>>>> it's > > >>>>>> spending 80% of time to emit record to downstream processing > > pipeline. > > >>>>> In > > >>>>>> another case, a sink connector may report that it's spending 30% of > > >> time > > >>>>>> producing the records to the external system. > > >>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage metric: > > >>>>>> - 80% of time spent on emitting records to downstream ---> > > downstream > > >>>>>> node is bottleneck ---> output buffer is probably full. > > >>>>>> - 30% of time spent on emitting records to downstream ---> > > downstream > > >>>>>> node is not bottleneck ---> output buffer is probably not full. > > >>>>>> > > >>>>>> However, the time-based metric has a few advantages that the > buffer > > >>>>> usage > > >>>>>> metric may not have. > > >>>>>> > > >>>>>> 1. The buffer usage metric may not be applicable to all the connector > > >>>>>> implementations, while reporting time-based metrics is always > > doable. > > >>>>>> Some source connectors may not have any input buffer, or they may > > use > > >>>>> some > > >>>>>> third party library that does not expose the input buffer at all. > > >>>>>> Similarly, for sink connectors, the implementation may not have > any > > >>>>> output > > >>>>>> buffer, or the third party library does not expose such a buffer. > > >>>>>> > > >>>>>> 2. Although both types of metrics can detect bottlenecks, time-based > > >>>>> metrics > > >>>>>> can be used to generate a more informed action to remove the > > >> bottleneck.
> > >>>>>> For example, when the downstream is bottleneck, the output buffer > > >> usage > > >>>>>> metric is likely to be 100%, and the input buffer usage might be > 0%. > > >>>>> That > > >>>>>> means we don't know what is the suitable parallelism to lift the > > >>>>>> bottleneck. The time-based metric, on the other hand, would give > > >> useful > > >>>>>> information, e.g. if 80% of time was spent on emitting records, we > > can > > >>>>>> roughly increase the downstream node parallelism by 4 times. > > >>>>>> > > >>>>>> Admittedly, the time-based metrics are more expensive than buffer > > >>>>> usage. So > > >>>>>> we may have to do some sampling to reduce the cost. But in > general, > > >>>>> using > > >>>>>> time-based metrics seems worth adding. > > >>>>>> > > >>>>>> That being said, I don't think buffer usage metric and time-based > > >>>>> metrics > > >>>>>> are mutually exclusive. We can probably have both. It is just that > > in > > >>>>>> practice, features like auto-scaling might prefer time-based > metrics > > >> for > > >>>>>> the reason stated above. > > >>>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually detect > > >>>>> bottlenecks. > > >>>>>> As I wrote, we already have them in most of the places, except of > > >>>>>> sources/sinks. > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks. > > Currently > > >> we > > >>>>>> are only automatically detecting back pressure and reporting it to > > the > > >>>>> user > > >>>>>> in web UI (is it exposed as a metric at all?). Detecting the root > > >> cause > > >>>>> of > > >>>>>> the back pressure (bottleneck) is one step further. > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is > located, > > >> to > > >>>>>> try to do something with it. > > >>>>>> > > >>>>>> As explained above, I think time-based metric also addresses item > 1 > > >> and > > >>>>>> item 2. > > >>>>>> > > >>>>>> Any thoughts? > > >>>>>> > > >>>>>> Thanks, > > >>>>>> > > >>>>>> Jiangjie (Becket) Qin > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski < > pi...@ververica.com> > > >>>>> wrote: > > >>>>>> > > >>>>>>> Hi again :) > > >>>>>>> > > >>>>>>>> - pending.bytes, Gauge > > >>>>>>>> - pending.messages, Gauge > > >>>>>>> > > >>>>>>> > > >>>>>>> +1 > > >>>>>>> > > >>>>>>> And true, instead of overloading one of the metric it is better > > when > > >>>>> user > > >>>>>>> can choose to provide only one of them. > > >>>>>>> > > >>>>>>> Re 2: > > >>>>>>> > > >>>>>>>> If I understand correctly, this metric along with the pending > > >> mesages > > >>>>> / > > >>>>>>>> bytes would answer the questions of: > > >>>>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging behind + empty > > >>>>> buffer > > >>>>>>> = > > >>>>>>>> cannot consume fast enough. > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + full > > buffer > > >> = > > >>>>>>>> cannot emit fast enough, i.e. the Flink pipeline is slow. > > >>>>>>> > > >>>>>>> Yes, exactly. This can also be used to support decisions like > > >> changing > > >>>>> the > > >>>>>>> parallelism of the sources and/or down stream operators. > > >>>>>>> > > >>>>>>> I’m not sure if I understand your proposal with time based > > >>>>> measurements. > > >>>>>>> Maybe I’m missing something, but I do not see how measuring time > > >> alone > > >>>>>>> could answer the problem: where is the bottleneck. 
Time spent on > > the > > >>>>>>> next/emit might be short or long (depending on how heavy to > process > > >> the > > >>>>>>> record is) and the source can still be bottlenecked/back > pressured > > or > > >>>>> not. > > >>>>>>> Usually the easiest and the most reliable way how to detect > > >>>>> bottlenecks is > > >>>>>>> by checking usage of input & output buffers, since when input > > buffer > > >> is > > >>>>>>> full while output buffer is empty, that’s the definition of a > > >>>>> bottleneck. > > >>>>>>> Also this is usually very easy and cheap to measure (it works > > >>>>> effectively > > >>>>>>> the same way as current’s Flink back pressure monitoring, but > more > > >>>>> cleanly, > > >>>>>>> without probing thread’s stack traces). > > >>>>>>> > > >>>>>>> Also keep in mind that we are already using the buffer usage > > metrics > > >>>>> for > > >>>>>>> detecting the bottlenecks in Flink’s internal network exchanges > > >> (manual > > >>>>>>> work). That’s the reason why I wanted to extend this to > > >> sources/sinks, > > >>>>>>> since they are currently our blind spot. > > >>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink > > automatically > > >>>>>>> relies > > >>>>>>>> on some metrics answering the same question > > >>>>>>> > > >>>>>>> That would be very helpful feature. I think in order to achieve > > that > > >> we > > >>>>>>> would need to: > > >>>>>>> 1. Define the metrics that would allow us to manually detect > > >>>>> bottlenecks. > > >>>>>>> As I wrote, we already have them in most of the places, except of > > >>>>>>> sources/sinks. > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks. > > Currently > > >> we > > >>>>>>> are only automatically detecting back pressure and reporting it > to > > >> the > > >>>>> user > > >>>>>>> in web UI (is it exposed as a metric at all?). Detecting the root > > >>>>> cause of > > >>>>>>> the back pressure (bottleneck) is one step further. > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is > located, > > >> to > > >>>>> try > > >>>>>>> to do something with it. > > >>>>>>> > > >>>>>>> I think you are aiming for point 3., but before we reach it, we > are > > >>>>> still > > >>>>>>> missing 1. & 2. Also even if we have 3., there is a value in 1 & > 2 > > >> for > > >>>>>>> manual analysis/dashboards. > > >>>>>>> > > >>>>>>> However, having the knowledge of where the bottleneck is, doesn’t > > >>>>>>> necessarily mean that we know what we can do about it. For > example > > >>>>>>> increasing parallelism might or might not help with anything > (data > > >>>>> skew, > > >>>>>>> bottleneck on some resource that does not scale), but this remark > > >>>>> applies > > >>>>>>> always, regardless of the way how did we detect the bottleneck. > > >>>>>>> > > >>>>>>> Piotrek > > >>>>>>> > > >>>>>>>> On 3 Jun 2019, at 06:16, Becket Qin <becket....@gmail.com> > wrote: > > >>>>>>>> > > >>>>>>>> Hi Piotr, > > >>>>>>>> > > >>>>>>>> Thanks for the suggestion. Some thoughts below: > > >>>>>>>> > > >>>>>>>> Re 1: The pending messages / bytes. > > >>>>>>>> I completely agree these are very useful metrics and we should > > >> expect > > >>>>> the > > >>>>>>>> connector to report. WRT the way to expose them, it seems more > > >>>>> consistent > > >>>>>>>> to add two metrics instead of adding a method (unless there are > > >> other > > >>>>> use > > >>>>>>>> cases other than metric reporting). So we can add the following > > two > > >>>>>>> metrics. 
> > >>>>>>>> - pending.bytes, Gauge > > >>>>>>>> - pending.messages, Gauge > > >>>>>>>> Applicable connectors can choose to report them. These two > metrics > > >>>>> along > > >>>>>>>> with latency should be sufficient for users to understand the > > >> progress > > >>>>>>> of a > > >>>>>>>> connector. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Re 2: Number of buffered data in-memory of the connector > > >>>>>>>> If I understand correctly, this metric along with the pending > > >> messages > > >>>>> / > > >>>>>>>> bytes would answer the questions of: > > >>>>>>>> - Does the connector consume fast enough? Lagging behind + empty > > >>>>> buffer > > >>>>>>> = > > >>>>>>>> cannot consume fast enough. > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + full > > buffer > > >> = > > >>>>>>>> cannot emit fast enough, i.e. the Flink pipeline is slow. > > >>>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink > > automatically > > >>>>>>> relies > > >>>>>>>> on some metrics answering the same question, more specifically, > we > > >> are > > >>>>>>>> profiling the time spent on the .next() method (time to consume) and > > the > > >>>>> time > > >>>>>>>> spent on the .collect() method (time to emit / process). One > advantage > > >> of > > >>>>>>> such > > >>>>>>>> method-level time cost is that it allows us to calculate the parallelism > > >>>>> required to > > >>>>>>>> keep up in case there is a lag. > > >>>>>>>> > > >>>>>>>> However, one concern I have regarding such metrics is that they > are > > >>>>>>>> implementation specific. Either profiling on the method time, or > > >>>>>>> reporting > > >>>>>>>> buffer usage assumes the connectors are implemented in such a > way. > > A > > >>>>>>>> slightly better solution might be to have the following metric: > > >>>>>>>> > > >>>>>>>> - EmitTimeRatio (or FetchTimeRatio): The time spent on emitting > > >>>>>>>> records / Total time elapsed. > > >>>>>>>> > > >>>>>>>> This assumes that the source connectors have to emit the records > > to > > >>>>> the > > >>>>>>>> downstream at some point. The emission may take some time (e.g. > > go > > >>>>>>> through > > >>>>>>>> chained operators). And the rest of the time is spent preparing > > >> the > > >>>>>>>> record to emit, including time for consuming and format > > conversion, > > >>>>> etc. > > >>>>>>>> Ideally, we'd like to see the time spent on record fetch and > emit > > to > > >>>>> be > > >>>>>>>> about the same, so neither is a bottleneck for the other. > > >>>>>>>> > > >>>>>>>> The downside of these time based metrics is additional overhead > to > > >> get > > >>>>>>> the > > >>>>>>>> time, therefore sampling might be needed. But in practice I feel > > >> such > > >>>>>>> time > > >>>>>>>> based metrics might be more useful if we want to take action. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> I think we should absolutely add metrics in (1) to the metric > > >>>>> convention. > > >>>>>>>> We could also add the metrics mentioned in (2) if we reach > > consensus > > >>>>> on > > >>>>>>>> that. What do you think? > > >>>>>>>> > > >>>>>>>> Thanks, > > >>>>>>>> > > >>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski < > > pi...@ververica.com > > >>> > > >>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Hey Becket, > > >>>>>>>>> > > >>>>>>>>> Re 1a) and 1b) +1 from my side. > > >>>>>>>>> > > >>>>>>>>> I’ve discussed this issue: > > >>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> 2.
It would be nice to have metrics, that allow us to check > > the > >>>> cause > >>>>>>>>> of > >>>>>>>>>>>> back pressure: > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean > >>>>>>>>>>>> hasSomething/isEmpty) > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean > >>>>>>>>>>>> hasSomething/isEmpty) > >>>>>>>>> > >>>>>>>>> With Nico at some length, and he also saw the benefits of them. We > >>>> also > >>>>>>>>> have a more concrete proposal for that. > >>>>>>>>> > >>>>>>>>> Actually there are two really useful metrics, that we are missing > >>>>>>>>> currently: > >>>>>>>>> > >>>>>>>>> 1. Number of data/records/bytes in the backlog to process. For > >>>> example > >>>>>>>>> remaining number of bytes in unread files. Or pending data in Kafka > >>>>>>> topics. > >>>>>>>>> 2. Number of buffered data in-memory of the connector, that are > >>>> waiting > >>>>>>> to > >>>>>>>>> be processed / pushed to the Flink pipeline. > >>>>>>>>> > >>>>>>>>> Re 1: > >>>>>>>>> This would have to be a metric provided directly by a connector. It > >>>>>>> could > >>>>>>>>> be an undefined `int`: > >>>>>>>>> > >>>>>>>>> `int backlog` - estimate of pending work. > >>>>>>>>> > >>>>>>>>> “Undefined” meaning that it would be up to a connector to decide > >>>>>>> whether > >>>>>>>>> it’s measured in bytes, records, pending files or whatever it is > >>>>>>> possible > >>>>>>>>> to provide by the connector. This is because I assume not every > >>>>>>> connector > >>>>>>>>> can provide an exact number, and for some of them it might be > >> impossible > >>>> to > >>>>>>>>> provide a records number or bytes count. > >>>>>>>>> > >>>>>>>>> Re 2: > >>>>>>>>> This metric could be either provided by a connector, or calculated > >>>>>>> crudely > >>>>>>>>> by Flink: > >>>>>>>>> > >>>>>>>>> `float bufferUsage` - value from [0.0, 1.0] range > >>>>>>>>> > >>>>>>>>> Percentage of used in memory buffers, like in Kafka’s handover. > >>>>>>>>> > >>>>>>>>> It could be crudely implemented by Flink with FLIP-27 > >>>>>>>>> SourceReader#isAvailable. If the SourceReader is not available, the reported > >>>>>>>>> `bufferUsage` could be 0.0. If it is available, it could be 1.0. I > >>>> think > >>>>>>>>> this would be a good enough estimation for most of the use cases > >>>> (that > >>>>>>>>> could be overloaded and implemented better if desired). Especially > >>>>>>> since we > >>>>>>>>> are reporting only probed values: if probed values are almost > >> always > >>>>>>> “1.0”, > >>>>>>>>> it would mean that we have back pressure. If they are almost > >> always > >>>>>>>>> “0.0”, there is probably no back pressure at the sources. > >>>>>>>>> > >>>>>>>>> What do you think about this? > >>>>>>>>> > >>>>>>>>> Piotrek > >>>>>>>>> > >>>>>>>>>> On 30 May 2019, at 11:41, Becket Qin <becket....@gmail.com> > >> wrote: > >>>>>>>>>> > >>>>>>>>>> Hi all, > >>>>>>>>>> > >>>>>>>>>> Thanks a lot for all the feedback and comments. I'd like to > >> continue > >>>>>>> the > >>>>>>>>>> discussion on this FLIP. > >>>>>>>>>> > >>>>>>>>>> I updated the FLIP-33 wiki to remove all the histogram metrics > >> from > >>>> the > >>>>>>>>>> first version of metric convention due to the performance > > concern.
> > >>>>> The > > >>>>>>>>> plan > > >>>>>>>>>> is to introduce them later when we have a mechanism to opt > > in/out > > >>>>>>>>> metrics. > > >>>>>>>>>> At that point, users can decide whether they want to pay the > > cost > > >> to > > >>>>>>> get > > >>>>>>>>>> the metric or not. > > >>>>>>>>>> > > >>>>>>>>>> As Stephan suggested, for this FLIP, let's first try to agree > on > > >> the > > >>>>>>>>> small > > >>>>>>>>>> list of conventional metrics that connectors should follow. > > >>>>>>>>>> Just to be clear, the purpose of the convention is not to > > enforce > > >>>>> every > > >>>>>>>>>> connector to report all these metrics, but to provide a > guidance > > >> in > > >>>>>>> case > > >>>>>>>>>> these metrics are reported by some connectors. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> @ Stephan & Chesnay, > > >>>>>>>>>> > > >>>>>>>>>> Regarding the duplication of `RecordsIn` metric in operator / > > task > > >>>>>>>>>> IOMetricGroups, from what I understand, for source operator, > it > > is > > >>>>>>>>> actually > > >>>>>>>>>> the SourceFunction that reports the operator level > > >>>>>>>>>> RecordsIn/RecordsInPerSecond metric. So they are essentially > the > > >>>>> same > > >>>>>>>>>> metric in the operator level IOMetricGroup. Similarly for the > > Sink > > >>>>>>>>>> operator, the operator level RecordsOut/RecordsOutPerSecond > > >> metrics > > >>>>> are > > >>>>>>>>>> also reported by the Sink function. I marked them as existing > in > > >> the > > >>>>>>>>>> FLIP-33 wiki page. Please let me know if I misunderstood. > > >>>>>>>>>> > > >>>>>>>>>> Thanks, > > >>>>>>>>>> > > >>>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On Thu, May 30, 2019 at 5:16 PM Becket Qin < > > becket....@gmail.com> > > >>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> Hi Piotr, > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks a lot for the feedback. > > >>>>>>>>>>> > > >>>>>>>>>>> 1a) I guess you are referring to the part that "original > system > > >>>>>>> specific > > >>>>>>>>>>> metrics should also be reported". The performance impact > > depends > > >> on > > >>>>>>> the > > >>>>>>>>>>> implementation. An efficient implementation would only record > > the > > >>>>>>> metric > > >>>>>>>>>>> once, but report them with two different metric names. This > is > > >>>>>>> unlikely > > >>>>>>>>> to > > >>>>>>>>>>> hurt performance. > > >>>>>>>>>>> > > >>>>>>>>>>> 1b) Yes, I agree that we should avoid adding overhead to the > > >>>>> critical > > >>>>>>>>> path > > >>>>>>>>>>> by all means. This is sometimes a tradeoff, running blindly > > >> without > > >>>>>>> any > > >>>>>>>>>>> metric gives best performance, but sometimes might be > > frustrating > > >>>>> when > > >>>>>>>>> we > > >>>>>>>>>>> debug some issues. > > >>>>>>>>>>> > > >>>>>>>>>>> 2. The metrics are indeed very useful. Are they supposed to > be > > >>>>>>> reported > > >>>>>>>>> by > > >>>>>>>>>>> the connectors or Flink itself? At this point FLIP-33 is more > > >>>>> focused > > >>>>>>> on > > >>>>>>>>>>> provide a guidance to the connector authors on the metrics > > >>>>> reporting. > > >>>>>>>>> That > > >>>>>>>>>>> said, after FLIP-27, I think we should absolutely report > these > > >>>>> metrics > > >>>>>>>>> in > > >>>>>>>>>>> the abstract implementation. In any case, the metric > convention > > >> in > > >>>>>>> this > > >>>>>>>>>>> list are expected to evolve over time. 
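To make Piotr's earlier "crude bufferUsage" idea concrete, here is a rough sketch of the 0.0/1.0 probing that such an abstract FLIP-27 implementation could do out of the box. It assumes the FLIP-27 style isAvailable() returning a CompletableFuture; the wiring via a Supplier is illustrative only:

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;
    import org.apache.flink.metrics.Gauge;

    public class CrudeBufferUsageGauge implements Gauge<Float> {

        // Supplies the reader's availability future, as sketched in FLIP-27.
        private final Supplier<CompletableFuture<Void>> availability;

        public CrudeBufferUsageGauge(Supplier<CompletableFuture<Void>> availability) {
            this.availability = availability;
        }

        @Override
        public Float getValue() {
            // A completed future means the reader has data right now (1.0),
            // otherwise it is waiting for input (0.0). Sampled regularly,
            // the average of these probes approximates the fraction of time
            // the reader's in-memory buffer is non-empty.
            return availability.get().isDone() ? 1.0f : 0.0f;
        }
    }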
> > >>>>>>>>>>> Thanks, > > >>>>>>>>>>> > > >>>>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>>>> > > >>>>>>>>>>> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski < > >>>>> pi...@ververica.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for the proposal and driving the effort here Becket :) I’ve > >>>>>> read > >>>>>>>>>>>> through the FLIP-33 [1], and here are a couple of my thoughts. > >>>>>>>>>>>> > >>>>>>>>>>>> Big +1 for standardising the metric names between connectors, it > >>>> will > >>>>>>>>>>>> definitely help us and users a lot. > >>>>>>>>>>>> > >>>>>>>>>>>> Issues/questions/things to discuss that I’ve thought of: > >>>>>>>>>>>> > >>>>>>>>>>>> 1a. If we are about to duplicate some metrics, can this become a > >>>>>>>>>>>> performance issue? > >>>>>>>>>>>> 1b. Generally speaking, we should make sure that collecting > >> those > >>>>>>>>> metrics > >>>>>>>>>>>> is as non intrusive as possible, especially that they will need > >>>> to be > >>>>>>>>>>>> updated once per record. (They might be collected more rarely > >> with > >>>>>> some > >>>>>>>>>>>> overhead, but the hot path of updating it per record will need > >> to > >>>> be > >>>>>> as > >>>>>>>>>>>> quick as possible). That includes both avoiding heavy > >> computation > >>>> on the > >>>>>>>>> per > >>>>>>>>>>>> record path: histograms?, measuring time for time based metrics > >>>> (per > >>>>>>>>>>>> second) (System.currentTimeMillis() depending on the > >>>> implementation > >>>>>> can > >>>>>>>>>>>> invoke a system call) > >>>>>>>>>>>> > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us to check > > the > >>>> cause > >>>>>>>>> of > >>>>>>>>>>>> back pressure: > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean > >>>>>>>>>>>> hasSomething/isEmpty) > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean > >>>>>>>>>>>> hasSomething/isEmpty) > >>>>>>>>>>>> > >>>>>>>>>>>> a) is useful in a scenario when we are processing a backlog of > >>>> records, > >>>>>>>>> all > >>>>>>>>>>>> of the internal Flink’s input/output network buffers are empty, > >>>> and > >>>>>> we > >>>>>>>>> want > >>>>>>>>>>>> to check whether the external source system is the bottleneck > >>>>>> (source’s > >>>>>>>>>>>> input queue will be empty), or if the Flink’s connector is the > >>>>>>>>> bottleneck > >>>>>>>>>>>> (source’s input queues will be full). > >>>>>>>>>>>> b) similar story. Backlog of records, but this time all of the > >>>>>> internal > >>>>>>>>>>>> Flink’s input/output network buffers are full, and we want to > >> check > >>>>>>>>> whether > >>>>>>>>>>>> the external sink system is the bottleneck (sink output queues > >> are > >>>>>>>>> full), > >>>>>>>>>>>> or if the Flink’s connector is the bottleneck (sink’s output > >>>> queues > >>>>>>>>> will be > >>>>>>>>>>>> empty) > >>>>>>>>>>>> > >>>>>>>>>>>> It might be sometimes difficult to provide those metrics, so > >> they > >>>>>> could > >>>>>>>>>>>> be optional, but if we could provide them, it would be really > >>>>>> helpful.
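For the manual analysis Piotr describes in a) and b), the two optional metrics could be combined roughly as follows once both exist. Usages are normalized to [0.0, 1.0]; the 0.8 "full" threshold is purely illustrative:

    public final class SourceBottleneckHeuristic {

        public enum Cause { EXTERNAL_SOURCE_SYSTEM, FLINK_CONNECTOR, DOWNSTREAM_PIPELINE }

        /**
         * Scenario a) above: we know there is a backlog of records and want
         * to locate the slow part of the pipeline from the source's side.
         */
        public static Cause classify(double sourceInputQueueUsage, double flinkOutputBufferUsage) {
            if (flinkOutputBufferUsage > 0.8) {
                // Flink's own output buffers are full: the source is
                // back-pressured by the downstream pipeline.
                return Cause.DOWNSTREAM_PIPELINE;
            }
            if (sourceInputQueueUsage > 0.8) {
                // Data piles up inside the connector while Flink is idle:
                // the connector itself cannot emit fast enough.
                return Cause.FLINK_CONNECTOR;
            }
            // A backlog exists but nothing is queued locally: the external
            // source system does not deliver fast enough.
            return Cause.EXTERNAL_SOURCE_SYSTEM;
        }

        private SourceBottleneckHeuristic() {}
    }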
> > >>>>>>>>>>>> > > >>>>>>>>>>>> Piotrek > > >>>>>>>>>>>> > > >>>>>>>>>>>> [1] > > >>>>>>>>>>>> > > >>>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics > > >>>>>>>>>>>> < > > >>>>>>>>>>>> > > >>>>>>>>> > > >>>>>>> > > >>>>> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics > > >>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>>> On 24 Apr 2019, at 13:28, Stephan Ewen <se...@apache.org> > > >> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> I think this sounds reasonable. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Let's keep the "reconfiguration without stopping the job" > out > > >> of > > >>>>>>> this, > > >>>>>>>>>>>>> because that would be a super big effort and if we approach > > >> that, > > >>>>>>> then > > >>>>>>>>>>>> in > > >>>>>>>>>>>>> more generic way rather than specific to connector metrics. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> I would suggest to look at the following things before > > starting > > >>>>> with > > >>>>>>>>> any > > >>>>>>>>>>>>> implementation work: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> - Try and find a committer to support this, otherwise it > will > > >> be > > >>>>>>> hard > > >>>>>>>>>>>> to > > >>>>>>>>>>>>> make progress > > >>>>>>>>>>>>> - Start with defining a smaller set of "core metrics" and > > >> extend > > >>>>> the > > >>>>>>>>>>>> set > > >>>>>>>>>>>>> later. I think that is easier than now blocking on reaching > > >>>>>>> consensus > > >>>>>>>>>>>> on a > > >>>>>>>>>>>>> large group of metrics. > > >>>>>>>>>>>>> - Find a solution to the problem Chesnay mentioned, that > the > > >>>>>>> "records > > >>>>>>>>>>>> in" > > >>>>>>>>>>>>> metric is somehow overloaded and exists already in the IO > > >> Metric > > >>>>>>>>> group. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin < > > >> becket....@gmail.com > > >>>>>> > > >>>>>>>>>>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> Hi Stephan, > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Thanks a lot for the feedback. All makes sense. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> It is a good suggestion to simply have an > onRecord(numBytes, > > >>>>>>>>> eventTime) > > >>>>>>>>>>>>>> method for connector writers. It should meet most of the > > >>>>>>>>> requirements, > > >>>>>>>>>>>>>> individual > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> The configurable metrics feature is something really > useful, > > >>>>>>>>>>>> especially if > > >>>>>>>>>>>>>> we can somehow make it dynamically configurable without > > >> stopping > > >>>>>>> the > > >>>>>>>>>>>> jobs. > > >>>>>>>>>>>>>> It might be better to make it a separate discussion > because > > it > > >>>>> is a > > >>>>>>>>>>>> more > > >>>>>>>>>>>>>> generic feature instead of only for connectors. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> So in order to make some progress, in this FLIP we can > limit > > >> the > > >>>>>>>>>>>> discussion > > >>>>>>>>>>>>>> scope to the connector related items: > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> - the standard connector metric names and types. 
> > >>>>>>>>>>>>>> - the abstract ConnectorMetricHandler interface > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> I'll start a separate thread to discuss other general metric > >>>>>> related > >>>>>>>>>>>>>> enhancement items including: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> - optional metrics > >>>>>>>>>>>>>> - dynamic metric configuration > >>>>>>>>>>>>>> - potential combination with rate limiter > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Does this plan sound reasonable? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen < > >> se...@apache.org> > >>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Ignoring for a moment implementation details, this connector > >>>>>> metrics > >>>>>>>>>>>> work > >>>>>>>>>>>>>>> is a really good thing to do, in my opinion. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> The question "oh, my job seems to be doing nothing, I am > >>>> looking > >>>>>> at > >>>>>>>>>>>> the > >>>>>>>>>>>>>> UI > >>>>>>>>>>>>>>> and the 'records in' value is still zero" is in the top three > >>>>>>>>> support > >>>>>>>>>>>>>>> questions I have been asked personally. > >>>>>>>>>>>>>>> Introspection into "how far is the consumer lagging behind" > >>>> (event > >>>>>>>>>>>> time > >>>>>>>>>>>>>>> fetch latency) came up many times as well. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> So big +1 to solving this problem. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> About the exact design - I would try to go for the following > >>>>>>>>>>>> properties: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> - keep complexity out of connectors. Ideally the metrics > >> handler > >>>>>> has > >>>>>>>>> a > >>>>>>>>>>>>>>> single onRecord(numBytes, eventTime) method or so, and > >>>> everything > >>>>>>>>>>>> else is > >>>>>>>>>>>>>>> internal to the handler. That makes it dead simple for the > >>>>>>>>> connector. > >>>>>>>>>>>> We > >>>>>>>>>>>>>>> can also think of an extensive scheme for connector specific > >>>>>>>>> metrics. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> - make it configurable on the job or cluster level which > >>>> metrics > >>>>>> the > >>>>>>>>>>>>>>> handler internally creates when that method is invoked. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> What do you think? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>> Stephan > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay Schepler < > >>>>>>>>> ches...@apache.org > >>>>>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> As I said before, I believe this to be over-engineered and > >>>> have > >>>>>> no > >>>>>>>>>>>>>>>> interest in this implementation. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> There are conceptual issues like defining a duplicate > >>>>>>>>>>>>>> numBytesIn(PerSec) > >>>>>>>>>>>>>>>> metric that already exists for each operator. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On 21.03.2019 06:13, Becket Qin wrote: > >>>>>>>>>>>>>>>>> A few updates to the thread. I uploaded a patch[1] as a > >>>> complete > >>>>>>>>>>>>>>>>> example of how users can use the metrics in the future. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Some thoughts below after taking a look at the > >>>>>> AbstractMetricGroup > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> its subclasses.
> > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> This patch intends to provide convenience for Flink > > >> connector > > >>>>>>>>>>>>>>>>> implementations to follow metrics standards proposed in > > >>>>> FLIP-33. > > >>>>>>>>> It > > >>>>>>>>>>>>>>>>> also try to enhance the metric management in general > way > > to > > >>>>> help > > >>>>>>>>>>>>>> users > > >>>>>>>>>>>>>>>>> with: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 1. metric definition > > >>>>>>>>>>>>>>>>> 2. metric dependencies check > > >>>>>>>>>>>>>>>>> 3. metric validation > > >>>>>>>>>>>>>>>>> 4. metric control (turn on / off particular metrics) > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> This patch wraps |MetricGroup| to extend the > > functionality > > >> of > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| and its subclasses. The > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| mainly focus on the metric group > > >>>>>>> hierarchy, > > >>>>>>>>>>>> but > > >>>>>>>>>>>>>>>>> does not really manage the metrics other than keeping > > them > > >>>>> in a > > >>>>>>>>> Map. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Ideally we should only have one entry point for the > > >> metrics. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Right now the entry point is |AbstractMetricGroup|. > > >> However, > > >>>>>>>>> besides > > >>>>>>>>>>>>>>>>> the missing functionality mentioned above, > > >>>>> |AbstractMetricGroup| > > >>>>>>>>>>>>>> seems > > >>>>>>>>>>>>>>>>> deeply rooted in Flink runtime. We could extract it out > > to > > >>>>>>>>>>>>>>>>> flink-metrics in order to use it for generic purpose. > > There > > >>>>> will > > >>>>>>>>> be > > >>>>>>>>>>>>>>>>> some work, though. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Another approach is to make |AbstractMetrics| in this > > patch > > >>>>> as > > >>>>>>> the > > >>>>>>>>>>>>>>>>> metric entry point. It wraps metric group and provides > > the > > >>>>>>> missing > > >>>>>>>>>>>>>>>>> functionalities. Then we can roll out this pattern to > > >> runtime > > >>>>>>>>>>>>>>>>> components gradually as well. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> My first thought is that the latter approach gives a > more > > >>>>> smooth > > >>>>>>>>>>>>>>>>> migration. But I am also OK with doing a refactoring on > > the > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| family. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> [1] https://github.com/becketqin/flink/pull/1 > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Mon, Feb 25, 2019 at 2:32 PM Becket Qin < > > >>>>>>> becket....@gmail.com > > >>>>>>>>>>>>>>>>> <mailto:becket....@gmail.com>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Hi Chesnay, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> It might be easier to discuss some implementation > details > > >> in > > >>>>>>> the > > >>>>>>>>>>>>>>>>> PR review instead of in the FLIP discussion thread. I > > have > > >> a > > >>>>>>>>>>>>>> patch > > >>>>>>>>>>>>>>>>> for Kafka connectors ready but haven't submitted the PR > > >> yet. > > >>>>>>>>>>>>>>>>> Hopefully that will help explain a bit more. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> ** Re: metric type binding > > >>>>>>>>>>>>>>>>> This is a valid point that worths discussing. If I > > >> understand > > >>>>>>>>>>>>>>>>> correctly, there are two points: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 1. 
Metric type / interface does not matter as long as > the > > >>>>>>> metric > > >>>>>>>>>>>>>>>>> semantic is clearly defined. > > >>>>>>>>>>>>>>>>> Conceptually speaking, I agree that as long as the > metric > > >>>>>>>>>>>>>> semantic > > >>>>>>>>>>>>>>>>> is defined, metric type does not matter. To some > extent, > > >>>>> Gauge > > >>>>>>> / > > >>>>>>>>>>>>>>>>> Counter / Meter / Histogram themselves can be think of > as > > >>>>> some > > >>>>>>>>>>>>>>>>> well-recognized semantics, if you wish. In Flink, these > > >>>>> metric > > >>>>>>>>>>>>>>>>> semantics have their associated interface classes. In > > >>>>> practice, > > >>>>>>>>>>>>>>>>> such semantic to interface binding seems necessary for > > >>>>>>> different > > >>>>>>>>>>>>>>>>> components to communicate. Simply standardize the > > semantic > > >>>>> of > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> connector metrics seems not sufficient for people to > > build > > >>>>>>>>>>>>>>>>> ecosystem on top of. At the end of the day, we still > need > > >> to > > >>>>>>>>> have > > >>>>>>>>>>>>>>>>> some embodiment of the metric semantics that people can > > >>>>> program > > >>>>>>>>>>>>>>>>> against. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 2. Sometimes the same metric semantic can be exposed > > using > > >>>>>>>>>>>>>>>>> different metric types / interfaces. > > >>>>>>>>>>>>>>>>> This is a good point. Counter and Gauge-as-a-Counter > are > > >>>>> pretty > > >>>>>>>>>>>>>>>>> much interchangeable. This is more of a trade-off > between > > >> the > > >>>>>>>>>>>>>> user > > >>>>>>>>>>>>>>>>> experience of metric producers and consumers. The > metric > > >>>>>>>>>>>>>> producers > > >>>>>>>>>>>>>>>>> want to use Counter or Gauge depending on whether the > > >> counter > > >>>>>>> is > > >>>>>>>>>>>>>>>>> already tracked in code, while ideally the metric > > consumers > > >>>>>>> only > > >>>>>>>>>>>>>>>>> want to see a single metric type for each metric. I am > > >>>>> leaning > > >>>>>>>>>>>>>>>>> towards to make the metric producers happy, i.e. allow > > >> Gauge > > >>>>> / > > >>>>>>>>>>>>>>>>> Counter metric type, and the the metric consumers > handle > > >> the > > >>>>>>>>> type > > >>>>>>>>>>>>>>>>> variation. The reason is that in practice, there might > be > > >>>>> more > > >>>>>>>>>>>>>>>>> connector implementations than metric reporter > > >>>>> implementations. > > >>>>>>>>>>>>>> We > > >>>>>>>>>>>>>>>>> could also provide some helper method to facilitate > > reading > > >>>>>>> from > > >>>>>>>>>>>>>>>>> such variable metric type. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Just some quick replies to the comments around > > >> implementation > > >>>>>>>>>>>>>>>> details. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 4) single place where metrics are registered except > > >>>>>>>>>>>>>>>>> connector-specific > > >>>>>>>>>>>>>>>>> ones (which we can't really avoid). > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Register connector specific ones in a single place is > > >>>>> actually > > >>>>>>>>>>>>>>>>> something that I want to achieve. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 2) I'm talking about time-series databases like > > >>>>> Prometheus. 
> > >>>>>>>>>>>>>> We > > >>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>> only have a gauge metric exposing the last > > >>>>>>>>> fetchTime/emitTime > > >>>>>>>>>>>>>>>>> that is > > >>>>>>>>>>>>>>>>> regularly reported to the backend (Prometheus), > where a > > >>>>>>> user > > >>>>>>>>>>>>>>>>> could build > > >>>>>>>>>>>>>>>>> a histogram of his choosing when/if he wants it. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Not sure if such downsampling works. As an example, if > a > > >> user > > >>>>>>>>>>>>>>>>> complains that there are some intermittent latency > spikes > > >>>>>>> (maybe > > >>>>>>>>>>>>>> a > > >>>>>>>>>>>>>>>>> few records in 10 seconds) in their processing system. > > >>>>> Having a > > >>>>>>>>>>>>>>>>> Gauge sampling instantaneous latency seems unlikely > > useful. > > >>>>>>>>>>>>>>>>> However by looking at actual 99.9 percentile latency > > might > > >>>>>>> help. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 9:30 PM Chesnay Schepler > > >>>>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org>> > wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Re: over complication of implementation. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I think I get understand better know what you're > > >> shooting > > >>>>>>>>>>>>>> for, > > >>>>>>>>>>>>>>>>> effectively something like the OperatorIOMetricGroup. > > >>>>>>>>>>>>>>>>> But still, re-define setupConnectorMetrics() to > accept > > a > > >>>>>>> set > > >>>>>>>>>>>>>>>>> of flags > > >>>>>>>>>>>>>>>>> for counters/meters(ans _possibly_ histograms) along > > >>>>> with a > > >>>>>>>>>>>>>>>>> set of > > >>>>>>>>>>>>>>>>> well-defined Optional<Gauge<?>>, and return the > group. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Solves all issues as far as i can tell: > > >>>>>>>>>>>>>>>>> 1) no metrics must be created manually (except > Gauges, > > >>>>>>> which > > >>>>>>>>>>>>>>> are > > >>>>>>>>>>>>>>>>> effectively just Suppliers and you can't get around > > >>>>> this), > > >>>>>>>>>>>>>>>>> 2) additional metrics can be registered on the > returned > > >>>>>>>>>>>>>> group, > > >>>>>>>>>>>>>>>>> 3) see 1), > > >>>>>>>>>>>>>>>>> 4) single place where metrics are registered except > > >>>>>>>>>>>>>>>>> connector-specific > > >>>>>>>>>>>>>>>>> ones (which we can't really avoid). > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Re: Histogram > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 1) As an example, whether "numRecordsIn" is exposed > as > > a > > >>>>>>>>>>>>>>>>> Counter or a > > >>>>>>>>>>>>>>>>> Gauge should be irrelevant. So far we're using the > > >> metric > > >>>>>>>>>>>>>> type > > >>>>>>>>>>>>>>>>> that is > > >>>>>>>>>>>>>>>>> the most convenient at exposing a given value. If > there > > >>>>> is > > >>>>>>>>>>>>>>>>> some backing > > >>>>>>>>>>>>>>>>> data-structure that we want to expose some data from > we > > >>>>>>>>>>>>>>>>> typically opt > > >>>>>>>>>>>>>>>>> for a Gauge, as otherwise we're just mucking around > > with > > >>>>>>> the > > >>>>>>>>>>>>>>>>> Meter/Counter API to get it to match. Similarly, if > we > > >>>>> want > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> count > > >>>>>>>>>>>>>>>>> something but no current count exists we typically > > added > > >>>>> a > > >>>>>>>>>>>>>>>>> Counter. 
> > >>>>>>>>>>>>>>>>> That's why attaching semantics to metric types makes > > >>>>> little > > >>>>>>>>>>>>>>>>> sense (but > > >>>>>>>>>>>>>>>>> unfortunately several reporters already do it); for > > >>>>>>>>>>>>>>>>> counters/meters > > >>>>>>>>>>>>>>>>> certainly, but the majority of metrics are gauges. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 2) I'm talking about time-series databases like > > >>>>> Prometheus. > > >>>>>>>>>>>>>> We > > >>>>>>>>>>>>>>>>> would > > >>>>>>>>>>>>>>>>> only have a gauge metric exposing the last > > >>>>>>>>> fetchTime/emitTime > > >>>>>>>>>>>>>>>>> that is > > >>>>>>>>>>>>>>>>> regularly reported to the backend (Prometheus), > where a > > >>>>>>> user > > >>>>>>>>>>>>>>>>> could build > > >>>>>>>>>>>>>>>>> a histogram of his choosing when/if he wants it. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On 22.02.2019 13:57, Becket Qin wrote: > > >>>>>>>>>>>>>>>>>> Hi Chesnay, > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks for the explanation. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> ** Re: FLIP > > >>>>>>>>>>>>>>>>>> I might have misunderstood this, but it seems that > > "major > > >>>>>>>>>>>>>>>>> changes" are well > > >>>>>>>>>>>>>>>>>> defined in FLIP. The full contents is following: > > >>>>>>>>>>>>>>>>>> What is considered a "major change" that needs a FLIP? > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Any of the following should be considered a major > > change: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - Any major new feature, subsystem, or piece of > > >>>>>>>>>>>>>>>>> functionality > > >>>>>>>>>>>>>>>>>> - *Any change that impacts the public interfaces of > the > > >>>>>>>>>>>>>>>>> project* > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> What are the "public interfaces" of the project? > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> *All of the following are public interfaces *that > people > > >>>>>>>>>>>>>>>>> build around: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - DataStream and DataSet API, including classes > related > > >>>>>>>>>>>>>>>>> to that, such as > > >>>>>>>>>>>>>>>>>> StreamExecutionEnvironment > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - Classes marked with the @Public annotation > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - On-disk binary formats, such as > > >>>>>>>>>>>>>> checkpoints/savepoints > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - User-facing scripts/command-line tools, i.e. > > >>>>>>>>>>>>>>>>> bin/flink, Yarn scripts, > > >>>>>>>>>>>>>>>>>> Mesos scripts > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - Configuration settings > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> - *Exposed monitoring information* > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> So any monitoring information change is considered as > > >>>>>>>>>>>>>> public > > >>>>>>>>>>>>>>>>> interface, and > > >>>>>>>>>>>>>>>>>> any public interface change is considered as a "major > > >>>>>>>>>>>>>>> change". > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> ** Re: over complication of implementation. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Although this is more of implementation details that > is > > >> not > > >>>>>>>>>>>>>>>>> covered by the > > >>>>>>>>>>>>>>>>>> FLIP. But it may be worth discussing. 
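To make Chesnay's refined setupConnectorMetrics() proposal above concrete, here is a rough sketch of the flags-plus-Optional-gauges shape it describes, assuming Flink's MetricGroup/Counter/MeterView API. The flag set, the single optional gauge, and the names shown are illustrative, not the full parameter list:

    import java.util.Optional;
    import java.util.Set;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MeterView;
    import org.apache.flink.metrics.MetricGroup;

    public final class ConnectorMetricUtil {

        /** Illustrative opt-in flags for the standard counter/meter metrics. */
        public enum StandardMetric { NUM_BYTES_IN, NUM_BYTES_IN_RATE }

        public static MetricGroup setupConnectorMetrics(
                MetricGroup operatorMetricGroup,
                String connectorName,
                Set<StandardMetric> enabled,
                Optional<Gauge<Long>> pendingRecords) {

            // Standardized scope: connector metrics live under one subgroup.
            MetricGroup group = operatorMetricGroup.addGroup(connectorName);

            if (enabled.contains(StandardMetric.NUM_BYTES_IN)) {
                Counter numBytesIn = group.counter("numBytesIn");
                if (enabled.contains(StandardMetric.NUM_BYTES_IN_RATE)) {
                    // A meter derived from the counter, per the convention.
                    group.meter("numBytesInPerSecond", new MeterView(numBytesIn));
                }
            }

            // Well-defined optional gauges: only registered when provided.
            pendingRecords.ifPresent(g -> group.gauge("pendingRecords", g));

            // Returned so connector-specific metrics can be added on top.
            return group;
        }

        private ConnectorMetricUtil() {}
    }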
> > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> First of all, I completely agree that we should use > the > > >>>>>>>>>>>>>>>>> simplest way to > > >>>>>>>>>>>>>>>>>> achieve our goal. To me the goal is the following: > > >>>>>>>>>>>>>>>>>> 1. Clear connector conventions and interfaces. > > >>>>>>>>>>>>>>>>>> 2. The easiness of creating a connector. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Both of them are important to the prosperity of the > > >>>>>>>>>>>>>>>>> connector ecosystem. So > > >>>>>>>>>>>>>>>>>> I'd rather abstract as much as possible on our side to > > >> make > > >>>>>>>>>>>>>>>>> the connector > > >>>>>>>>>>>>>>>>>> developer's work lighter. Given this goal, a static > util > > >>>>>>>>>>>>>>>>> method approach > > >>>>>>>>>>>>>>>>>> might have a few drawbacks: > > >>>>>>>>>>>>>>>>>> 1. Users still have to construct the metrics by > > >> themselves. > > >>>>>>>>>>>>>>>>> (And note that > > >>>>>>>>>>>>>>>>>> this might be erroneous by itself. For example, a > > customer > > >>>>>>>>>>>>>>>>> wrapper around > > >>>>>>>>>>>>>>>>>> dropwizard meter maybe used instead of MeterView). > > >>>>>>>>>>>>>>>>>> 2. When connector specific metrics are added, it is > > >>>>>>>>>>>>>>>>> difficult to enforce > > >>>>>>>>>>>>>>>>>> the scope to be the same as standard metrics. > > >>>>>>>>>>>>>>>>>> 3. It seems that a method proliferation is inevitable > if > > >> we > > >>>>>>>>>>>>>>>>> want to apply > > >>>>>>>>>>>>>>>>>> sanity checks. e.g. The metric of numBytesIn was not > > >>>>>>>>>>>>>>>>> registered for a meter. > > >>>>>>>>>>>>>>>>>> 4. Metrics are still defined in random places and hard > > to > > >>>>>>>>>>>>>>>> track. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> The current PR I had was inspired by the Config system > > in > > >>>>>>>>>>>>>>>>> Kafka, which I > > >>>>>>>>>>>>>>>>>> found pretty handy. In fact it is not only used by > Kafka > > >>>>>>>>>>>>>>>>> itself but even > > >>>>>>>>>>>>>>>>>> some other projects that depend on Kafka. I am not > > saying > > >>>>>>>>>>>>>>>>> this approach is > > >>>>>>>>>>>>>>>>>> perfect. But I think it worths to save the work for > > >>>>>>>>>>>>>>>>> connector writers and > > >>>>>>>>>>>>>>>>>> encourage more systematic implementation. That being > > said, > > >>>>>>>>>>>>>> I > > >>>>>>>>>>>>>>>>> am fully open > > >>>>>>>>>>>>>>>>>> to suggestions. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Re: Histogram > > >>>>>>>>>>>>>>>>>> I think there are two orthogonal questions around > those > > >>>>>>>>>>>>>>>> metrics: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> 1. Regardless of the metric type, by just looking at > the > > >>>>>>>>>>>>>>>>> meaning of a > > >>>>>>>>>>>>>>>>>> metric, is generic to all connectors? If the answer is > > >> yes, > > >>>>>>>>>>>>>>>>> we should > > >>>>>>>>>>>>>>>>>> include the metric into the convention. No matter > > whether > > >>>>>>>>>>>>>> we > > >>>>>>>>>>>>>>>>> include it > > >>>>>>>>>>>>>>>>>> into the convention or not, some connector > > implementations > > >>>>>>>>>>>>>>>>> will emit such > > >>>>>>>>>>>>>>>>>> metric. It is better to have a convention than letting > > >> each > > >>>>>>>>>>>>>>>>> connector do > > >>>>>>>>>>>>>>>>>> random things. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> 2. If a standard metric is a histogram, what should we > > do? > > >>>>>>>>>>>>>>>>>> I agree that we should make it clear that using > > histograms > > >>>>>>>>>>>>>>>>> will have > > >>>>>>>>>>>>>>>>>> performance risk. 
But I do see histogram is useful in > > some > > >>>>>>>>>>>>>>>>> fine-granularity > > >>>>>>>>>>>>>>>>>> debugging where one do not have the luxury to stop the > > >>>>>>>>>>>>>>>>> system and inject > > >>>>>>>>>>>>>>>>>> more inspection code. So the workaround I am thinking > is > > >> to > > >>>>>>>>>>>>>>>>> provide some > > >>>>>>>>>>>>>>>>>> implementation suggestions. Assume later on we have a > > >>>>>>>>>>>>>>>>> mechanism of > > >>>>>>>>>>>>>>>>>> selective metrics. In the abstract metrics class we > can > > >>>>>>>>>>>>>>>>> disable those > > >>>>>>>>>>>>>>>>>> metrics by default individual connector writers does > not > > >>>>>>>>>>>>>>>>> have to do > > >>>>>>>>>>>>>>>>>> anything (this is another advantage of having an > > >>>>>>>>>>>>>>>>> AbstractMetrics instead of > > >>>>>>>>>>>>>>>>>> static util methods.) > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> I am not sure I fully understand the histogram in the > > >>>>>>>>>>>>>>>>> backend approach. Can > > >>>>>>>>>>>>>>>>>> you explain a bit more? Do you mean emitting the raw > > data, > > >>>>>>>>>>>>>>>>> e.g. fetchTime > > >>>>>>>>>>>>>>>>>> and emitTime with each record and let the histogram > > >>>>>>>>>>>>>>>>> computation happen in > > >>>>>>>>>>>>>>>>>> the background? Or let the processing thread putting > the > > >>>>>>>>>>>>>>>>> values into a > > >>>>>>>>>>>>>>>>>> queue and have a separate thread polling from the > queue > > >> and > > >>>>>>>>>>>>>>>>> add them into > > >>>>>>>>>>>>>>>>>> the histogram? > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 4:34 PM Chesnay Schepler > > >>>>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org>> > > wrote: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: Flip > > >>>>>>>>>>>>>>>>>>> The very first line under both the main header and > > >> Purpose > > >>>>>>>>>>>>>>>>> section > > >>>>>>>>>>>>>>>>>>> describe Flips as "major changes", which this isn't. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: complication > > >>>>>>>>>>>>>>>>>>> I'm not arguing against standardization, but again an > > >>>>>>>>>>>>>>>>> over-complicated > > >>>>>>>>>>>>>>>>>>> implementation when a static utility method would be > > >>>>>>>>>>>>>>>>> sufficient. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> public static void setupConnectorMetrics( > > >>>>>>>>>>>>>>>>>>> MetricGroup operatorMetricGroup, > > >>>>>>>>>>>>>>>>>>> String connectorName, > > >>>>>>>>>>>>>>>>>>> Optional<Gauge<Long>> numRecordsIn, > > >>>>>>>>>>>>>>>>>>> ...) > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> This gives you all you need: > > >>>>>>>>>>>>>>>>>>> * a well-defined set of metrics for a connector to > > opt-in > > >>>>>>>>>>>>>>>>>>> * standardized naming schemes for scope and > individual > > >>>>>>>>>>>>>>> metrics > > >>>>>>>>>>>>>>>>>>> * standardize metric types (although personally I'm > not > > >>>>>>>>>>>>>>>>> interested in that > > >>>>>>>>>>>>>>>>>>> since metric types should be considered syntactic > > sugar) > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: Configurable Histogram > > >>>>>>>>>>>>>>>>>>> If anything they _must_ be turned off by default, but > > the > > >>>>>>>>>>>>>>>>> metric system is > > >>>>>>>>>>>>>>>>>>> already exposing so many options that I'm not too > keen > > on > > >>>>>>>>>>>>>>>>> adding even more. 
> Re: Configurable Histogram
> If anything they _must_ be turned off by default, but the metric system is already exposing so many options that I'm not too keen on adding even more.
>
> You have also only addressed my first argument against histograms (performance); the second one still stands (calculate the histogram in the metric backend instead).
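> As one rough sketch of that direction (bucket bounds and metric names are arbitrary here): the connector only increments plain counters, one per latency bucket, and the backend derives the distribution from them. Incrementing a counter is far cheaper than feeding a reservoir-based histogram on every record.
>
>     private static final long[] BOUNDS_MS = {10, 100, 1000};
>     private final Counter[] buckets = new Counter[BOUNDS_MS.length + 1];
>
>     void registerBuckets(MetricGroup group) {
>         for (int i = 0; i < BOUNDS_MS.length; i++) {
>             buckets[i] = group.counter("latencyMsLe" + BOUNDS_MS[i]);
>         }
>         // Overflow bucket for everything above the largest bound.
>         buckets[BOUNDS_MS.length] = group.counter("latencyMsInf");
>     }
>
>     void onRecord(long latencyMs) {
>         int i = 0;
>         while (i < BOUNDS_MS.length && latencyMs > BOUNDS_MS[i]) {
>             i++;
>         }
>         buckets[i].inc();
>     }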
> On 21.02.2019 16:27, Becket Qin wrote:
> > Hi Chesnay,
> >
> > Thanks for the comments. I think this is worthy of a FLIP because it is public API. According to the FLIP description, a FLIP is required in case of:
> > - Any change that impacts the public interfaces of the project
> >
> > and the following entry is found in the definition of "public interface":
> > - Exposed monitoring information
> >
> > Metrics are critical to any production system, so a clear metric definition is important for any serious user. For an organization with a large Flink installation, a change in metrics means a great amount of work. Such changes do need to be fully discussed and documented.
> >
> > ** Re: Histogram
> > We can discuss whether there is a better way to expose metrics that are suitable for histograms. My micro-benchmark of various histogram implementations also indicates that they are significantly slower than the other metric types. But I don't think that means we should never use histograms; it means we should use them with caution. For example, we can suggest that implementations turn them off by default and only turn them on for a short time when doing some micro-debugging.
> >
> > ** Re: complication
> > Connector conventions are essential for the Flink ecosystem. The connector pool is probably the most important part of Flink, just as in any other data system, and clear connector conventions will help the Flink ecosystem grow in a more organic way. Take the metrics convention as an example: imagine someone has developed a Flink connector for system Foo, and another developer has built a monitoring and diagnostics framework for Flink that analyzes job performance based on metrics. With a clear metric convention, those two projects can be developed independently, and once users put them together, they work without additional modification. This cannot be easily achieved by just defining a few constants.
> >
> > ** Re: selective metrics
> > Sure, we can discuss that in a separate thread.
> >
> > @Dawid
> >
> > ** Re: latency / fetchedLatency
> > The primary purpose of establishing such a convention is to help developers write connectors in a more compatible way. The convention is supposed to be defined proactively, so when looking at it, it seems more important to ask whether the concept is applicable to connectors in general. It might be true that so far only the Kafka connector reports latency, but there might be hundreds of other connector implementations in the Flink ecosystem, though not in the Flink repo, and some of them also emit latency. I think a lot of other sources actually also have an append timestamp, e.g. database bin logs and some K-V stores, so I wouldn't be surprised if some database connector could also emit latency metrics.
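> > For a source whose records carry such an append timestamp, computing the metric is cheap; roughly like this (the record accessor is hypothetical):
> >
> >     // Time between the record being appended in the external system and
> >     // the moment the source fetched it; exposed via a Gauge<Long>.
> >     long fetchLatencyMs = System.currentTimeMillis() - record.getAppendTimestamp();
> >     lastFetchLatencyMs = fetchLatencyMs;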
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Feb 21, 2019 at 10:14 PM Chesnay Schepler <ches...@apache.org> wrote:
> >
> > > Regarding 2): It doesn't make sense to investigate this as part of this FLIP. This is something that could be of interest to the entire metric system and should be designed for as such.
> > >
> > > Regarding the proposal as a whole:
> > >
> > > Histogram metrics shall not be added to the core of Flink. They are significantly more expensive than other metrics, and calculating histograms in the application is regarded as an anti-pattern by several metric backends, which instead recommend exposing the raw data and calculating the histogram in the backend.
> > >
> > > Second, this seems overly complicated. Given that we have already established that not all connectors will export all metrics, we are effectively reducing this down to a consistent naming scheme. We don't need anything sophisticated for that; basically just a few constants that all connectors use.
> > >
> > > I'm not convinced that this is worthy of a FLIP.
> > >
> > > On 21.02.2019 14:26, Dawid Wysakowicz wrote:
> > > > Hi,
> > > >
> > > > Ad 1. In general I understand and I agree. But those particular metrics (latency, fetchLatency) would right now only be reported if the user uses the KafkaConsumer with an internal timestampAssigner and with StreamCharacteristic set to EventTime, right? That sounds like a very specific case. I am not sure we should introduce a generic metric that will be disabled or absent for most implementations.
> > > >
> > > > Ad 2. That sounds like an orthogonal issue that might make sense to investigate in the future.
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 21/02/2019 13:20, Becket Qin wrote:
> > > > > Hi Dawid,
> > > > >
> > > > > Thanks for the feedback. That makes sense to me. There are two cases to be addressed:
> > > > >
> > > > > 1. The metrics are supposed to be a guidance. It is likely that a connector supports only some, but not all, of the metrics. In that case, each connector implementation should have the freedom to decide which metrics are reported. For the metrics that are supported, the guidance should be followed.
> > > > >
> > > > > 2. Sometimes users may want to disable certain metrics for some reason (e.g. performance, or reprocessing of data). A generic mechanism should be provided to let users choose which metrics are reported, and this mechanism should also be honored by the connector implementations.
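> > > > > As a rough sketch of such a mechanism (the configuration key is made up), the registration helper could consult a configured set of disabled metric names and simply skip those:
> > > > >
> > > > >     // Populated from something like "metrics.connector.disabled".
> > > > >     private Set<String> disabledMetrics;
> > > > >
> > > > >     <T> void registerGauge(MetricGroup group, String name, Gauge<T> gauge) {
> > > > >         if (!disabledMetrics.contains(name)) {
> > > > >             group.gauge(name, gauge);
> > > > >         }
> > > > >     }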
> > > > > Does this sound reasonable to you?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Thu, Feb 21, 2019 at 4:22 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Generally I like the idea of having a unified, standard set of metrics for all connectors. I have some slight concerns about fetchLatency and latency, though. They are computed based on event time, which is not a purely technical feature: it often depends on business logic, and it might be absent or defined after the source. Those metrics could also behave in a weird way when replaying a backlog. Therefore I am not sure we should include them by default. Maybe we could at least introduce a feature switch for them? What do you think?
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Dawid
> > > > > >
> > > > > > On 21/02/2019 03:13, Becket Qin wrote:
> > > > > > > Bump. If there are no objections to the proposed metrics, I'll start a voting thread later today.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Mon, Feb 11, 2019 at 8:17 PM Becket Qin <becket....@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi folks,
> > > > > > >
> > > > > > > I would like to start the FLIP discussion thread about standardizing the connector metrics.
> > > > > > >
> > > > > > > In short, we would like to provide a convention for Flink connector metrics. It will help simplify the monitoring of and alerting on Flink jobs.
> > > > > > > The FLIP link is the following:
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin


--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk