Hi Becket!

I am wondering if it makes sense to do the following small change:

- Have "currentFetchEventTimeLag" be defined on event timestamps (optionally, if the source system exposes it) <== this is like in your proposal; it helps understand how long the records were in the source before.
- BUT change "currentEmitEventTimeLag" to be "currentSourceWatermarkLag" instead. That way, users can see how far behind the wall-clock time the source progress is all in all. It is also a more well-defined metric, as it does not oscillate with out-of-order events.

What do you think?

Best,
Stephan

On Fri, Sep 18, 2020 at 12:02 PM Becket Qin <becket....@gmail.com> wrote:

> Hi folks,
>
> Thanks for all the great feedback. I have just updated the FLIP-33 wiki with the following changes:
>
> 1. Renaming: "currentFetchLatency" to "currentFetchEventTimeLag", "currentLatency" to "currentEmitEventTimeLag".
> 2. Added the public interface code change required for the new metrics.
> 3. Added a description of whether a metric is predefined or optional, and which component is expected to update the metric.
>
> Please let me know if you have any questions. I'll start a vote in two days if there are no further concerns.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 9, 2020 at 9:56 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> Thanks for the input. Just a few more clarifications / questions.
>>
>> *Num Bytes / Records Metrics*
>>
>> 1. At this point, the *numRecordsIn(Rate)* metrics exist in both OperatorIOMetricGroup and TaskIOMetricGroup. I did not find *numRecordsIn(Rate)* in the TaskIOMetricGroup updated anywhere other than in the unit tests. Am I missing something?
>>
>> 2. *numBytesIn(Rate)* metrics only exist in TaskIOMetricGroup. At this point, the SourceReaders only have access to a SourceReaderContext, which provides an OperatorMetricGroup. So it seems that connector developers are not able to update *numBytesIn(Rate)*. With the multiple-Source chaining support, it is possible that there are multiple Sources in the same task. So it looks like we need to add *numBytesIn(Rate)* to the operator metrics as well.
>>
>> *Current (Fetch) Latency*
>>
>> *currentFetchLatency* helps clearly tell whether the latency is caused by Flink or not. Backpressure is not the only reason that we see fetch latency. Even if there is no back pressure, the records may have passed through a long pipeline before they entered Flink. For example, say the *currentLatency* is 10 seconds and there is no backpressure. Does that mean the record spent 10 seconds in the Source operator? If not, how much did Flink contribute to that 10 seconds of latency? These questions are frequently asked and hard to answer without the fetch latency.
>>
>>> For "currentFetchLatency", we would need to understand timestamps before the records are decoded. That is only possible for some sources, where the client gives us the records in a (partially) decoded form already (like Kafka). Then, some work has been done between the fetch time and the time we update the metric already, so it is already a bit closer to the "currentFetchLatency". I think following this train of thought, there is diminished benefit from that specific metric.
>>
>> We may not have to report the fetch latency before records are decoded. One solution is to remember the *FetchTime* when the encoded records are fetched, and report the fetch latency after the records are decoded by computing (*FetchTime - EventTime*).
>> An approximate implementation would be adding a *FetchTime* field to *RecordsWithSplitIds*, assuming that all the records in that data structure are fetched at the same time.
>>
>> Thoughts?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Sep 9, 2020 at 12:42 AM Stephan Ewen <step...@ververica.com> wrote:
>>
>>> Thanks for reviving this, Becket!
>>>
>>> I think Konstantin's comments are great. I'd add these points:
>>>
>>> *Num Bytes / Records Metrics*
>>>
>>> For "numBytesIn" and "numRecordsIn", we should reuse the OperatorIOMetric group; then it also gets reported to the overview page in the Web UI.
>>>
>>> The "numBytesInPerSecond" and "numRecordsInPerSecond" are automatically derived metrics; no need to do anything once we populate the above two metrics.
>>>
>>> *Current (Fetch) Latency*
>>>
>>> I would really go for "eventTimeLag" rather than "fetchLatency". I think "eventTimeLag" is a term that has some adoption in the Flink community and beyond.
>>>
>>> I am not so sure that I see the benefit between "currentLatency" and "currentFetchLatency" (or event time lag before/after), as the two only differ by the time it takes to emit a batch.
>>> - In a non-backpressured case, these should be virtually identical (and both dominated by watermark lag, not the actual time it takes the fetch to be emitted).
>>> - In a backpressured case, why do you care about when data was fetched, as opposed to emitted? Emitted time is relevant for application semantics and checkpoints. Fetch time seems to be an implementation detail (how much does the source buffer).
>>>
>>> The "currentLatency" (eventTimeLagAfter) can be computed out-of-the-box, independent of a source implementation, so that is also a good argument to make it the main metric. We know timestamps and watermarks in the source, except for cases where no watermarks have been defined at all (batch jobs or pure processing time jobs), in which case this metric should probably be "Infinite".
>>>
>>> For "currentFetchLatency", we would need to understand timestamps before the records are decoded. That is only possible for some sources, where the client gives us the records in a (partially) decoded form already (like Kafka). Then, some work has been done between the fetch time and the time we update the metric already, so it is already a bit closer to the "currentFetchLatency". I think following this train of thought, there is diminished benefit from that specific metric.
>>>
>>> *Idle Time*
>>>
>>> I agree, it would be great to rename this. Maybe to "sourceWaitTime" or "sourceIdleTime", so as to make clear that this is not exactly the time that Flink's processing pipeline is idle, but the time where the source does not have new data.
>>>
>>> This is not an easy metric to collect, though (except maybe for the sources that are only idle while they have no split assigned, like the continuous file source).
>>>
>>> *Source Specific Metrics*
>>>
>>> I believe source-specific would only be "sourceIdleTime", "numRecordsInErrors", "pendingBytes", and "pendingRecords".
>>>
>>> *Conclusion*
>>>
>>> We can probably add "numBytesIn", "numRecordsIn" and "eventTimeLag" right away, with little complexity. I'd suggest to start with these right away.
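To make the approximation above concrete, here is a minimal Java sketch. The FetchedRecords wrapper and its names are hypothetical, not part of FLIP-33; RecordsWithSplitIds is the existing FLIP-27 type mentioned in the discussion.

    import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

    // Hypothetical wrapper: remember one fetch timestamp per batch, assuming
    // all records in the batch were fetched at (roughly) the same time.
    public class FetchedRecords<E> {

        private final long fetchTimeMs;                  // wall-clock time of the fetch
        private final RecordsWithSplitIds<E> records;    // the fetched batch

        public FetchedRecords(long fetchTimeMs, RecordsWithSplitIds<E> records) {
            this.fetchTimeMs = fetchTimeMs;
            this.records = records;
        }

        public RecordsWithSplitIds<E> records() {
            return records;
        }

        // Reported after decoding: how long the record sat in the external
        // system before Flink fetched it (FetchTime - EventTime).
        public long fetchEventTimeLag(long eventTimestampMs) {
            return fetchTimeMs - eventTimestampMs;
        }
    }

A SourceReader could then report currentFetchEventTimeLag from this value as each record is decoded, at the cost of assuming one fetch time per batch.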
>>>
>>> Best,
>>> Stephan
>>>
>>> On Tue, Sep 8, 2020 at 3:25 PM Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> Hey Konstantin,
>>>>
>>>> Thanks for the feedback and suggestions. Please see the reply below.
>>>>
>>>>> * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has been introduced in https://issues.apache.org/jira/browse/FLINK-16864. They have a similar name, but different definitions of idleness, e.g. "idleTimeMsPerSecond" considers the SourceTask idle when it is backpressured. Can we make it clearer that these two metrics mean different things?
>>>>
>>>> That is a good point. I did not notice this metric earlier. It seems that both metrics are useful to the users. One tells them how busy the source is and how much more throughput the source can handle. The other tells the users how long it has been since the source saw the last record, which is useful for debugging. I'll update the FLIP to make it clear.
>>>>
>>>>> * "current(Fetch)Latency": I am wondering if "eventTimeLag(Before|After)" is more descriptive/clear. What do others think?
>>>>
>>>> I am quite open to ideas on these names. In fact I also feel "current(Fetch)Latency" is not super intuitive. So it would be great if we can have better names.
>>>>
>>>>> * Current(Fetch)Latency implies that the timestamps are directly extracted in the source connector, right? Will this be the default for FLIP-27 sources anyway?
>>>>
>>>> The "currentFetchLatency" will probably be reported by each source implementation, because the data fetching is done by SplitReaders and there is no base implementation. The "currentLatency", on the other hand, can be reported by the SourceReader base implementation. However, since developers can implement their own source connector without using our base implementation, this metric guidance is still useful for connector developers in that case.
>>>>
>>>>> * Does FLIP-33 also include the implementation of these metrics (to the extent possible) for all connectors currently available in Apache Flink, or is the "per-connector implementation" out of scope?
>>>>
>>>> FLIP-33 itself does not specify any implementation of those metrics. But the connectors we provide in Apache Flink will follow the guidance of FLIP-33 to emit those metrics when applicable. Maybe we can have some public static Strings defined for the metric names to help other connector developers follow the same guidance. I can also add that to the public interface section of the FLIP if we decide to do that.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Tue, Sep 8, 2020 at 9:02 PM Becket Qin <becket....@gmail.com> wrote:
>>>>
>>>>> On Tue, Sep 8, 2020 at 6:55 PM Konstantin Knauf <kna...@apache.org> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>>
>>>>>> Thank you for picking up this FLIP. I have a few questions:
>>>>>>
>>>>>> * two thoughts on naming:
>>>>>>   * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has been introduced in https://issues.apache.org/jira/browse/FLINK-16864. They have a similar name, but different definitions of idleness, e.g. "idleTimeMsPerSecond" considers the SourceTask idle when it is backpressured. Can we make it clearer that these two metrics mean different things?
>>>>>>   * "current(Fetch)Latency": I am wondering if "eventTimeLag(Before|After)" is more descriptive/clear. What do others think?
>>>>>>
>>>>>> * Current(Fetch)Latency implies that the timestamps are directly extracted in the source connector, right? Will this be the default for FLIP-27 sources anyway?
>>>>>>
>>>>>> * Does FLIP-33 also include the implementation of these metrics (to the extent possible) for all connectors currently available in Apache Flink, or is the "per-connector implementation" out of scope?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>> On Fri, Sep 4, 2020 at 4:56 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>
>>>>>> > Hi all,
>>>>>> >
>>>>>> > To complete the Source refactoring work, I'd like to revive this discussion. Since the mail thread has been dormant for more than a year, just to recap the motivation of the FLIP:
>>>>>> >
>>>>>> > 1. The FLIP proposes to standardize the connector metrics by giving guidance on the metric specifications, including the name, type and meaning of the metrics.
>>>>>> > 2. It is OK for a connector to not emit some of the metrics in the metric guidance, but if a metric of the same semantic is emitted, the implementation should follow the guidance.
>>>>>> > 3. It is OK for a connector to emit more metrics than what are listed in the FLIP. This includes having an alias for a metric specified in the FLIP.
>>>>>> > 4. We will implement some of the metrics out of the box in the default implementation of FLIP-27, as long as it is applicable.
>>>>>> >
>>>>>> > The FLIP wiki is the following: https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>>>>> >
>>>>>> > Any thoughts?
>>>>>> >
>>>>>> > Thanks,
>>>>>> >
>>>>>> > Jiangjie (Becket) Qin
>>>>>> >
>>>>>> > On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>> >
>>>>>> > > > we will need to revisit the convention list and adjust them accordingly when FLIP-27 is ready
>>>>>> > >
>>>>>> > > Yes, this sounds good :)
>>>>>> > >
>>>>>> > > Piotrek
>>>>>> > >
>>>>>> > > > On 13 Jun 2019, at 13:35, Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >
>>>>>> > > > Hi Piotr,
>>>>>> > > >
>>>>>> > > > That's great to know. Chances are that we will need to revisit the convention list and adjust it accordingly when FLIP-27 is ready. At that point we can mark some of the metrics as available by default for connectors implementing the new interface.
>>>>>> > > >
>>>>>> > > > Thanks,
>>>>>> > > >
>>>>>> > > > Jiangjie (Becket) Qin
>>>>>> > > >
>>>>>> > > > On Thu, Jun 13, 2019 at 3:51 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>> > > >
>>>>>> > > >> Thanks for driving this. I've just noticed one small thing. With the new SourceReader interface, Flink will be able to provide the `idleTime` metric automatically.
>>>>>> > > >>
>>>>>> > > >> Piotrek
>>>>>> > > >>
>>>>>> > > >>> On 13 Jun 2019, at 03:30, Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>>
>>>>>> > > >>> Thanks all for the feedback and discussion.
>>>>>> > > >>>
>>>>>> > > >>> Since there wasn't any concern raised, I've started the voting thread for this FLIP, but please feel free to continue the discussion here if you think something still needs to be addressed.
>>>>>> > > >>>
>>>>>> > > >>> Thanks,
>>>>>> > > >>>
>>>>>> > > >>> Jiangjie (Becket) Qin
>>>>>> > > >>>
>>>>>> > > >>> On Mon, Jun 10, 2019 at 9:10 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>>
>>>>>> > > >>>> Hi Piotr,
>>>>>> > > >>>>
>>>>>> > > >>>> Thanks for the comments. Yes, you are right. Users will have to look at other metrics to decide whether the pipeline is healthy or not in the first place before they can use the time-based metric to fix the bottleneck.
>>>>>> > > >>>>
>>>>>> > > >>>> I agree that once we have FLIP-27 ready, some of the metrics can just be reported by the abstract implementation.
>>>>>> > > >>>>
>>>>>> > > >>>> I've updated the FLIP-33 wiki page to add the pendingBytes and pendingRecords metrics. Please let me know if you have any concern over the updated metric convention proposal.
>>>>>> > > >>>>
>>>>>> > > >>>> @Chesnay Schepler <ches...@apache.org> @Stephan Ewen <step...@ververica.com> will you also have time to take a look at the proposed metric convention? If there is no further concern I'll start a voting thread for this FLIP in two days.
>>>>>> > > >>>>
>>>>>> > > >>>> Thanks,
>>>>>> > > >>>>
>>>>>> > > >>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>
>>>>>> > > >>>> On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>> > > >>>>
>>>>>> > > >>>>> Hi Becket,
>>>>>> > > >>>>>
>>>>>> > > >>>>> Thanks for the answer :)
>>>>>> > > >>>>>
>>>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on producing the record to downstream. For example, a source connector can report that it's spending 80% of its time emitting records to the downstream processing pipeline. In another case, a sink connector may report that it's spending 30% of its time producing the records to the external system.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage metric:
>>>>>> > > >>>>>> - 80% of time spent on emitting records to downstream ---> downstream node is bottleneck ---> output buffer is probably full.
>>>>>> > > >>>>>> - 30% of time spent on emitting records to downstream ---> downstream node is not bottleneck ---> output buffer is probably not full.
>>>>>> > > >>>>>
>>>>>> > > >>>>> If by "time spent on emitting records to downstream" you understand "waiting on back pressure", then I see your point. And I agree that some kind of ratio/time based metric gives you more information.
>>>>>> > > >>>>> However, under "time spent on emitting records to downstream" might be hidden the following (extreme) situation:
>>>>>> > > >>>>>
>>>>>> > > >>>>> 1. The job is barely able to handle the influx of records; there is 99% CPU/resource usage in the cluster, but nobody is bottlenecked/backpressured, all output buffers are empty, and everybody is waiting 1% of its time for more records to process.
>>>>>> > > >>>>> 2. 80% of the time can still be spent on "downstream operators", because they are the CPU-intensive operations, but this doesn't mean that increasing the parallelism down the stream will help with anything there. To the contrary, increasing the parallelism of the source operator might help to increase resource utilisation up to 100%.
>>>>>> > > >>>>>
>>>>>> > > >>>>> However, this "time based/ratio" approach can be extended to in/output buffer usage. Besides collecting the information that the input/output buffer is full/empty, we can probe/profile how often buffers are empty/full. If the output buffer is full 1% of the time, there is almost no back pressure. If it's full 80% of the time, there is some back pressure; if it's full 99.9% of the time, there is huge back pressure.
>>>>>> > > >>>>>
>>>>>> > > >>>>> Now for autoscaling you could compare the input & output buffers' fill ratios:
>>>>>> > > >>>>>
>>>>>> > > >>>>> 1. Both are high: the source of the bottleneck is down the stream.
>>>>>> > > >>>>> 2. Output is low, input is high: this is the bottleneck, and the higher the difference, the more this operator/task is the source of the bottleneck.
>>>>>> > > >>>>> 3. Output is high, input is low: there was some load spike that we are currently finishing processing.
>>>>>> > > >>>>>
>>>>>> > > >>>>> But long story short, we are probably diverging from the topic of this discussion, and we can discuss this at some later point.
>>>>>> > > >>>>>
>>>>>> > > >>>>> For now, for sources:
>>>>>> > > >>>>>
>>>>>> > > >>>>> as I wrote before, +1 for:
>>>>>> > > >>>>> - pending.bytes, Gauge
>>>>>> > > >>>>> - pending.messages, Gauge
>>>>>> > > >>>>>
>>>>>> > > >>>>> When we will be developing/discussing the SourceReader from FLIP-27 we might then add:
>>>>>> > > >>>>>
>>>>>> > > >>>>> - in-memory.buffer.usage (0 - 100%)
>>>>>> > > >>>>>
>>>>>> > > >>>>> which will be estimated automatically by Flink, while the user will be able to override/provide a better estimation.
>>>>>> > > >>>>>
>>>>>> > > >>>>> Piotrek
>>>>>> > > >>>>>
>>>>>> > > >>>>>> On 5 Jun 2019, at 05:42, Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Hi Piotr,
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Thanks for the explanation. Please see some clarifications below.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on producing the record to downstream. For example, a source connector can report that it's spending 80% of its time emitting records to the downstream processing pipeline. In another case, a sink connector may report that it's spending 30% of its time producing the records to the external system.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage metric:
>>>>>> > > >>>>>> - 80% of time spent on emitting records to downstream ---> downstream node is bottleneck ---> output buffer is probably full.
>>>>>> > > >>>>>> - 30% of time spent on emitting records to downstream ---> downstream node is not bottleneck ---> output buffer is probably not full.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> However, the time-based metric has a few advantages that the buffer usage metric may not have.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> 1. The buffer usage metric may not be applicable to all the connector implementations, while reporting time-based metrics is always doable. Some source connectors may not have any input buffer, or they may use some third-party library that does not expose the input buffer at all. Similarly, for sink connectors, the implementation may not have any output buffer, or the third-party library does not expose such a buffer.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> 2. Although both types of metrics can detect bottlenecks, time-based metrics can be used to generate a more informed action to remove the bottleneck. For example, when the downstream is the bottleneck, the output buffer usage metric is likely to be 100%, and the input buffer usage might be 0%. That means we don't know what the suitable parallelism is to lift the bottleneck. The time-based metric, on the other hand, would give useful information, e.g. if 80% of the time was spent on emitting records, we can roughly increase the downstream node parallelism by 4 times.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Admittedly, the time-based metrics are more expensive than buffer usage. So we may have to do some sampling to reduce the cost. But in general, time-based metrics seem worth adding.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> That being said, I don't think the buffer usage metric and time-based metrics are mutually exclusive. We can probably have both. It is just that in practice, features like auto-scaling might prefer time-based metrics for the reason stated above.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually detect bottlenecks. As I wrote, we already have them in most of the places, except for sources/sinks.
>>>>>> > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks. Currently we are only automatically detecting back pressure and reporting it to the user in the web UI (is it exposed as a metric at all?). Detecting the root cause of the back pressure (bottleneck) is one step further.
>>>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is located, to try to do something with it.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> As explained above, I think the time-based metric also addresses items 1 and 2.
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Any thoughts?
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Thanks,
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>
>>>>>> > > >>>>>> On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>> > > >>>>>>
>>>>>> > > >>>>>>> Hi again :)
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> - pending.bytes, Gauge
>>>>>> > > >>>>>>>> - pending.messages, Gauge
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> +1
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> And true, instead of overloading one of the metrics, it is better when the user can choose to provide only one of them.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Re 2:
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> If I understand correctly, this metric along with the pending messages / bytes would answer the questions of:
>>>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging behind + empty buffer = cannot consume fast enough.
>>>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + full buffer = cannot emit fast enough, i.e. the Flink pipeline is slow.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Yes, exactly. This can also be used to support decisions like changing the parallelism of the sources and/or downstream operators.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> I'm not sure if I understand your proposal with time-based measurements. Maybe I'm missing something, but I do not see how measuring time alone could answer the problem: where is the bottleneck. Time spent on the next/emit might be short or long (depending on how heavy the record is to process) and the source can still be bottlenecked/backpressured or not.
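As a minimal sketch of the two gauges that were +1'd above; the ExternalQueue interface is a hypothetical stand-in for whatever client the connector uses to talk to the external system:

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    public final class PendingWorkMetrics {

        // Stand-in for the connector's client to the external system.
        public interface ExternalQueue {
            long pendingBytes();
            long pendingMessages();
        }

        // Register the two optional gauges; a connector that cannot compute
        // one of them simply skips it.
        public static void register(MetricGroup group, ExternalQueue queue) {
            group.gauge("pending.bytes", (Gauge<Long>) queue::pendingBytes);
            group.gauge("pending.messages", (Gauge<Long>) queue::pendingMessages);
        }

        private PendingWorkMetrics() {}
    }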
>>>>>> > > >>>>>>> Usually the easiest and most reliable way to detect bottlenecks is by checking the usage of input & output buffers, since when the input buffer is full while the output buffer is empty, that's the definition of a bottleneck. Also, this is usually very easy and cheap to measure (it works effectively the same way as Flink's current back pressure monitoring, but more cleanly, without probing threads' stack traces).
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Also keep in mind that we are already using the buffer usage metrics for detecting the bottlenecks in Flink's internal network exchanges (manual work). That's the reason why I wanted to extend this to sources/sinks, since they are currently our blind spot.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink automatically relies on some metrics answering the same question
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> That would be a very helpful feature. I think in order to achieve that we would need to:
>>>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually detect bottlenecks. As I wrote, we already have them in most of the places, except for sources/sinks.
>>>>>> > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks. Currently we are only automatically detecting back pressure and reporting it to the user in the web UI (is it exposed as a metric at all?). Detecting the root cause of the back pressure (bottleneck) is one step further.
>>>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is located, to try to do something with it.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> I think you are aiming for point 3., but before we reach it, we are still missing 1. & 2. Also, even if we have 3., there is value in 1 & 2 for manual analysis/dashboards.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> However, having the knowledge of where the bottleneck is doesn't necessarily mean that we know what we can do about it. For example, increasing parallelism might or might not help with anything (data skew, bottleneck on some resource that does not scale), but this remark applies always, regardless of how we detected the bottleneck.
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>> Piotrek
>>>>>> > > >>>>>>>
>>>>>> > > >>>>>>>> On 3 Jun 2019, at 06:16, Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Hi Piotr,
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Thanks for the suggestion. Some thoughts below:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Re 1: The pending messages / bytes.
>>>>>> > > >>>>>>>> I completely agree these are very useful metrics and we should expect the connector to report them. WRT the way to expose them, it seems more consistent to add two metrics instead of adding a method (unless there are use cases other than metric reporting). So we can add the following two metrics:
>>>>>> > > >>>>>>>> - pending.bytes, Gauge
>>>>>> > > >>>>>>>> - pending.messages, Gauge
>>>>>> > > >>>>>>>> Applicable connectors can choose to report them. These two metrics along with latency should be sufficient for users to understand the progress of a connector.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Re 2: Number of buffered data in-memory of the connector
>>>>>> > > >>>>>>>> If I understand correctly, this metric along with the pending messages / bytes would answer the questions of:
>>>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging behind + empty buffer = cannot consume fast enough.
>>>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + full buffer = cannot emit fast enough, i.e. the Flink pipeline is slow.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink automatically relies on some metrics answering the same question; more specifically, we are profiling the time spent in the .next() method (time to consume) and the time spent in the .collect() method (time to emit / process). One advantage of such method-level time cost is that it allows us to calculate the parallelism required to keep up in case there is a lag.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> However, one concern I have regarding such metrics is that they are implementation-specific. Either profiling the method time or reporting buffer usage assumes the connectors are implemented in such a way. A slightly better solution might be to have the following metric:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> - EmitTimeRatio (or FetchTimeRatio): The time spent on emitting records / Total time elapsed.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> This assumes that the source connectors have to emit the records to the downstream at some point. The emission may take some time (e.g. go through chained operators). And the rest of the time is spent preparing the record to emit, including time for consuming and format conversion, etc.
>>>>>> > > >>>>>>>> Ideally, we'd like to see the time spent on record fetch and emit to be about the same, so neither is the bottleneck for the other.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> The downside of these time-based metrics is the additional overhead of getting the time, so sampling might be needed. But in practice I feel such time-based metrics might be more useful if we want to take action.
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> I think we should absolutely add the metrics in (1) to the metric convention. We could also add the metrics mentioned in (2) if we reach consensus on that. What do you think?
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Thanks,
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>> On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>> > > >>>>>>>>
>>>>>> > > >>>>>>>>> Hey Becket,
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Re 1a) and 1b) +1 from my side.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> I've discussed this issue:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us to check the cause of back pressure:
>>>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean hasSomething/isEmpty)
>>>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean hasSomething/isEmpty)
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> with Nico at some length, and he also saw the benefits of them. We also have a more concrete proposal for that.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Actually there are two really useful metrics that we are currently missing:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> 1. Number of data/records/bytes in the backlog to process. For example, the remaining number of bytes in unread files, or pending data in Kafka topics.
>>>>>> > > >>>>>>>>> 2. Amount of data buffered in-memory by the connector, waiting to be pushed to the Flink pipeline.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Re 1:
>>>>>> > > >>>>>>>>> This would have to be a metric provided directly by a connector. It could be an undefined `int`:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> `int backlog` - estimate of pending work.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> "Undefined" meaning that it would be up to a connector to decide whether it's measured in bytes, records, pending files or whatever the connector can provide. This is because I assume not every connector can provide an exact number, and for some of them it might be impossible to provide a record or byte count.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Re 2:
>>>>>> > > >>>>>>>>> This metric could be either provided by a connector, or calculated crudely by Flink:
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> `float bufferUsage` - value from the [0.0, 1.0] range
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Percentage of used in-memory buffers, like in Kafka's handover.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> It could be crudely implemented by Flink with FLIP-27's SourceReader#isAvailable. If the SourceReader is not available, the reported `bufferUsage` could be 0.0. If it is available, it could be 1.0. I think this would be a good enough estimation for most of the use cases (and it could be overloaded and implemented better if desired). Especially since we are reporting only probed values: if the probed values are almost always "1.0", it would mean that we have back pressure. If they are almost always "0.0", there is probably no back pressure at the sources.
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> What do you think about this?
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>> Piotrek
>>>>>> > > >>>>>>>>>
>>>>>> > > >>>>>>>>>> On 30 May 2019, at 11:41, Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Hi all,
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Thanks a lot for all the feedback and comments. I'd like to continue the discussion on this FLIP.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> I updated the FLIP-33 wiki to remove all the histogram metrics from the first version of the metric convention due to the performance concern. The plan is to introduce them later when we have a mechanism to opt in/out of metrics. At that point, users can decide whether they want to pay the cost to get the metric or not.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> As Stephan suggested, for this FLIP, let's first try to agree on the small list of conventional metrics that connectors should follow. Just to be clear, the purpose of the convention is not to enforce every connector to report all these metrics, but to provide guidance in case these metrics are reported by some connectors.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> @Stephan & Chesnay,
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Regarding the duplication of the `RecordsIn` metric in the operator / task IOMetricGroups, from what I understand, for the source operator it is actually the SourceFunction that reports the operator-level RecordsIn/RecordsInPerSecond metric. So they are essentially the same metric in the operator-level IOMetricGroup. Similarly for the Sink operator, the operator-level RecordsOut/RecordsOutPerSecond metrics are also reported by the Sink function. I marked them as existing in the FLIP-33 wiki page. Please let me know if I misunderstood.
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>> On Thu, May 30, 2019 at 5:16 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>>>>>>>>>
>>>>>> > > >>>>>>>>>>> Hi Piotr,
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> Thanks a lot for the feedback.
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> 1a) I guess you are referring to the part that says "original system-specific metrics should also be reported". The performance impact depends on the implementation. An efficient implementation would only record the metric once, but report it under two different metric names. This is unlikely to hurt performance.
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> 1b) Yes, I agree that we should avoid adding overhead to the critical path by all means. This is sometimes a tradeoff: running blindly without any metrics gives the best performance, but can be frustrating when we debug some issues.
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> 2. The metrics are indeed very useful. Are they supposed to be reported by the connectors or by Flink itself? At this point FLIP-33 is more focused on providing guidance to the connector authors on metrics reporting. That said, after FLIP-27, I think we should absolutely report these metrics in the abstract implementation. In any case, the metric conventions in this list are expected to evolve over time.
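The "record once, report under two names" approach from 1a) could be sketched like this; the alias name is made up for illustration:

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.MetricGroup;

    public final class DualNameCounter {

        public static Counter register(MetricGroup group) {
            // One Counter instance registered under the standard name...
            Counter numRecordsIn = group.counter("numRecordsIn");
            // ...and exposed again under a connector-specific alias, so the
            // hot path updates a single counter that is reported twice.
            group.counter("records-consumed-total", numRecordsIn);
            return numRecordsIn;
        }

        private DualNameCounter() {}
    }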
>>>>>> > > >>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>> > > >>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Hi,
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Thanks for the proposal and driving the effort here Becket :) I've read through FLIP-33 [1], and here are a couple of my thoughts.
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Big +1 for standardising the metric names between connectors, it will definitely help us and users a lot.
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Issues/questions/things to discuss that I've thought of:
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> 1a. If we are about to duplicate some metrics, can this become a performance issue?
>>>>>> > > >>>>>>>>>>>> 1b. Generally speaking, we should make sure that collecting those metrics is as non-intrusive as possible, especially since they will need to be updated once per record. (They might be collected more rarely with some overhead, but the hot path of updating them per record will need to be as quick as possible.) That includes avoiding heavy computation on the per-record path: histograms?, and measuring time for time-based (per-second) metrics (System.currentTimeMillis(), depending on the implementation, can invoke a system call).
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics that allow us to check the cause of back pressure:
>>>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean hasSomething/isEmpty)
>>>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean hasSomething/isEmpty)
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> a) is useful in a scenario when we are processing a backlog of records, all of the internal Flink input/output network buffers are empty, and we want to check whether the external source system is the bottleneck (source's input queue will be empty), or if the Flink connector is the bottleneck (source's input queues will be full).
>>>>>> > > >>>>>>>>>>>> b) similar story. A backlog of records, but this time all of the internal Flink input/output network buffers are full, and we want to check whether the external sink system is the bottleneck (sink output queues are full), or if the Flink connector is the bottleneck (sink's output queues will be empty).
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> It might sometimes be difficult to provide those metrics, so they could be optional, but if we could provide them, it would be really helpful.
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> Piotrek
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics
>>>>>> > > >>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> On 24 Apr 2019, at 13:28, Stephan Ewen <se...@apache.org> wrote:
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> I think this sounds reasonable.
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> Let's keep the "reconfiguration without stopping the job" out of this, because that would be a super big effort, and if we approach that, then in a more generic way rather than specific to connector metrics.
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> I would suggest to look at the following things before starting with any implementation work:
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> - Try and find a committer to support this, otherwise it will be hard to make progress.
>>>>>> > > >>>>>>>>>>>>> - Start with defining a smaller set of "core metrics" and extend the set later. I think that is easier than now blocking on reaching consensus on a large group of metrics.
>>>>>> > > >>>>>>>>>>>>> - Find a solution to the problem Chesnay mentioned, that the "records in" metric is somehow overloaded and exists already in the IO Metric group.
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Hi Stephan,
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Thanks a lot for the feedback. All makes sense.
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> It is a good suggestion to simply have an onRecord(numBytes, eventTime) method for connector writers. It should meet most of the requirements.
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> The configurable metrics feature is something really useful, especially if we can somehow make it dynamically configurable without stopping the jobs. It might be better to make it a separate discussion because it is a more generic feature instead of only for connectors.
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> So in order to make some progress, in this FLIP we can limit the discussion scope to the connector-related items:
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> - the standard connector metric names and types.
>>>>>> > > >>>>>>>>>>>>>> - the abstract ConnectorMetricHandler interface
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> I'll start a separate thread to discuss other general metric-related enhancement items, including:
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> - optional metrics
>>>>>> > > >>>>>>>>>>>>>> - dynamic metric configuration
>>>>>> > > >>>>>>>>>>>>>> - potential combination with a rate limiter
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Does this plan sound reasonable?
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Thanks,
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen <se...@apache.org> wrote:
>>>>>> > > >>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> Ignoring for a moment implementation details, this connector metrics work is a really good thing to do, in my opinion.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> The question "oh, my job seems to be doing nothing, I am looking at the UI and the 'records in' value is still zero" is in the top three support questions I have been asked personally. Introspection into "how far is the consumer lagging behind" (event time fetch latency) came up many times as well.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> So big +1 to solving this problem.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> About the exact design - I would try to go for the following properties:
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> - keep complexity out of connectors. Ideally the metrics handler has a single onRecord(numBytes, eventTime) method or so, and everything else is internal to the handler. That makes it dead simple for the connector. We can also think of an extension scheme for connector-specific metrics.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> - make it configurable on the job or cluster level which metrics the handler internally creates when that method is invoked.
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> What do you think?
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> Best,
>>>>>> > > >>>>>>>>>>>>>>> Stephan
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>> > > >>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>> As I said before, I believe this to be over-engineered and have no interest in this implementation.
>>>>>> > > >>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>> There are conceptual issues, like defining a duplicate numBytesIn(PerSec) metric that already exists for each operator.
>>>>>> > > >>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>> On 21.03.2019 06:13, Becket Qin wrote:
>>>>>> > > >>>>>>>>>>>>>>>>> A few updates to the thread. I uploaded a patch [1] as a complete example of how users can use the metrics in the future.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> Some thoughts below after taking a look at the AbstractMetricGroup and its subclasses.
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> This patch intends to provide convenience for Flink connector implementations to follow the metrics standards proposed in FLIP-33. It also tries to enhance the metric management in a general way to help users with:
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> 1. metric definition
>>>>>> > > >>>>>>>>>>>>>>>>> 2. metric dependencies check
>>>>>> > > >>>>>>>>>>>>>>>>> 3. metric validation
>>>>>> > > >>>>>>>>>>>>>>>>> 4. metric control (turn on / off particular metrics)
>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>> > > >>>>>>>>>>>>>>>>> This patch wraps `MetricGroup` to extend the functionality of `AbstractMetricGroup` and its subclasses. The `AbstractMetricGroup` mainly focuses on the metric group hierarchy, but does not really manage the metrics other than keeping them in a Map.
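A sketch of the handler shape Stephan describes above, with hypothetical names; which metrics the constructor registers would be the configurable part:

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    public class SimpleConnectorMetricHandler {

        private final Counter numRecordsIn;
        private final Counter numBytesIn;
        private volatile long lastEventTime = Long.MIN_VALUE;

        public SimpleConnectorMetricHandler(MetricGroup group) {
            this.numRecordsIn = group.counter("numRecordsIn");
            this.numBytesIn = group.counter("numBytesIn");
            // Event time lag derived lazily from the last seen event time
            // (meaningless until the first record arrives).
            group.gauge("eventTimeLag",
                (Gauge<Long>) () -> System.currentTimeMillis() - lastEventTime);
        }

        // The single method a connector calls on its per-record hot path.
        public void onRecord(long numBytes, long eventTime) {
            numRecordsIn.inc();
            numBytesIn.inc(numBytes);
            lastEventTime = eventTime;
        }
    }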
Ideally we should only have one entry point for the metrics.

Right now the entry point is AbstractMetricGroup. However, besides the missing functionality mentioned above, AbstractMetricGroup seems deeply rooted in the Flink runtime. We could extract it out to flink-metrics in order to use it for generic purposes. There will be some work, though.

Another approach is to make AbstractMetrics in this patch the metric entry point. It wraps a metric group and provides the missing functionalities. Then we can roll out this pattern to runtime components gradually as well.

My first thought is that the latter approach gives a smoother migration. But I am also OK with doing a refactoring on the AbstractMetricGroup family.

Thanks,

Jiangjie (Becket) Qin

[1] https://github.com/becketqin/flink/pull/1

On Mon, Feb 25, 2019 at 2:32 PM Becket Qin <becket....@gmail.com> wrote:

Hi Chesnay,

It might be easier to discuss some implementation details in the PR review instead of in the FLIP discussion thread. I have a patch for the Kafka connectors ready but haven't submitted the PR yet. Hopefully that will help explain a bit more.

** Re: metric type binding
This is a valid point that is worth discussing. If I understand correctly, there are two points:

1. Metric type / interface does not matter as long as the metric semantic is clearly defined.
Conceptually speaking, I agree that as long as the metric semantic is defined, the metric type does not matter.
To some extent, Gauge / Counter / Meter / Histogram themselves can be thought of as some well-recognized semantics, if you wish. In Flink, these metric semantics have their associated interface classes. In practice, such semantic-to-interface binding seems necessary for different components to communicate. Simply standardizing the semantics of the connector metrics seems not sufficient for people to build an ecosystem on top of. At the end of the day, we still need some embodiment of the metric semantics that people can program against.

2. Sometimes the same metric semantic can be exposed using different metric types / interfaces.
This is a good point. Counter and Gauge-as-a-Counter are pretty much interchangeable. This is more of a trade-off between the user experience of metric producers and consumers. The metric producers want to use Counter or Gauge depending on whether the counter is already tracked in code, while ideally the metric consumers only want to see a single metric type for each metric. I am leaning towards making the metric producers happy, i.e. allowing either the Gauge or the Counter metric type, and having the metric consumers handle the type variation. The reason is that in practice, there might be more connector implementations than metric reporter implementations. We could also provide some helper method to facilitate reading from such a variable metric type.

Just some quick replies to the comments around implementation details.

> 4) single place where metrics are registered except connector-specific ones (which we can't really avoid).

Registering connector-specific ones in a single place is actually something that I want to achieve.
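On the helper-method point above, a sketch of what reading from such a variable metric type could look like (hypothetical code, not an existing Flink API):

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.Metric;

    // Hypothetical consumer-side helper: read a count regardless of whether
    // the producer registered a Counter or a numeric Gauge for the metric.
    public final class MetricReadingUtil {

        private MetricReadingUtil() {}

        public static long readCount(Metric metric) {
            if (metric instanceof Counter) {
                return ((Counter) metric).getCount();
            }
            if (metric instanceof Gauge) {
                Object value = ((Gauge<?>) metric).getValue();
                if (value instanceof Number) {
                    return ((Number) value).longValue();
                }
            }
            throw new IllegalArgumentException(
                    "Expected a Counter or a numeric Gauge, got: " + metric);
        }
    }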
> 2) I'm talking about time-series databases like Prometheus. We would only have a gauge metric exposing the last fetchTime/emitTime that is regularly reported to the backend (Prometheus), where a user could build a histogram of his choosing when/if he wants it.

I am not sure such downsampling works. As an example, suppose a user complains that there are some intermittent latency spikes (maybe a few records in 10 seconds) in their processing system. Having a Gauge sampling the instantaneous latency seems unlikely to be useful. However, looking at the actual 99.9 percentile latency might help.

Thanks,

Jiangjie (Becket) Qin

On Fri, Feb 22, 2019 at 9:30 PM Chesnay Schepler <ches...@apache.org> wrote:

Re: over complication of implementation.

I think I now understand better what you're shooting for, effectively something like the OperatorIOMetricGroup. But still: re-define setupConnectorMetrics() to accept a set of flags for counters/meters (and _possibly_ histograms) along with a set of well-defined Optional<Gauge<?>>, and return the group.

Solves all issues as far as I can tell:
1) no metrics must be created manually (except Gauges, which are effectively just Suppliers and you can't get around this),
2) additional metrics can be registered on the returned group,
3) see 1),
4) single place where metrics are registered except connector-specific ones (which we can't really avoid).

Re: Histogram

1) As an example, whether "numRecordsIn" is exposed as a Counter or a Gauge should be irrelevant.
So far we're using the metric type that is most convenient at exposing a given value. If there is some backing data-structure that we want to expose some data from, we typically opt for a Gauge, as otherwise we're just mucking around with the Meter/Counter API to get it to match. Similarly, if we want to count something but no current count exists, we typically add a Counter. That's why attaching semantics to metric types makes little sense (but unfortunately several reporters already do it); for counters/meters certainly, but the majority of metrics are gauges.

2) I'm talking about time-series databases like Prometheus. We would only have a gauge metric exposing the last fetchTime/emitTime that is regularly reported to the backend (Prometheus), where a user could build a histogram of his choosing when/if he wants it.

On 22.02.2019 13:57, Becket Qin wrote:

Hi Chesnay,

Thanks for the explanation.

** Re: FLIP
I might have misunderstood this, but it seems that "major changes" are well defined in the FLIP description. The full content is the following:

> What is considered a "major change" that needs a FLIP?
>
> Any of the following should be considered a major change:
>
> - Any major new feature, subsystem, or piece of functionality
> - *Any change that impacts the public interfaces of the project*
>
> What are the "public interfaces" of the project?
> *All of the following are public interfaces* that people build around:
>
> - DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
> - Classes marked with the @Public annotation
> - On-disk binary formats, such as checkpoints/savepoints
> - User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
> - Configuration settings
> - *Exposed monitoring information*

So any monitoring information change is considered a public interface change, and any public interface change is considered a "major change".

** Re: over complication of implementation.

Although this is more of an implementation detail that is not covered by the FLIP, it may be worth discussing.

First of all, I completely agree that we should use the simplest way to achieve our goal. To me the goal is the following:
1. Clear connector conventions and interfaces.
2. The easiness of creating a connector.

Both of them are important to the prosperity of the connector ecosystem. So I'd rather abstract as much as possible on our side to make the connector developers' work lighter. Given this goal, a static util method approach might have a few drawbacks:
1. Users still have to construct the metrics by themselves. (And note that this might be erroneous by itself. For example, a custom wrapper around a Dropwizard meter may be used instead of MeterView.)
2. When connector-specific metrics are added, it is difficult to enforce the scope to be the same as for the standard metrics.
3. It seems that method proliferation is inevitable if we want to apply sanity checks, e.g. that the metric numBytesIn was not registered for a meter.
4. Metrics are still defined in random places and are hard to track.

The current PR I had was inspired by the Config system in Kafka, which I found pretty handy. In fact it is not only used by Kafka itself but even by some other projects that depend on Kafka. I am not saying this approach is perfect, but I think it is worth it to save work for connector writers and encourage more systematic implementations. That being said, I am fully open to suggestions.

Re: Histogram
I think there are two orthogonal questions around those metrics:

1. Regardless of the metric type, by just looking at the meaning of a metric, is it generic to all connectors? If the answer is yes, we should include the metric in the convention. No matter whether we include it in the convention or not, some connector implementations will emit such a metric. It is better to have a convention than letting each connector do random things.

2. If a standard metric is a histogram, what should we do?
I agree that we should make it clear that using histograms has a performance risk. But I do see that histograms are useful in some fine-granularity debugging where one does not have the luxury to stop the system and inject more inspection code. So the workaround I am thinking of is to provide some implementation suggestions. Assume later on we have a mechanism of selective metrics.
In the abstract metrics class we can then disable those metrics by default, so individual connector writers do not have to do anything (this is another advantage of having an AbstractMetrics instead of static util methods).

I am not sure I fully understand the histogram-in-the-backend approach. Can you explain a bit more? Do you mean emitting the raw data, e.g. fetchTime and emitTime, with each record and letting the histogram computation happen in the background? Or letting the processing thread put the values into a queue and having a separate thread poll from the queue and add them into the histogram?

Thanks,

Jiangjie (Becket) Qin

On Fri, Feb 22, 2019 at 4:34 PM Chesnay Schepler <ches...@apache.org> wrote:

Re: FLIP
The very first line under both the main header and the Purpose section describes FLIPs as "major changes", which this isn't.

Re: complication
I'm not arguing against standardization, but against an over-complicated implementation when a static utility method would be sufficient:

    public static void setupConnectorMetrics(
            MetricGroup operatorMetricGroup,
            String connectorName,
            Optional<Gauge<Long>> numRecordsIn,
            ...)
This gives you all you need:
* a well-defined set of metrics for a connector to opt in to
* standardized naming schemes for scope and individual metrics
* standardized metric types (although personally I'm not interested in that, since metric types should be considered syntactic sugar)

Re: Configurable Histogram
If anything, they _must_ be turned off by default, but the metric system already exposes so many options that I'm not too keen on adding even more. You have also only addressed my first argument against histograms (performance); the second one still stands (calculate the histogram in metric backends instead).

On 21.02.2019 16:27, Becket Qin wrote:

Hi Chesnay,

Thanks for the comments. I think this is worthy of a FLIP because it is public API. According to the FLIP description, a FLIP is required in case of:

> - Any change that impacts the public interfaces of the project

and the following entry is found in the definition of "public interface":

> - Exposed monitoring information

Metrics are critical to any production system, so a clear metric definition is important for any serious user. For an organization with a large Flink installation, a change in metrics means a great amount of work. So such changes do need to be fully discussed and documented.

** Re: Histogram.
We can discuss whether there is a better way to expose metrics that are suitable for histograms. My micro-benchmark on various histogram implementations also indicates that they are significantly slower than other metric types.
But I don't think that means we should never use histograms; it means we should use them with caution. For example, we can suggest that implementations turn them off by default and only turn them on for a small amount of time when performing some micro-debugging.

** Re: complication:
Connector conventions are essential for the Flink ecosystem. The Flink connector pool is probably the most important part of Flink, just like in any other data system. Clear conventions for connectors will help build the Flink ecosystem in a more organic way.
Take the metrics convention as an example: imagine someone has developed a Flink connector for system Foo, and another developer has developed a monitoring and diagnostic framework for Flink which analyzes Flink job performance based on metrics. With a clear metric convention, those two projects can be developed independently. Once users put them together, they work without additional modifications. This cannot be easily achieved by just defining a few constants.

** Re: selective metrics:
Sure, we can discuss that in a separate thread.

@Dawid

** Re: latency / fetchedLatency
The primary purpose of establishing such a convention is to help developers write connectors in a more compatible way. The convention is supposed to be defined proactively. So when looking at the convention, it seems more important to see whether the concept is applicable to connectors in general. It might be true that so far only the Kafka connector reports latency.
But there might be hundreds of other connector implementations in the Flink ecosystem, though not in the Flink repo, and some of them also emit latency. I think a lot of other sources actually also have an append timestamp, e.g. database bin logs and some K-V stores. So I wouldn't be surprised if some database connector can also emit latency metrics.

Thanks,

Jiangjie (Becket) Qin

On Thu, Feb 21, 2019 at 10:14 PM Chesnay Schepler <ches...@apache.org> wrote:

Regarding 2): It doesn't make sense to investigate this as part of this FLIP. This is something that could be of interest for the entire metric system, and should be designed for as such.

Regarding the proposal as a whole:

Histogram metrics shall not be added to the core of Flink. They are significantly more expensive than other metrics, and calculating histograms in the application is regarded as an anti-pattern by several metric backends, which instead recommend exposing the raw data and calculating the histogram in the backend.

Second, this seems overly complicated. Given that we already established that not all connectors will export all metrics, we are effectively reducing this down to a consistent naming scheme. We don't need anything sophisticated for that; basically just a few constants that all connectors use.

I'm not convinced that this is worthy of a FLIP.

On 21.02.2019 14:26, Dawid Wysakowicz wrote:

Hi,

Ad 1. In general I understand and I agree.
But those particular metrics (latency, fetchLatency) would right now only be reported if the user uses the KafkaConsumer with the internal timestampAssigner and with StreamCharacteristic set to EventTime, right? That sounds like a very specific case. I am not sure we should introduce a generic metric that will be disabled/absent for most implementations.

Ad 2. That sounds like an orthogonal issue that might make sense to investigate in the future.

Best,

Dawid

On 21/02/2019 13:20, Becket Qin wrote:

Hi Dawid,

Thanks for the feedback. That makes sense to me. There are two cases to be addressed.

1. The metrics are supposed to be a guidance. It is likely that a connector only supports some but not all of the metrics. In that case, each connector implementation should have the freedom to decide which metrics are reported. For the metrics that are supported, the guidance should be followed.

2. Sometimes users may want to disable certain metrics for some reason (e.g. performance / reprocessing of data). A generic mechanism should be provided to allow users to choose which metrics are reported. This mechanism should also be honored by the connector implementations.

Does this sound reasonable to you?
Thanks,

Jiangjie (Becket) Qin

On Thu, Feb 21, 2019 at 4:22 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

Generally I like the idea of having a unified, standard set of metrics for all connectors. I have some slight concerns about fetchLatency and latency, though. They are computed based on EventTime, which is not a purely technical feature. It often depends on some business logic, and might be absent or defined after the source. Those metrics could also behave in a weird way in the case of replaying a backlog. Therefore I am not sure if we should include those metrics by default. Maybe we could at least introduce a feature switch for them? What do you think?

Best,

Dawid

On 21/02/2019 03:13, Becket Qin wrote:

Bump. If there are no objections to the proposed metrics, I'll start a voting thread later today.

Thanks,

Jiangjie (Becket) Qin

On Mon, Feb 11, 2019 at 8:17 PM Becket Qin <becket....@gmail.com> wrote:

Hi folks,

I would like to start the FLIP discussion thread about standardizing the connector metrics.
In short, we would like to provide a convention for Flink connector metrics. It will help simplify the monitoring of and alerting on Flink jobs. The FLIP link is the following:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

Thanks,

Jiangjie (Becket) Qin

--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
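For reference, the gauge-plus-backend alternative debated in this thread could be sketched as follows; the class and metric names are illustrative, and whether backend-side aggregation of such sampled values is enough to catch rare spikes is exactly the open question raised above:

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    // Sketch of the "raw value as gauge" pattern: expose only the last
    // observed fetch time and leave histograms/percentiles to the backend.
    public class FetchTimeMetrics {

        private volatile long lastFetchTime = Long.MIN_VALUE;

        public FetchTimeMetrics(MetricGroup group) {
            group.gauge("lastFetchTime", (Gauge<Long>) () -> lastFetchTime);
        }

        // Called by the fetcher whenever records are fetched.
        public void onFetch(long fetchTimeMillis) {
            this.lastFetchTime = fetchTimeMillis;
        }
    }

A scrape-based backend such as Prometheus could then approximate percentiles server-side, e.g. with quantile_over_time(0.999, <exported metric name>[5m]).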