Thanks for driving this. I’ve just noticed one small thing: with the new SourceReader interface, Flink will be able to provide the `idleTime` metric automatically.
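For illustration only, such an automatically provided metric could be as simple as a gauge tracking how long the source has gone without emitting a record. A minimal sketch, assuming a hypothetical IdleTimeGauge that the framework would update on every emitted record (this is not the actual FLIP-27 implementation):

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    /** Illustrative only: reports how long the source has gone without emitting a record. */
    public class IdleTimeGauge implements Gauge<Long> {

        private volatile long lastRecordEmittedAt = System.currentTimeMillis();

        /** Would be called by the framework/source loop whenever a record is handed downstream. */
        public void recordEmitted() {
            lastRecordEmittedAt = System.currentTimeMillis();
        }

        @Override
        public Long getValue() {
            return System.currentTimeMillis() - lastRecordEmittedAt;
        }

        public static IdleTimeGauge register(MetricGroup sourceMetricGroup) {
            return sourceMetricGroup.gauge("idleTime", new IdleTimeGauge());
        }
    }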
Piotrek > On 13 Jun 2019, at 03:30, Becket Qin <becket....@gmail.com> wrote: > > Thanks all for the feedback and discussion. > > Since there wasn't any concern raised, I've started the voting thread for > this FLIP, but please feel free to continue the discussion here if you > think something still needs to be addressed. > > Thanks, > > Jiangjie (Becket) Qin > > > > On Mon, Jun 10, 2019 at 9:10 AM Becket Qin <becket....@gmail.com> wrote: > >> Hi Piotr, >> >> Thanks for the comments. Yes, you are right. Users will have to look at >> other metrics to decide whether the pipeline is healthy or not in the first >> place before they can use the time-based metric to fix the bottleneck. >> >> I agree that once we have FLIP-27 ready, some of the metrics can just be >> reported by the abstract implementation. >> >> I've updated FLIP-33 wiki page to add the pendingBytes and pendingRecords >> metric. Please let me know if you have any concern over the updated metric >> convention proposal. >> >> @Chesnay Schepler <ches...@apache.org> @Stephan Ewen >> <step...@ververica.com> will you also have time to take a look at the >> proposed metric convention? If there is no further concern I'll start a >> voting thread for this FLIP in two days. >> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> >> >> On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski <pi...@ververica.com> wrote: >> >>> Hi Becket, >>> >>> Thanks for the answer :) >>> >>>> By time-based metric, I meant the portion of time spent on producing the >>>> record to downstream. For example, a source connector can report that >>> it's >>>> spending 80% of time to emit record to downstream processing pipeline. >>> In >>>> another case, a sink connector may report that its spending 30% of time >>>> producing the records to the external system. >>>> >>>> This is in some sense equivalent to the buffer usage metric: >>> >>>> - 80% of time spent on emitting records to downstream ---> downstream >>>> node is bottleneck ---> output buffer is probably full. >>>> - 30% of time spent on emitting records to downstream ---> downstream >>>> node is not bottleneck ---> output buffer is probably not full. >>> >>> If by “time spent on emitting records to downstream” you understand >>> “waiting on back pressure”, then I see your point. And I agree that some >>> kind of ratio/time based metric gives you more information. However under >>> “time spent on emitting records to downstream” might be hidden the >>> following (extreme) situation: >>> >>> 1. Job is barely able to handle influx of records, there is 99% >>> CPU/resource usage in the cluster, but nobody is >>> bottlenecked/backpressured, all output buffers are empty, everybody is >>> waiting in 1% of it’s time for more records to process. >>> 2. 80% time can still be spent on "down stream operators”, because they >>> are the CPU intensive operations, but this doesn’t mean that increasing the >>> parallelism down the stream will help with anything there. To the contrary, >>> increasing parallelism of the source operator might help to increase >>> resource utilisation up to 100%. >>> >>> However, this “time based/ratio” approach can be extended to in/output >>> buffer usage. Besides collecting an information that input/output buffer is >>> full/empty, we can probe profile how often are buffer empty/full. If output >>> buffer is full 1% of times, there is almost no back pressure. If it’s full >>> 80% of times, there is some back pressure, if it’s full 99.9% of times, >>> there is huge back pressure. 
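The "how often is the buffer full" probing described above does not need per-record bookkeeping; it can be approximated by sampling the buffer state in the background. A rough sketch, where BufferFullRatioProbe and the once-per-second probing interval are assumptions for illustration, not part of the FLIP:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    /** Illustrative only: samples a "buffer is full" predicate and exposes the observed full ratio. */
    public class BufferFullRatioProbe {

        private final BooleanSupplier bufferFull; // e.g. () -> outputQueue.remainingCapacity() == 0
        private long samples;
        private long fullSamples;

        public BufferFullRatioProbe(BooleanSupplier bufferFull) {
            this.bufferFull = bufferFull;
        }

        public synchronized void probe() {
            samples++;
            if (bufferFull.getAsBoolean()) {
                fullSamples++;
            }
        }

        /** ~0.0 means almost no back pressure, ~1.0 means the buffer is full nearly all the time. */
        public synchronized double fullRatio() {
            return samples == 0 ? 0.0 : (double) fullSamples / samples;
        }

        /** Probing once per second is enough; nothing is added to the per-record hot path. */
        public ScheduledExecutorService start() {
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            executor.scheduleAtFixedRate(this::probe, 1, 1, TimeUnit.SECONDS);
            return executor;
        }
    }

The resulting ratio could then be exposed as a plain gauge and compared between input and output buffers exactly as described above.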
>>> >>> Now for autoscaling you could compare the input & output buffers fill >>> ratio: >>> >>> 1. Both are high, the source of bottleneck is down the stream >>> 2. Output is low, input is high, this is the bottleneck and the higher >>> the difference, the bigger source of bottleneck is this is operator/task >>> 3. Output is high, input is low - there was some load spike that we are >>> currently finishing to process >>> >>> >>> >>> But long story short, we are probably diverging from the topic of this >>> discussion, and we can discuss this at some later point. >>> >>> For now, for sources: >>> >>> as I wrote before, +1 for: >>> - pending.bytes, Gauge >>> - pending.messages, Gauge >>> >>> When we will be developing/discussing SourceReader from FLIP-27 we might >>> then add: >>> >>> - in-memory.buffer.usage (0 - 100%) >>> >>> Which will be estimated automatically by Flink while user will be able to >>> override/provide better estimation. >>> >>> Piotrek >>> >>>> On 5 Jun 2019, at 05:42, Becket Qin <becket....@gmail.com> wrote: >>>> >>>> Hi Piotr, >>>> >>>> Thanks for the explanation. Please see some clarifications below. >>>> >>>> By time-based metric, I meant the portion of time spent on producing the >>>> record to downstream. For example, a source connector can report that >>> it's >>>> spending 80% of time to emit record to downstream processing pipeline. >>> In >>>> another case, a sink connector may report that its spending 30% of time >>>> producing the records to the external system. >>>> >>>> This is in some sense equivalent to the buffer usage metric: >>>> - 80% of time spent on emitting records to downstream ---> downstream >>>> node is bottleneck ---> output buffer is probably full. >>>> - 30% of time spent on emitting records to downstream ---> downstream >>>> node is not bottleneck ---> output buffer is probably not full. >>>> >>>> However, the time-based metric has a few advantages that the buffer >>> usage >>>> metric may not have. >>>> >>>> 1. Buffer usage metric may not be applicable to all the connector >>>> implementations, while reporting time-based metric are always doable. >>>> Some source connectors may not have any input buffer, or they may use >>> some >>>> third party library that does not expose the input buffer at all. >>>> Similarly, for sink connectors, the implementation may not have any >>> output >>>> buffer, or the third party library does not expose such buffer. >>>> >>>> 2. Although both type of metrics can detect bottleneck, time-based >>> metrics >>>> can be used to generate a more informed action to remove the bottleneck. >>>> For example, when the downstream is bottleneck, the output buffer usage >>>> metric is likely to be 100%, and the input buffer usage might be 0%. >>> That >>>> means we don't know what is the suitable parallelism to lift the >>>> bottleneck. The time-based metric, on the other hand, would give useful >>>> information, e.g. if 80% of time was spent on emitting records, we can >>>> roughly increase the downstream node parallelism by 4 times. >>>> >>>> Admittedly, the time-based metrics are more expensive than buffer >>> usage. So >>>> we may have to do some sampling to reduce the cost. But in general, >>> using >>>> time-based metrics seems worth adding. >>>> >>>> That being said, I don't think buffer usage metric and time-based >>> metrics >>>> are mutually exclusive. We can probably have both. It is just that in >>>> practice, features like auto-scaling might prefer time-based metrics for >>>> the reason stated above. 
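Wiring up the agreed-upon backlog gauges is straightforward with Flink's MetricGroup. A minimal sketch using the pendingBytes/pendingRecords names from the updated FLIP-33 page; the BacklogProvider interface is a placeholder for whatever the connector's client can actually report:

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    /** Illustrative only: registers the backlog gauges discussed in the thread. */
    public final class PendingWorkMetrics {

        /** Placeholder for whatever the connector's client can report about the external system. */
        public interface BacklogProvider {
            long pendingBytes();    // estimated bytes not yet fetched, e.g. remaining file bytes
            long pendingRecords();  // estimated records not yet fetched, e.g. Kafka consumer lag
        }

        public static void register(MetricGroup connectorMetricGroup, BacklogProvider backlog) {
            connectorMetricGroup.gauge("pendingBytes", (Gauge<Long>) backlog::pendingBytes);
            connectorMetricGroup.gauge("pendingRecords", (Gauge<Long>) backlog::pendingRecords);
        }

        private PendingWorkMetrics() {}
    }

As noted in the thread, connectors that cannot estimate one of the two values simply skip registering that gauge.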
>>>> >>>>> 1. Define the metrics that would allow us to manually detect >>> bottlenecks. >>>> As I wrote, we already have them in most of the places, except of >>>> sources/sinks. >>>>> 2. Use those metrics, to automatically detect bottlenecks. Currently we >>>> are only automatically detecting back pressure and reporting it to the >>> user >>>> in web UI (is it exposed as a metric at all?). Detecting the root cause >>> of >>>> the back pressure (bottleneck) is one step further. >>>>> 3. Use the knowledge about where exactly the bottleneck is located, to >>>> try to do something with it. >>>> >>>> As explained above, I think time-based metric also addresses item 1 and >>>> item 2. >>>> >>>> Any thoughts? >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin >>>> >>>> >>>> >>>> On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski <pi...@ververica.com> >>> wrote: >>>> >>>>> Hi again :) >>>>> >>>>>> - pending.bytes, Gauge >>>>>> - pending.messages, Gauge >>>>> >>>>> >>>>> +1 >>>>> >>>>> And true, instead of overloading one of the metric it is better when >>> user >>>>> can choose to provide only one of them. >>>>> >>>>> Re 2: >>>>> >>>>>> If I understand correctly, this metric along with the pending mesages >>> / >>>>>> bytes would answer the questions of: >>>>> >>>>>> - Does the connector consume fast enough? Lagging behind + empty >>> buffer >>>>> = >>>>>> cannot consume fast enough. >>>>>> - Does the connector emit fast enough? Lagging behind + full buffer = >>>>>> cannot emit fast enough, i.e. the Flink pipeline is slow. >>>>> >>>>> Yes, exactly. This can also be used to support decisions like changing >>> the >>>>> parallelism of the sources and/or down stream operators. >>>>> >>>>> I’m not sure if I understand your proposal with time based >>> measurements. >>>>> Maybe I’m missing something, but I do not see how measuring time alone >>>>> could answer the problem: where is the bottleneck. Time spent on the >>>>> next/emit might be short or long (depending on how heavy to process the >>>>> record is) and the source can still be bottlenecked/back pressured or >>> not. >>>>> Usually the easiest and the most reliable way how to detect >>> bottlenecks is >>>>> by checking usage of input & output buffers, since when input buffer is >>>>> full while output buffer is empty, that’s the definition of a >>> bottleneck. >>>>> Also this is usually very easy and cheap to measure (it works >>> effectively >>>>> the same way as current’s Flink back pressure monitoring, but more >>> cleanly, >>>>> without probing thread’s stack traces). >>>>> >>>>> Also keep in mind that we are already using the buffer usage metrics >>> for >>>>> detecting the bottlenecks in Flink’s internal network exchanges (manual >>>>> work). That’s the reason why I wanted to extend this to sources/sinks, >>>>> since they are currently our blind spot. >>>>> >>>>>> One feature we are currently working on to scale Flink automatically >>>>> relies >>>>>> on some metrics answering the same question >>>>> >>>>> That would be very helpful feature. I think in order to achieve that we >>>>> would need to: >>>>> 1. Define the metrics that would allow us to manually detect >>> bottlenecks. >>>>> As I wrote, we already have them in most of the places, except of >>>>> sources/sinks. >>>>> 2. Use those metrics, to automatically detect bottlenecks. Currently we >>>>> are only automatically detecting back pressure and reporting it to the >>> user >>>>> in web UI (is it exposed as a metric at all?). 
Detecting the root >>> cause of >>>>> the back pressure (bottleneck) is one step further. >>>>> 3. Use the knowledge about where exactly the bottleneck is located, to >>> try >>>>> to do something with it. >>>>> >>>>> I think you are aiming for point 3., but before we reach it, we are >>> still >>>>> missing 1. & 2. Also even if we have 3., there is a value in 1 & 2 for >>>>> manual analysis/dashboards. >>>>> >>>>> However, having the knowledge of where the bottleneck is, doesn’t >>>>> necessarily mean that we know what we can do about it. For example >>>>> increasing parallelism might or might not help with anything (data >>> skew, >>>>> bottleneck on some resource that does not scale), but this remark >>> applies >>>>> always, regardless of the way how did we detect the bottleneck. >>>>> >>>>> Piotrek >>>>> >>>>>> On 3 Jun 2019, at 06:16, Becket Qin <becket....@gmail.com> wrote: >>>>>> >>>>>> Hi Piotr, >>>>>> >>>>>> Thanks for the suggestion. Some thoughts below: >>>>>> >>>>>> Re 1: The pending messages / bytes. >>>>>> I completely agree these are very useful metrics and we should expect >>> the >>>>>> connector to report. WRT the way to expose them, it seems more >>> consistent >>>>>> to add two metrics instead of adding a method (unless there are other >>> use >>>>>> cases other than metric reporting). So we can add the following two >>>>> metrics. >>>>>> - pending.bytes, Gauge >>>>>> - pending.messages, Gauge >>>>>> Applicable connectors can choose to report them. These two metrics >>> along >>>>>> with latency should be sufficient for users to understand the progress >>>>> of a >>>>>> connector. >>>>>> >>>>>> >>>>>> Re 2: Number of buffered data in-memory of the connector >>>>>> If I understand correctly, this metric along with the pending mesages >>> / >>>>>> bytes would answer the questions of: >>>>>> - Does the connector consume fast enough? Lagging behind + empty >>> buffer >>>>> = >>>>>> cannot consume fast enough. >>>>>> - Does the connector emit fast enough? Lagging behind + full buffer = >>>>>> cannot emit fast enough, i.e. the Flink pipeline is slow. >>>>>> >>>>>> One feature we are currently working on to scale Flink automatically >>>>> relies >>>>>> on some metrics answering the same question, more specifically, we are >>>>>> profiling the time spent on .next() method (time to consume) and the >>> time >>>>>> spent on .collect() method (time to emit / process). One advantage of >>>>> such >>>>>> method level time cost allows us to calculate the parallelism >>> required to >>>>>> keep up in case their is a lag. >>>>>> >>>>>> However, one concern I have regarding such metric is that they are >>>>>> implementation specific. Either profiling on the method time, or >>>>> reporting >>>>>> buffer usage assumes the connector are implemented in such a way. A >>>>>> slightly better solution might be have the following metric: >>>>>> >>>>>> - EmitTimeRatio (or FetchTimeRatio): The time spent on emitting >>>>>> records / Total time elapsed. >>>>>> >>>>>> This assumes that the source connectors have to emit the records to >>> the >>>>>> downstream at some point. The emission may take some time ( e.g. go >>>>> through >>>>>> chained operators). And the rest of the time are spent to prepare the >>>>>> record to emit, including time for consuming and format conversion, >>> etc. >>>>>> Ideally, we'd like to see the time spent on record fetch and emit to >>> be >>>>>> about the same, so no one is bottleneck for the other. 
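Such a ratio can be obtained without timing every record by sampling only every N-th emit call and extrapolating. A sketch under that assumption; the EmitTimeRatio name loosely follows the discussion, while the wrapper shape and sampling rate are illustrative:

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    /** Illustrative only: estimates the fraction of wall-clock time spent emitting records downstream. */
    public class EmitTimeRatioGauge implements Gauge<Double> {

        private static final int SAMPLE_EVERY_N_RECORDS = 100; // keep the per-record hot path cheap

        private final long startedAtNanos = System.nanoTime();
        private long records;
        private long sampledEmitNanos;

        /** Wraps the actual emit call; only every N-th record pays for the two nanoTime() reads. */
        public void emit(Runnable emitRecord) {
            if (++records % SAMPLE_EVERY_N_RECORDS == 0) {
                long before = System.nanoTime();
                emitRecord.run();
                sampledEmitNanos += System.nanoTime() - before;
            } else {
                emitRecord.run();
            }
        }

        @Override
        public Double getValue() {
            long elapsed = System.nanoTime() - startedAtNanos;
            // only 1/N of the emits were timed, so scale the sampled time back up
            double estimatedEmitNanos = (double) sampledEmitNanos * SAMPLE_EVERY_N_RECORDS;
            return elapsed <= 0 ? 0.0 : Math.min(1.0, estimatedEmitNanos / elapsed);
        }

        public static EmitTimeRatioGauge register(MetricGroup group) {
            return group.gauge("emitTimeRatio", new EmitTimeRatioGauge());
        }
    }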
>>>>>> >>>>>> The downside of these time based metrics is additional overhead to get >>>>> the >>>>>> time, therefore sampling might be needed. But in practice I feel such >>>>> time >>>>>> based metric might be more useful if we want to take action. >>>>>> >>>>>> >>>>>> I think we should absolutely add metrics in (1) to the metric >>> convention. >>>>>> We could also add the metrics mentioned in (2) if we reach consensus >>> on >>>>>> that. What do you think? >>>>>> >>>>>> Thanks, >>>>>> >>>>>> Jiangjie (Becket) Qin >>>>>> >>>>>> >>>>>> On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski <pi...@ververica.com> >>>>> wrote: >>>>>> >>>>>>> Hey Becket, >>>>>>> >>>>>>> Re 1a) and 1b) +1 from my side. >>>>>>> >>>>>>> I’ve discussed this issue: >>>>>>> >>>>>>>>>> >>>>>>>>>> 2. It would be nice to have metrics, that allow us to check the >>> cause >>>>>>> of >>>>>>>>>> back pressure: >>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean >>>>>>>>>> hasSomethingl/isEmpty) >>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean >>>>>>>>>> hasSomething/isEmpty) >>>>>>> >>>>>>> With Nico at some lengths and he also saw the benefits of them. We >>> also >>>>>>> have more concrete proposal for that. >>>>>>> >>>>>>> Actually there are two really useful metrics, that we are missing >>>>>>> currently: >>>>>>> >>>>>>> 1. Number of data/records/bytes in the backlog to process. For >>> example >>>>>>> remaining number of bytes in unread files. Or pending data in Kafka >>>>> topics. >>>>>>> 2. Number of buffered data in-memory of the connector, that are >>> waiting >>>>> to >>>>>>> be processed pushed to Flink pipeline. >>>>>>> >>>>>>> Re 1: >>>>>>> This would have to be a metric provided directly by a connector. It >>>>> could >>>>>>> be an undefined `int`: >>>>>>> >>>>>>> `int backlog` - estimate of pending work. >>>>>>> >>>>>>> “Undefined” meaning that it would be up to a connector to decided >>>>> whether >>>>>>> it’s measured in bytes, records, pending files or whatever it is >>>>> possible >>>>>>> to provide by the connector. This is because I assume not every >>>>> connector >>>>>>> can provide exact number and for some of them it might be impossible >>> to >>>>>>> provide records number of bytes count. >>>>>>> >>>>>>> Re 2: >>>>>>> This metric could be either provided by a connector, or calculated >>>>> crudely >>>>>>> by Flink: >>>>>>> >>>>>>> `float bufferUsage` - value from [0.0, 1.0] range >>>>>>> >>>>>>> Percentage of used in memory buffers, like in Kafka’s handover. >>>>>>> >>>>>>> It could be crudely implemented by Flink with FLIP-27 >>>>>>> SourceReader#isAvailable. If SourceReader is not available reported >>>>>>> `bufferUsage` could be 0.0. If it is available, it could be 1.0. I >>> think >>>>>>> this would be a good enough estimation for most of the use cases >>> (that >>>>>>> could be overloaded and implemented better if desired). Especially >>>>> since we >>>>>>> are reporting only probed values: if probed values are almost always >>>>> “1.0”, >>>>>>> it would mean that we have a back pressure. If they are almost always >>>>>>> “0.0”, there is probably no back pressure at the sources. >>>>>>> >>>>>>> What do you think about this? >>>>>>> >>>>>>> Piotrek >>>>>>> >>>>>>>> On 30 May 2019, at 11:41, Becket Qin <becket....@gmail.com> wrote: >>>>>>>> >>>>>>>> Hi all, >>>>>>>> >>>>>>>> Thanks a lot for all the feedback and comments. I'd like to continue >>>>> the >>>>>>>> discussion on this FLIP. 
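The crude bufferUsage estimate quoted above (report 0.0 when the reader has nothing buffered, 1.0 when it does, and rely on many probes to smooth this out) could be accumulated roughly like this; a sketch only, not the eventual FLIP-27 behaviour:

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    /** Illustrative only: crude bufferUsage in [0.0, 1.0] averaged over periodic availability probes. */
    public class ProbedBufferUsageGauge implements Gauge<Float> {

        private long probes;
        private long availableProbes;

        /** 'available' would come from something like SourceReader#isAvailable at probe time. */
        public synchronized void probe(boolean available) {
            probes++;
            if (available) {
                availableProbes++;
            }
        }

        @Override
        public synchronized Float getValue() {
            // mostly 1.0 -> records are waiting in memory (likely back pressure at the source);
            // mostly 0.0 -> the reader is starved and the external system is the limiting factor
            return probes == 0 ? 0.0f : (float) availableProbes / probes;
        }

        public static ProbedBufferUsageGauge register(MetricGroup sourceMetricGroup) {
            return sourceMetricGroup.gauge("bufferUsage", new ProbedBufferUsageGauge());
        }
    }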
>>>>>>>> >>>>>>>> I updated the FLIP-33 wiki to remove all the histogram metrics from >>> the >>>>>>>> first version of metric convention due to the performance concern. >>> The >>>>>>> plan >>>>>>>> is to introduce them later when we have a mechanism to opt in/out >>>>>>> metrics. >>>>>>>> At that point, users can decide whether they want to pay the cost to >>>>> get >>>>>>>> the metric or not. >>>>>>>> >>>>>>>> As Stephan suggested, for this FLIP, let's first try to agree on the >>>>>>> small >>>>>>>> list of conventional metrics that connectors should follow. >>>>>>>> Just to be clear, the purpose of the convention is not to enforce >>> every >>>>>>>> connector to report all these metrics, but to provide a guidance in >>>>> case >>>>>>>> these metrics are reported by some connectors. >>>>>>>> >>>>>>>> >>>>>>>> @ Stephan & Chesnay, >>>>>>>> >>>>>>>> Regarding the duplication of `RecordsIn` metric in operator / task >>>>>>>> IOMetricGroups, from what I understand, for source operator, it is >>>>>>> actually >>>>>>>> the SourceFunction that reports the operator level >>>>>>>> RecordsIn/RecordsInPerSecond metric. So they are essentially the >>> same >>>>>>>> metric in the operator level IOMetricGroup. Similarly for the Sink >>>>>>>> operator, the operator level RecordsOut/RecordsOutPerSecond metrics >>> are >>>>>>>> also reported by the Sink function. I marked them as existing in the >>>>>>>> FLIP-33 wiki page. Please let me know if I misunderstood. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> Jiangjie (Becket) Qin >>>>>>>> >>>>>>>> >>>>>>>> On Thu, May 30, 2019 at 5:16 PM Becket Qin <becket....@gmail.com> >>>>> wrote: >>>>>>>> >>>>>>>>> Hi Piotr, >>>>>>>>> >>>>>>>>> Thanks a lot for the feedback. >>>>>>>>> >>>>>>>>> 1a) I guess you are referring to the part that "original system >>>>> specific >>>>>>>>> metrics should also be reported". The performance impact depends on >>>>> the >>>>>>>>> implementation. An efficient implementation would only record the >>>>> metric >>>>>>>>> once, but report them with two different metric names. This is >>>>> unlikely >>>>>>> to >>>>>>>>> hurt performance. >>>>>>>>> >>>>>>>>> 1b) Yes, I agree that we should avoid adding overhead to the >>> critical >>>>>>> path >>>>>>>>> by all means. This is sometimes a tradeoff, running blindly without >>>>> any >>>>>>>>> metric gives best performance, but sometimes might be frustrating >>> when >>>>>>> we >>>>>>>>> debug some issues. >>>>>>>>> >>>>>>>>> 2. The metrics are indeed very useful. Are they supposed to be >>>>> reported >>>>>>> by >>>>>>>>> the connectors or Flink itself? At this point FLIP-33 is more >>> focused >>>>> on >>>>>>>>> provide a guidance to the connector authors on the metrics >>> reporting. >>>>>>> That >>>>>>>>> said, after FLIP-27, I think we should absolutely report these >>> metrics >>>>>>> in >>>>>>>>> the abstract implementation. In any case, the metric convention in >>>>> this >>>>>>>>> list are expected to evolve over time. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> >>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>> >>>>>>>>> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski < >>> pi...@ververica.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> Thanks for the proposal and driving the effort here Becket :) I’ve >>>>> read >>>>>>>>>> through the FLIP-33 [1], and here are couple of my thoughts. >>>>>>>>>> >>>>>>>>>> Big +1 for standardising the metric names between connectors, it >>> will >>>>>>>>>> definitely help us and users a lot. 
>>>>>>>>>> >>>>>>>>>> Issues/questions/things to discuss that I’ve thought of: >>>>>>>>>> >>>>>>>>>> 1a. If we are about to duplicate some metrics, can this become a >>>>>>>>>> performance issue? >>>>>>>>>> 1b. Generally speaking, we should make sure that collecting those >>>>>>> metrics >>>>>>>>>> is as non intrusive as possible, especially that they will need >>> to be >>>>>>>>>> updated once per record. (They might be collected more rarely with >>>>> some >>>>>>>>>> overhead, but the hot path of updating it per record will need to >>> be >>>>> as >>>>>>>>>> quick as possible). That includes both avoiding heavy computation >>> on >>>>>>> per >>>>>>>>>> record path: histograms?, measuring time for time based metrics >>> (per >>>>>>>>>> second) (System.currentTimeMillis() depending on the >>> implementation >>>>> can >>>>>>>>>> invoke a system call) >>>>>>>>>> >>>>>>>>>> 2. It would be nice to have metrics, that allow us to check the >>> cause >>>>>>> of >>>>>>>>>> back pressure: >>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean >>>>>>>>>> hasSomethingl/isEmpty) >>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean >>>>>>>>>> hasSomething/isEmpty) >>>>>>>>>> >>>>>>>>>> a) is useful in a scenario when we are processing backlog of >>> records, >>>>>>> all >>>>>>>>>> of the internal Flink’s input/output network buffers are empty, >>> and >>>>> we >>>>>>> want >>>>>>>>>> to check whether the external source system is the bottleneck >>>>> (source’s >>>>>>>>>> input queue will be empty), or if the Flink’s connector is the >>>>>>> bottleneck >>>>>>>>>> (source’s input queues will be full). >>>>>>>>>> b) similar story. Backlog of records, but this time all of the >>>>> internal >>>>>>>>>> Flink’s input/ouput network buffers are full, and we want o check >>>>>>> whether >>>>>>>>>> the external sink system is the bottleneck (sink output queues are >>>>>>> full), >>>>>>>>>> or if the Flink’s connector is the bottleneck (sink’s output >>> queues >>>>>>> will be >>>>>>>>>> empty) >>>>>>>>>> >>>>>>>>>> It might be sometimes difficult to provide those metrics, so they >>>>> could >>>>>>>>>> be optional, but if we could provide them, it would be really >>>>> helpful. >>>>>>>>>> >>>>>>>>>> Piotrek >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> >>>>>>> >>>>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics >>>>>>>>>> < >>>>>>>>>> >>>>>>> >>>>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> On 24 Apr 2019, at 13:28, Stephan Ewen <se...@apache.org> wrote: >>>>>>>>>>> >>>>>>>>>>> I think this sounds reasonable. >>>>>>>>>>> >>>>>>>>>>> Let's keep the "reconfiguration without stopping the job" out of >>>>> this, >>>>>>>>>>> because that would be a super big effort and if we approach that, >>>>> then >>>>>>>>>> in >>>>>>>>>>> more generic way rather than specific to connector metrics. >>>>>>>>>>> >>>>>>>>>>> I would suggest to look at the following things before starting >>> with >>>>>>> any >>>>>>>>>>> implementation work: >>>>>>>>>>> >>>>>>>>>>> - Try and find a committer to support this, otherwise it will be >>>>> hard >>>>>>>>>> to >>>>>>>>>>> make progress >>>>>>>>>>> - Start with defining a smaller set of "core metrics" and extend >>> the >>>>>>>>>> set >>>>>>>>>>> later. I think that is easier than now blocking on reaching >>>>> consensus >>>>>>>>>> on a >>>>>>>>>>> large group of metrics. 
>>>>>>>>>>> - Find a solution to the problem Chesnay mentioned, that the >>>>> "records >>>>>>>>>> in" >>>>>>>>>>> metric is somehow overloaded and exists already in the IO Metric >>>>>>> group. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin <becket....@gmail.com >>>> >>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Stephan, >>>>>>>>>>>> >>>>>>>>>>>> Thanks a lot for the feedback. All makes sense. >>>>>>>>>>>> >>>>>>>>>>>> It is a good suggestion to simply have an onRecord(numBytes, >>>>>>> eventTime) >>>>>>>>>>>> method for connector writers. It should meet most of the >>>>>>> requirements, >>>>>>>>>>>> individual >>>>>>>>>>>> >>>>>>>>>>>> The configurable metrics feature is something really useful, >>>>>>>>>> especially if >>>>>>>>>>>> we can somehow make it dynamically configurable without stopping >>>>> the >>>>>>>>>> jobs. >>>>>>>>>>>> It might be better to make it a separate discussion because it >>> is a >>>>>>>>>> more >>>>>>>>>>>> generic feature instead of only for connectors. >>>>>>>>>>>> >>>>>>>>>>>> So in order to make some progress, in this FLIP we can limit the >>>>>>>>>> discussion >>>>>>>>>>>> scope to the connector related items: >>>>>>>>>>>> >>>>>>>>>>>> - the standard connector metric names and types. >>>>>>>>>>>> - the abstract ConnectorMetricHandler interface >>>>>>>>>>>> >>>>>>>>>>>> I'll start a separate thread to discuss other general metric >>>>> related >>>>>>>>>>>> enhancement items including: >>>>>>>>>>>> >>>>>>>>>>>> - optional metrics >>>>>>>>>>>> - dynamic metric configuration >>>>>>>>>>>> - potential combination with rate limiter >>>>>>>>>>>> >>>>>>>>>>>> Does this plan sound reasonable? >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> >>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>> >>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen <se...@apache.org> >>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Ignoring for a moment implementation details, this connector >>>>> metrics >>>>>>>>>> work >>>>>>>>>>>>> is a really good thing to do, in my opinion >>>>>>>>>>>>> >>>>>>>>>>>>> The questions "oh, my job seems to be doing nothing, I am >>> looking >>>>> at >>>>>>>>>> the >>>>>>>>>>>> UI >>>>>>>>>>>>> and the 'records in' value is still zero" is in the top three >>>>>>> support >>>>>>>>>>>>> questions I have been asked personally. >>>>>>>>>>>>> Introspection into "how far is the consumer lagging behind" >>> (event >>>>>>>>>> time >>>>>>>>>>>>> fetch latency) came up many times as well. >>>>>>>>>>>>> >>>>>>>>>>>>> So big +1 to solving this problem. >>>>>>>>>>>>> >>>>>>>>>>>>> About the exact design - I would try to go for the following >>>>>>>>>> properties: >>>>>>>>>>>>> >>>>>>>>>>>>> - keep complexity of of connectors. Ideally the metrics handler >>>>> has >>>>>>> a >>>>>>>>>>>>> single onRecord(numBytes, eventTime) method or so, and >>> everything >>>>>>>>>> else is >>>>>>>>>>>>> internal to the handler. That makes it dead simple for the >>>>>>> connector. >>>>>>>>>> We >>>>>>>>>>>>> can also think of an extensive scheme for connector specific >>>>>>> metrics. >>>>>>>>>>>>> >>>>>>>>>>>>> - make it configurable on the job it cluster level which >>> metrics >>>>> the >>>>>>>>>>>>> handler internally creates when that method is invoked. >>>>>>>>>>>>> >>>>>>>>>>>>> What do you think? 
>>>>>>>>>>>>> >>>>>>>>>>>>> Best, >>>>>>>>>>>>> Stephan >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay Schepler < >>>>>>> ches...@apache.org >>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> As I said before, I believe this to be over-engineered and >>> have >>>>> no >>>>>>>>>>>>>> interest in this implementation. >>>>>>>>>>>>>> >>>>>>>>>>>>>> There are conceptual issues like defining a duplicate >>>>>>>>>>>> numBytesIn(PerSec) >>>>>>>>>>>>>> metric that already exists for each operator. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 21.03.2019 06:13, Becket Qin wrote: >>>>>>>>>>>>>>> A few updates to the thread. I uploaded a patch[1] as a >>> complete >>>>>>>>>>>>>>> example of how users can use the metrics in the future. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Some thoughts below after taking a look at the >>>>> AbstractMetricGroup >>>>>>>>>>>> and >>>>>>>>>>>>>>> its subclasses. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This patch intends to provide convenience for Flink connector >>>>>>>>>>>>>>> implementations to follow metrics standards proposed in >>> FLIP-33. >>>>>>> It >>>>>>>>>>>>>>> also try to enhance the metric management in general way to >>> help >>>>>>>>>>>> users >>>>>>>>>>>>>>> with: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1. metric definition >>>>>>>>>>>>>>> 2. metric dependencies check >>>>>>>>>>>>>>> 3. metric validation >>>>>>>>>>>>>>> 4. metric control (turn on / off particular metrics) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This patch wraps |MetricGroup| to extend the functionality of >>>>>>>>>>>>>>> |AbstractMetricGroup| and its subclasses. The >>>>>>>>>>>>>>> |AbstractMetricGroup| mainly focus on the metric group >>>>> hierarchy, >>>>>>>>>> but >>>>>>>>>>>>>>> does not really manage the metrics other than keeping them >>> in a >>>>>>> Map. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Ideally we should only have one entry point for the metrics. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Right now the entry point is |AbstractMetricGroup|. However, >>>>>>> besides >>>>>>>>>>>>>>> the missing functionality mentioned above, >>> |AbstractMetricGroup| >>>>>>>>>>>> seems >>>>>>>>>>>>>>> deeply rooted in Flink runtime. We could extract it out to >>>>>>>>>>>>>>> flink-metrics in order to use it for generic purpose. There >>> will >>>>>>> be >>>>>>>>>>>>>>> some work, though. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Another approach is to make |AbstractMetrics| in this patch >>> as >>>>> the >>>>>>>>>>>>>>> metric entry point. It wraps metric group and provides the >>>>> missing >>>>>>>>>>>>>>> functionalities. Then we can roll out this pattern to runtime >>>>>>>>>>>>>>> components gradually as well. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> My first thought is that the latter approach gives a more >>> smooth >>>>>>>>>>>>>>> migration. But I am also OK with doing a refactoring on the >>>>>>>>>>>>>>> |AbstractMetricGroup| family. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> [1] https://github.com/becketqin/flink/pull/1 >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Feb 25, 2019 at 2:32 PM Becket Qin < >>>>> becket....@gmail.com >>>>>>>>>>>>>>> <mailto:becket....@gmail.com>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Chesnay, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> It might be easier to discuss some implementation details in >>>>> the >>>>>>>>>>>>>>> PR review instead of in the FLIP discussion thread. I have a >>>>>>>>>>>> patch >>>>>>>>>>>>>>> for Kafka connectors ready but haven't submitted the PR yet. 
>>>>>>>>>>>>>>> Hopefully that will help explain a bit more. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> ** Re: metric type binding >>>>>>>>>>>>>>> This is a valid point that worths discussing. If I understand >>>>>>>>>>>>>>> correctly, there are two points: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1. Metric type / interface does not matter as long as the >>>>> metric >>>>>>>>>>>>>>> semantic is clearly defined. >>>>>>>>>>>>>>> Conceptually speaking, I agree that as long as the metric >>>>>>>>>>>> semantic >>>>>>>>>>>>>>> is defined, metric type does not matter. To some extent, >>> Gauge >>>>> / >>>>>>>>>>>>>>> Counter / Meter / Histogram themselves can be think of as >>> some >>>>>>>>>>>>>>> well-recognized semantics, if you wish. In Flink, these >>> metric >>>>>>>>>>>>>>> semantics have their associated interface classes. In >>> practice, >>>>>>>>>>>>>>> such semantic to interface binding seems necessary for >>>>> different >>>>>>>>>>>>>>> components to communicate. Simply standardize the semantic >>> of >>>>>>>>>>>> the >>>>>>>>>>>>>>> connector metrics seems not sufficient for people to build >>>>>>>>>>>>>>> ecosystem on top of. At the end of the day, we still need to >>>>>>> have >>>>>>>>>>>>>>> some embodiment of the metric semantics that people can >>> program >>>>>>>>>>>>>>> against. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2. Sometimes the same metric semantic can be exposed using >>>>>>>>>>>>>>> different metric types / interfaces. >>>>>>>>>>>>>>> This is a good point. Counter and Gauge-as-a-Counter are >>> pretty >>>>>>>>>>>>>>> much interchangeable. This is more of a trade-off between the >>>>>>>>>>>> user >>>>>>>>>>>>>>> experience of metric producers and consumers. The metric >>>>>>>>>>>> producers >>>>>>>>>>>>>>> want to use Counter or Gauge depending on whether the counter >>>>> is >>>>>>>>>>>>>>> already tracked in code, while ideally the metric consumers >>>>> only >>>>>>>>>>>>>>> want to see a single metric type for each metric. I am >>> leaning >>>>>>>>>>>>>>> towards to make the metric producers happy, i.e. allow Gauge >>> / >>>>>>>>>>>>>>> Counter metric type, and the the metric consumers handle the >>>>>>> type >>>>>>>>>>>>>>> variation. The reason is that in practice, there might be >>> more >>>>>>>>>>>>>>> connector implementations than metric reporter >>> implementations. >>>>>>>>>>>> We >>>>>>>>>>>>>>> could also provide some helper method to facilitate reading >>>>> from >>>>>>>>>>>>>>> such variable metric type. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Just some quick replies to the comments around implementation >>>>>>>>>>>>>> details. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 4) single place where metrics are registered except >>>>>>>>>>>>>>> connector-specific >>>>>>>>>>>>>>> ones (which we can't really avoid). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Register connector specific ones in a single place is >>> actually >>>>>>>>>>>>>>> something that I want to achieve. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2) I'm talking about time-series databases like >>> Prometheus. >>>>>>>>>>>> We >>>>>>>>>>>>>>> would >>>>>>>>>>>>>>> only have a gauge metric exposing the last >>>>>>> fetchTime/emitTime >>>>>>>>>>>>>>> that is >>>>>>>>>>>>>>> regularly reported to the backend (Prometheus), where a >>>>> user >>>>>>>>>>>>>>> could build >>>>>>>>>>>>>>> a histogram of his choosing when/if he wants it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Not sure if such downsampling works. 
As an example, if a user >>>>>>>>>>>>>>> complains that there are some intermittent latency spikes >>>>> (maybe >>>>>>>>>>>> a >>>>>>>>>>>>>>> few records in 10 seconds) in their processing system. >>> Having a >>>>>>>>>>>>>>> Gauge sampling instantaneous latency seems unlikely useful. >>>>>>>>>>>>>>> However by looking at actual 99.9 percentile latency might >>>>> help. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 9:30 PM Chesnay Schepler >>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Re: over complication of implementation. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I think I get understand better know what you're shooting >>>>>>>>>>>> for, >>>>>>>>>>>>>>> effectively something like the OperatorIOMetricGroup. >>>>>>>>>>>>>>> But still, re-define setupConnectorMetrics() to accept a >>>>> set >>>>>>>>>>>>>>> of flags >>>>>>>>>>>>>>> for counters/meters(ans _possibly_ histograms) along >>> with a >>>>>>>>>>>>>>> set of >>>>>>>>>>>>>>> well-defined Optional<Gauge<?>>, and return the group. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Solves all issues as far as i can tell: >>>>>>>>>>>>>>> 1) no metrics must be created manually (except Gauges, >>>>> which >>>>>>>>>>>>> are >>>>>>>>>>>>>>> effectively just Suppliers and you can't get around >>> this), >>>>>>>>>>>>>>> 2) additional metrics can be registered on the returned >>>>>>>>>>>> group, >>>>>>>>>>>>>>> 3) see 1), >>>>>>>>>>>>>>> 4) single place where metrics are registered except >>>>>>>>>>>>>>> connector-specific >>>>>>>>>>>>>>> ones (which we can't really avoid). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Re: Histogram >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1) As an example, whether "numRecordsIn" is exposed as a >>>>>>>>>>>>>>> Counter or a >>>>>>>>>>>>>>> Gauge should be irrelevant. So far we're using the metric >>>>>>>>>>>> type >>>>>>>>>>>>>>> that is >>>>>>>>>>>>>>> the most convenient at exposing a given value. If there >>> is >>>>>>>>>>>>>>> some backing >>>>>>>>>>>>>>> data-structure that we want to expose some data from we >>>>>>>>>>>>>>> typically opt >>>>>>>>>>>>>>> for a Gauge, as otherwise we're just mucking around with >>>>> the >>>>>>>>>>>>>>> Meter/Counter API to get it to match. Similarly, if we >>> want >>>>>>>>>>>> to >>>>>>>>>>>>>>> count >>>>>>>>>>>>>>> something but no current count exists we typically added >>> a >>>>>>>>>>>>>>> Counter. >>>>>>>>>>>>>>> That's why attaching semantics to metric types makes >>> little >>>>>>>>>>>>>>> sense (but >>>>>>>>>>>>>>> unfortunately several reporters already do it); for >>>>>>>>>>>>>>> counters/meters >>>>>>>>>>>>>>> certainly, but the majority of metrics are gauges. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2) I'm talking about time-series databases like >>> Prometheus. >>>>>>>>>>>> We >>>>>>>>>>>>>>> would >>>>>>>>>>>>>>> only have a gauge metric exposing the last >>>>>>> fetchTime/emitTime >>>>>>>>>>>>>>> that is >>>>>>>>>>>>>>> regularly reported to the backend (Prometheus), where a >>>>> user >>>>>>>>>>>>>>> could build >>>>>>>>>>>>>>> a histogram of his choosing when/if he wants it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 22.02.2019 13:57, Becket Qin wrote: >>>>>>>>>>>>>>>> Hi Chesnay, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the explanation. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ** Re: FLIP >>>>>>>>>>>>>>>> I might have misunderstood this, but it seems that "major >>>>>>>>>>>>>>> changes" are well >>>>>>>>>>>>>>>> defined in FLIP. 
The full contents is following: >>>>>>>>>>>>>>>> What is considered a "major change" that needs a FLIP? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Any of the following should be considered a major change: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - Any major new feature, subsystem, or piece of >>>>>>>>>>>>>>> functionality >>>>>>>>>>>>>>>> - *Any change that impacts the public interfaces of the >>>>>>>>>>>>>>> project* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> What are the "public interfaces" of the project? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *All of the following are public interfaces *that people >>>>>>>>>>>>>>> build around: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - DataStream and DataSet API, including classes related >>>>>>>>>>>>>>> to that, such as >>>>>>>>>>>>>>>> StreamExecutionEnvironment >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - Classes marked with the @Public annotation >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - On-disk binary formats, such as >>>>>>>>>>>> checkpoints/savepoints >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - User-facing scripts/command-line tools, i.e. >>>>>>>>>>>>>>> bin/flink, Yarn scripts, >>>>>>>>>>>>>>>> Mesos scripts >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - Configuration settings >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - *Exposed monitoring information* >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> So any monitoring information change is considered as >>>>>>>>>>>> public >>>>>>>>>>>>>>> interface, and >>>>>>>>>>>>>>>> any public interface change is considered as a "major >>>>>>>>>>>>> change". >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ** Re: over complication of implementation. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Although this is more of implementation details that is not >>>>>>>>>>>>>>> covered by the >>>>>>>>>>>>>>>> FLIP. But it may be worth discussing. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> First of all, I completely agree that we should use the >>>>>>>>>>>>>>> simplest way to >>>>>>>>>>>>>>>> achieve our goal. To me the goal is the following: >>>>>>>>>>>>>>>> 1. Clear connector conventions and interfaces. >>>>>>>>>>>>>>>> 2. The easiness of creating a connector. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Both of them are important to the prosperity of the >>>>>>>>>>>>>>> connector ecosystem. So >>>>>>>>>>>>>>>> I'd rather abstract as much as possible on our side to make >>>>>>>>>>>>>>> the connector >>>>>>>>>>>>>>>> developer's work lighter. Given this goal, a static util >>>>>>>>>>>>>>> method approach >>>>>>>>>>>>>>>> might have a few drawbacks: >>>>>>>>>>>>>>>> 1. Users still have to construct the metrics by themselves. >>>>>>>>>>>>>>> (And note that >>>>>>>>>>>>>>>> this might be erroneous by itself. For example, a customer >>>>>>>>>>>>>>> wrapper around >>>>>>>>>>>>>>>> dropwizard meter maybe used instead of MeterView). >>>>>>>>>>>>>>>> 2. When connector specific metrics are added, it is >>>>>>>>>>>>>>> difficult to enforce >>>>>>>>>>>>>>>> the scope to be the same as standard metrics. >>>>>>>>>>>>>>>> 3. It seems that a method proliferation is inevitable if we >>>>>>>>>>>>>>> want to apply >>>>>>>>>>>>>>>> sanity checks. e.g. The metric of numBytesIn was not >>>>>>>>>>>>>>> registered for a meter. >>>>>>>>>>>>>>>> 4. Metrics are still defined in random places and hard to >>>>>>>>>>>>>> track. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The current PR I had was inspired by the Config system in >>>>>>>>>>>>>>> Kafka, which I >>>>>>>>>>>>>>>> found pretty handy. 
In fact it is not only used by Kafka >>>>>>>>>>>>>>> itself but even >>>>>>>>>>>>>>>> some other projects that depend on Kafka. I am not saying >>>>>>>>>>>>>>> this approach is >>>>>>>>>>>>>>>> perfect. But I think it worths to save the work for >>>>>>>>>>>>>>> connector writers and >>>>>>>>>>>>>>>> encourage more systematic implementation. That being said, >>>>>>>>>>>> I >>>>>>>>>>>>>>> am fully open >>>>>>>>>>>>>>>> to suggestions. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Re: Histogram >>>>>>>>>>>>>>>> I think there are two orthogonal questions around those >>>>>>>>>>>>>> metrics: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1. Regardless of the metric type, by just looking at the >>>>>>>>>>>>>>> meaning of a >>>>>>>>>>>>>>>> metric, is generic to all connectors? If the answer is yes, >>>>>>>>>>>>>>> we should >>>>>>>>>>>>>>>> include the metric into the convention. No matter whether >>>>>>>>>>>> we >>>>>>>>>>>>>>> include it >>>>>>>>>>>>>>>> into the convention or not, some connector implementations >>>>>>>>>>>>>>> will emit such >>>>>>>>>>>>>>>> metric. It is better to have a convention than letting each >>>>>>>>>>>>>>> connector do >>>>>>>>>>>>>>>> random things. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2. If a standard metric is a histogram, what should we do? >>>>>>>>>>>>>>>> I agree that we should make it clear that using histograms >>>>>>>>>>>>>>> will have >>>>>>>>>>>>>>>> performance risk. But I do see histogram is useful in some >>>>>>>>>>>>>>> fine-granularity >>>>>>>>>>>>>>>> debugging where one do not have the luxury to stop the >>>>>>>>>>>>>>> system and inject >>>>>>>>>>>>>>>> more inspection code. So the workaround I am thinking is to >>>>>>>>>>>>>>> provide some >>>>>>>>>>>>>>>> implementation suggestions. Assume later on we have a >>>>>>>>>>>>>>> mechanism of >>>>>>>>>>>>>>>> selective metrics. In the abstract metrics class we can >>>>>>>>>>>>>>> disable those >>>>>>>>>>>>>>>> metrics by default individual connector writers does not >>>>>>>>>>>>>>> have to do >>>>>>>>>>>>>>>> anything (this is another advantage of having an >>>>>>>>>>>>>>> AbstractMetrics instead of >>>>>>>>>>>>>>>> static util methods.) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I am not sure I fully understand the histogram in the >>>>>>>>>>>>>>> backend approach. Can >>>>>>>>>>>>>>>> you explain a bit more? Do you mean emitting the raw data, >>>>>>>>>>>>>>> e.g. fetchTime >>>>>>>>>>>>>>>> and emitTime with each record and let the histogram >>>>>>>>>>>>>>> computation happen in >>>>>>>>>>>>>>>> the background? Or let the processing thread putting the >>>>>>>>>>>>>>> values into a >>>>>>>>>>>>>>>> queue and have a separate thread polling from the queue and >>>>>>>>>>>>>>> add them into >>>>>>>>>>>>>>>> the histogram? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 4:34 PM Chesnay Schepler >>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Re: Flip >>>>>>>>>>>>>>>>> The very first line under both the main header and Purpose >>>>>>>>>>>>>>> section >>>>>>>>>>>>>>>>> describe Flips as "major changes", which this isn't. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Re: complication >>>>>>>>>>>>>>>>> I'm not arguing against standardization, but again an >>>>>>>>>>>>>>> over-complicated >>>>>>>>>>>>>>>>> implementation when a static utility method would be >>>>>>>>>>>>>>> sufficient. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> public static void setupConnectorMetrics( >>>>>>>>>>>>>>>>> MetricGroup operatorMetricGroup, >>>>>>>>>>>>>>>>> String connectorName, >>>>>>>>>>>>>>>>> Optional<Gauge<Long>> numRecordsIn, >>>>>>>>>>>>>>>>> ...) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> This gives you all you need: >>>>>>>>>>>>>>>>> * a well-defined set of metrics for a connector to opt-in >>>>>>>>>>>>>>>>> * standardized naming schemes for scope and individual >>>>>>>>>>>>> metrics >>>>>>>>>>>>>>>>> * standardize metric types (although personally I'm not >>>>>>>>>>>>>>> interested in that >>>>>>>>>>>>>>>>> since metric types should be considered syntactic sugar) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Re: Configurable Histogram >>>>>>>>>>>>>>>>> If anything they _must_ be turned off by default, but the >>>>>>>>>>>>>>> metric system is >>>>>>>>>>>>>>>>> already exposing so many options that I'm not too keen on >>>>>>>>>>>>>>> adding even more. >>>>>>>>>>>>>>>>> You have also only addressed my first argument against >>>>>>>>>>>>>>> histograms >>>>>>>>>>>>>>>>> (performance), the second one still stands (calculate >>>>>>>>>>>>>>> histogram in metric >>>>>>>>>>>>>>>>> backends instead). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 21.02.2019 16:27, Becket Qin wrote: >>>>>>>>>>>>>>>>>> Hi Chesnay, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the comments. I think this is worthy of a FLIP >>>>>>>>>>>>>>> because it is >>>>>>>>>>>>>>>>>> public API. According to the FLIP description a FlIP is >>>>>>>>>>>>>>> required in case >>>>>>>>>>>>>>>>> of: >>>>>>>>>>>>>>>>>> - Any change that impacts the public interfaces of >>>>>>>>>>>>>>> the project >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> and the following entry is found in the definition of >>>>>>>>>>>>>>> "public interface". >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> - Exposed monitoring information >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Metrics are critical to any production system. So a clear >>>>>>>>>>>>>>> metric >>>>>>>>>>>>>>>>> definition >>>>>>>>>>>>>>>>>> is important for any serious users. For an organization >>>>>>>>>>>>>>> with large Flink >>>>>>>>>>>>>>>>>> installation, change in metrics means great amount of >>>>>>>>>>>>>>> work. So such >>>>>>>>>>>>>>>>> changes >>>>>>>>>>>>>>>>>> do need to be fully discussed and documented. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ** Re: Histogram. >>>>>>>>>>>>>>>>>> We can discuss whether there is a better way to expose >>>>>>>>>>>>>>> metrics that are >>>>>>>>>>>>>>>>>> suitable for histograms. My micro-benchmark on various >>>>>>>>>>>>>>> histogram >>>>>>>>>>>>>>>>>> implementations also indicates that they are >>>>>>>>>>>> significantly >>>>>>>>>>>>>>> slower than >>>>>>>>>>>>>>>>>> other metric types. But I don't think that means never >>>>>>>>>>>> use >>>>>>>>>>>>>>> histogram, but >>>>>>>>>>>>>>>>>> means use it with caution. For example, we can suggest >>>>>>>>>>>> the >>>>>>>>>>>>>>>>> implementations >>>>>>>>>>>>>>>>>> to turn them off by default and only turn it on for a >>>>>>>>>>>>>>> small amount of >>>>>>>>>>>>>>>>> time >>>>>>>>>>>>>>>>>> when performing some micro-debugging. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ** Re: complication: >>>>>>>>>>>>>>>>>> Connector conventions are essential for Flink ecosystem. >>>>>>>>>>>>>>> Flink connectors >>>>>>>>>>>>>>>>>> pool is probably the most important part of Flink, just >>>>>>>>>>>>>>> like any other >>>>>>>>>>>>>>>>> data >>>>>>>>>>>>>>>>>> system. 
Clear conventions of connectors will help build >>>>>>>>>>>>>>> Flink ecosystem >>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>> a more organic way. >>>>>>>>>>>>>>>>>> Take the metrics convention as an example, imagine >>>>>>>>>>>> someone >>>>>>>>>>>>>>> has developed >>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>> Flink connector for System foo, and another developer may >>>>>>>>>>>>>>> have developed >>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>> monitoring and diagnostic framework for Flink which >>>>>>>>>>>>>>> analyzes the Flink >>>>>>>>>>>>>>>>> job >>>>>>>>>>>>>>>>>> performance based on metrics. With a clear metric >>>>>>>>>>>>>>> convention, those two >>>>>>>>>>>>>>>>>> projects could be developed independently. Once users put >>>>>>>>>>>>>>> them together, >>>>>>>>>>>>>>>>>> it would work without additional modifications. This >>>>>>>>>>>>>>> cannot be easily >>>>>>>>>>>>>>>>>> achieved by just defining a few constants. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ** Re: selective metrics: >>>>>>>>>>>>>>>>>> Sure, we can discuss that in a separate thread. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Dawid >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ** Re: latency / fetchedLatency >>>>>>>>>>>>>>>>>> The primary purpose of establish such a convention is to >>>>>>>>>>>>>>> help developers >>>>>>>>>>>>>>>>>> write connectors in a more compatible way. The convention >>>>>>>>>>>>>>> is supposed to >>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>> defined more proactively. So when look at the convention, >>>>>>>>>>>>>>> it seems more >>>>>>>>>>>>>>>>>> important to see if the concept is applicable to >>>>>>>>>>>>>>> connectors in general. >>>>>>>>>>>>>>>>> It >>>>>>>>>>>>>>>>>> might be true so far only Kafka connector reports >>>>>>>>>>>> latency. >>>>>>>>>>>>>>> But there >>>>>>>>>>>>>>>>> might >>>>>>>>>>>>>>>>>> be hundreds of other connector implementations in the >>>>>>>>>>>>>>> Flink ecosystem, >>>>>>>>>>>>>>>>>> though not in the Flink repo, and some of them also emits >>>>>>>>>>>>>>> latency. I >>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>> a lot of other sources actually also has an append >>>>>>>>>>>>>>> timestamp. e.g. >>>>>>>>>>>>>>>>> database >>>>>>>>>>>>>>>>>> bin logs and some K-V stores. So I wouldn't be surprised >>>>>>>>>>>>>>> if some database >>>>>>>>>>>>>>>>>> connector can also emit latency metrics. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 10:14 PM Chesnay Schepler >>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org>> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regarding 2) It doesn't make sense to investigate this >>>>>>>>>>>> as >>>>>>>>>>>>>>> part of this >>>>>>>>>>>>>>>>>>> FLIP. This is something that could be of interest for >>>>>>>>>>>> the >>>>>>>>>>>>>>> entire metric >>>>>>>>>>>>>>>>>>> system, and should be designed for as such. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regarding the proposal as a whole: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Histogram metrics shall not be added to the core of >>>>>>>>>>>>>>> Flink. They are >>>>>>>>>>>>>>>>>>> significantly more expensive than other metrics, and >>>>>>>>>>>>>>> calculating >>>>>>>>>>>>>>>>>>> histograms in the application is regarded as an >>>>>>>>>>>>>>> anti-pattern by several >>>>>>>>>>>>>>>>>>> metric backends, who instead recommend to expose the raw >>>>>>>>>>>>>>> data and >>>>>>>>>>>>>>>>>>> calculate the histogram in the backend. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Second, this seems overly complicated. Given that we >>>>>>>>>>>>>>> already established >>>>>>>>>>>>>>>>>>> that not all connectors will export all metrics we are >>>>>>>>>>>>>>> effectively >>>>>>>>>>>>>>>>>>> reducing this down to a consistent naming scheme. We >>>>>>>>>>>>>>> don't need anything >>>>>>>>>>>>>>>>>>> sophisticated for that; basically just a few constants >>>>>>>>>>>>>>> that all >>>>>>>>>>>>>>>>>>> connectors use. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I'm not convinced that this is worthy of a FLIP. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 21.02.2019 14:26, Dawid Wysakowicz wrote: >>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Ad 1. In general I undestand and I agree. But those >>>>>>>>>>>>>>> particular metrics >>>>>>>>>>>>>>>>>>>> (latency, fetchLatency), right now would only be >>>>>>>>>>>>>>> reported if user uses >>>>>>>>>>>>>>>>>>>> KafkaConsumer with internal timestampAssigner with >>>>>>>>>>>>>>> StreamCharacteristic >>>>>>>>>>>>>>>>>>>> set to EventTime, right? That sounds like a very >>>>>>>>>>>>>>> specific case. I am >>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>> sure if we should introduce a generic metric that will >>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>> disabled/absent for most of implementations. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Ad.2 That sounds like an orthogonal issue, that might >>>>>>>>>>>>>>> make sense to >>>>>>>>>>>>>>>>>>>> investigate in the future. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Dawid >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 21/02/2019 13:20, Becket Qin wrote: >>>>>>>>>>>>>>>>>>>>> Hi Dawid, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. That makes sense to me. There >>>>>>>>>>>>>>> are two cases >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>> addressed. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 1. The metrics are supposed to be a guidance. It is >>>>>>>>>>>>>>> likely that a >>>>>>>>>>>>>>>>>>> connector >>>>>>>>>>>>>>>>>>>>> only supports some but not all of the metrics. In that >>>>>>>>>>>>>>> case, each >>>>>>>>>>>>>>>>>>> connector >>>>>>>>>>>>>>>>>>>>> implementation should have the freedom to decide which >>>>>>>>>>>>>>> metrics are >>>>>>>>>>>>>>>>>>>>> reported. For the metrics that are supported, the >>>>>>>>>>>>>>> guidance should be >>>>>>>>>>>>>>>>>>>>> followed. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 2. Sometimes users may want to disable certain metrics >>>>>>>>>>>>>>> for some reason >>>>>>>>>>>>>>>>>>>>> (e.g. performance / reprocessing of data). A generic >>>>>>>>>>>>>>> mechanism should >>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>> provided to allow user choose which metrics are >>>>>>>>>>>>>>> reported. This >>>>>>>>>>>>>>>>> mechanism >>>>>>>>>>>>>>>>>>>>> should also be honored by the connector >>>>>>>>>>>> implementations. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Does this sound reasonable to you? 
>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 4:22 PM Dawid Wysakowicz < >>>>>>>>>>>>>>>>>>> dwysakow...@apache.org <mailto:dwysakow...@apache.org>> >>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Generally I like the idea of having a unified, >>>>>>>>>>>>>>> standard set of >>>>>>>>>>>>>>>>> metrics >>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> all connectors. I have some slight concerns about >>>>>>>>>>>>>>> fetchLatency and >>>>>>>>>>>>>>>>>>>>>> latency though. They are computed based on EventTime >>>>>>>>>>>>>>> which is not a >>>>>>>>>>>>>>>>>>> purely >>>>>>>>>>>>>>>>>>>>>> technical feature. It depends often on some business >>>>>>>>>>>>>>> logic, might be >>>>>>>>>>>>>>>>>>> absent >>>>>>>>>>>>>>>>>>>>>> or defined after source. Those metrics could also >>>>>>>>>>>>>>> behave in a weird >>>>>>>>>>>>>>>>>>> way in >>>>>>>>>>>>>>>>>>>>>> case of replaying backlog. Therefore I am not sure if >>>>>>>>>>>>>>> we should >>>>>>>>>>>>>>>>> include >>>>>>>>>>>>>>>>>>>>>> those metrics by default. Maybe we could at least >>>>>>>>>>>>>>> introduce a feature >>>>>>>>>>>>>>>>>>>>>> switch for them? What do you think? >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Dawid >>>>>>>>>>>>>>>>>>>>>> On 21/02/2019 03:13, Becket Qin wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Bump. If there is no objections to the proposed >>>>>>>>>>>>>>> metrics. I'll start a >>>>>>>>>>>>>>>>>>>>>> voting thread later toady. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Mon, Feb 11, 2019 at 8:17 PM Becket Qin >>>>>>>>>>>>>>> <becket....@gmail.com <mailto:becket....@gmail.com>> < >>>>>>>>>>>>>>>>>>> becket....@gmail.com <mailto:becket....@gmail.com>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>> Hi folks, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> I would like to start the FLIP discussion thread >>>>>>>>>>>> about >>>>>>>>>>>>>>> standardize >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> connector metrics. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> In short, we would like to provide a convention of >>>>>>>>>>>>>>> Flink connector >>>>>>>>>>>>>>>>>>>>>> metrics. It will help simplify the monitoring and >>>>>>>>>>>>>>> alerting on Flink >>>>>>>>>>>>>>>>>>> jobs. >>>>>>>>>>>>>>>>>>>>>> The FLIP link is following: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>>> >>>>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics >>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>> >>>>> >>>>> >>> >>>