Hey Stephan,

Thanks for the quick reply. I actually forgot to mention that I also added a "watermarkLag" metric to the Source metrics, in addition to "currentFetchEventTimeLag" and "currentEmitEventTimeLag". So in the current proposal, both XXEventTimeLag metrics are based on event timestamps, while "watermarkLag" is based on the watermark. All of the lags are defined against the wall-clock time. Does this cover what you are proposing?
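To make the three definitions concrete, here is a minimal sketch of how the lags relate to the wall clock. The class and method names are hypothetical and only for illustration; this is not the public interface from the FLIP wiki, and it assumes all timestamps are epoch milliseconds:

```java
/** Illustrative only: a hypothetical helper, not part of any Flink or FLIP-33 interface. */
public class SourceLagSketch {

    /** Wall-clock time when the record was fetched, minus the record's event timestamp. */
    public static long fetchEventTimeLag(long fetchTimeMs, long eventTimeMs) {
        return fetchTimeMs - eventTimeMs;
    }

    /** Wall-clock time when the record was emitted downstream, minus its event timestamp. */
    public static long emitEventTimeLag(long emitTimeMs, long eventTimeMs) {
        return emitTimeMs - eventTimeMs;
    }

    /** Current wall-clock time minus the latest watermark of the source. */
    public static long watermarkLag(long nowMs, long watermarkMs) {
        return nowMs - watermarkMs;
    }

    public static void main(String[] args) {
        long eventTimeMs = 1_000L;   // timestamp carried by the record (assumed value)
        long fetchTimeMs = 4_000L;   // wall clock when the record was fetched (assumed value)
        long emitTimeMs = 9_000L;    // wall clock when the record was emitted (assumed value)
        long watermarkMs = 500L;     // latest watermark (assumed value)

        System.out.println("fetch lag: " + fetchEventTimeLag(fetchTimeMs, eventTimeMs));
        System.out.println("emit lag: " + emitEventTimeLag(emitTimeMs, eventTimeMs));
        System.out.println("watermark lag: " + watermarkLag(emitTimeMs, watermarkMs));
    }
}
```

The two event-time lags move with each record's timestamp, so they can jump around with out-of-order events, while the watermark lag only changes when the watermark advances or the wall clock moves on.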
Thanks,

Jiangjie (Becket) Qin

On Fri, Sep 18, 2020 at 6:08 PM Stephan Ewen <step...@ververica.com> wrote:

> Hi Becket!
>
> I am wondering if it makes sense to do the following small change:
>
>   - Have "currentFetchEventTimeLag" be defined on event timestamps
> (optionally, if the source system exposes it)  <== this is like in your
> proposal
>     this helps understand how long the records were in the source before
>
>   - BUT change "currentEmitEventTimeLag" to be "currentSourceWatermarkLag"
> instead.
>     That way, users can see how far behind the wall-clock-time the source
> progress is all in all.
>     It is also a more well defined metric as it does not oscillate with
> out-of-order events.
>
> What do you think?
>
> Best,
> Stephan
>
>
>
> On Fri, Sep 18, 2020 at 12:02 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi folks,
>>
>> Thanks for all the great feedback. I have just updated FLIP-33 wiki with
>> the following changes:
>>
>> 1. Renaming. "currentFetchLatency" to "currentFetchEventTimeLag",
>> "currentLatency" to "currentEmitEventTimeLag".
>> 2. Added the public interface code change required for the new metrics.
>> 3. Added description of whether a metric is predefined or optional, and
>> which component is expected to update the metric.
>>
>> Please let me know if you have any questions. I'll start a vote in two
>> days if there are no further concerns.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Sep 9, 2020 at 9:56 AM Becket Qin <becket....@gmail.com> wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks for the input. Just a few more clarifications / questions.
>>>
>>> *Num Bytes / Records Metrics*
>>>
>>> 1. At this point, the *numRecordsIn(Rate)* metrics exist in both
>>> OperatorIOMetricGroup and TaskIOMetricGroup. I did not find
>>> *numRecordsIn(Rate)* in the TaskIOMetricGroup updated anywhere other
>>> than in the unit tests. Am I missing something?
>>>
>>> 2. *numBytesIn(Rate)* metrics only exist in TaskIOMetricGroup.
>>> At this point, the SourceReaders only have access to a SourceReaderContext
>>> which provides an OperatorMetricGroup. So it seems that the connector
>>> developers are not able to update the *numBytesIn(Rate)*. With the multiple
>>> Source chaining support, it is possible that there are multiple Sources in
>>> the same task. So it looks like we need to add *numBytesIn(Rate)* to the
>>> operator metrics as well.
>>>
>>>
>>> *Current (Fetch) Latency*
>>>
>>> *currentFetchLatency* helps clearly tell whether the latency is caused
>>> by Flink or not. Backpressure is not the only reason that we see fetch
>>> latency. Even if there is no backpressure, the records may have passed a
>>> long pipeline before they entered Flink. For example, say the
>>> *currentLatency* is 10 seconds and there is no backpressure. Does that mean
>>> the record spent 10 seconds in the Source operator? If not, how much did
>>> Flink contribute to that 10 seconds of latency? These questions are
>>> frequently asked and hard to answer without the fetch latency.
>>>
>>>> For "currentFetchLatency", we would need to understand timestamps before
>>>> the records are decoded. That is only possible for some sources, where the
>>>> client gives us the records in a (partially) decoded form already (like
>>>> Kafka). Then, some work has been done between the fetch time and the time
>>>> we update the metric already, so it is already a bit closer to the
>>>> "currentFetchLatency". I think following this train of thought, there is
>>>> diminished benefit from that specific metric.
>>>
>>>
>>> We may not have to report the fetch latency before records are decoded.
>>> One solution is to remember the *FetchTime* when the encoded records
>>> are fetched, and report the fetch latency after the records are decoded by
>>> computing (*FetchTime - EventTime*).
>>> An approximate implementation would be adding a *FetchTime* field to the
>>> *RecordsWithSplitIds*, assuming that all the records in that data structure
>>> are fetched at the same time.
>>>
>>> Thoughts?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Wed, Sep 9, 2020 at 12:42 AM Stephan Ewen <step...@ververica.com>
>>> wrote:
>>>
>>>> Thanks for reviving this, Becket!
>>>>
>>>> I think Konstantin's comments are great. I'd add these points:
>>>>
>>>> *Num Bytes / Records Metrics*
>>>>
>>>> For "numBytesIn" and "numRecordsIn", we should reuse the
>>>> OperatorIOMetricGroup, then it also gets reported to the overview page in
>>>> the Web UI.
>>>>
>>>> The "numBytesInPerSecond" and "numRecordsInPerSecond" are automatically
>>>> derived metrics, no need to do anything once we populate the above two
>>>> metrics.
>>>>
>>>>
>>>> *Current (Fetch) Latency*
>>>>
>>>> I would really go for "eventTimeLag" rather than "fetchLatency". I
>>>> think "eventTimeLag" is a term that has some adoption in the Flink
>>>> community and beyond.
>>>>
>>>> I am not so sure that I see the benefit between "currentLatency" and
>>>> "currentFetchLatency", (or event time lag before/after) as this only is
>>>> different by the time it takes to emit a batch.
>>>>   - In a non-backpressured case, these should be virtually identical
>>>> (and both dominated by watermark lag, not the actual time it takes the
>>>> fetch to be emitted)
>>>>   - In a backpressured case, why do you care about when data was
>>>> fetched, as opposed to emitted? Emitted time is relevant for application
>>>> semantics and checkpoints. Fetch time seems to be an implementation detail
>>>> (how much does the source buffer).
>>>>
>>>> The "currentLatency" (eventTimeLagAfter) can be computed
>>>> out-of-the-box, independent of a source implementation, so that is also a
>>>> good argument to make it the main metric.
>>>> We know timestamps and watermarks in the source.
>>>> Except for cases where no watermarks have been defined at all (batch jobs
>>>> or pure processing time jobs), in which case this metric should probably
>>>> be "Infinite".
>>>>
>>>> For "currentFetchLatency", we would need to understand timestamps
>>>> before the records are decoded. That is only possible for some sources,
>>>> where the client gives us the records in a (partially) decoded form already
>>>> (like Kafka). Then, some work has been done between the fetch time and the
>>>> time we update the metric already, so it is already a bit closer to the
>>>> "currentFetchLatency". I think following this train of thought, there is
>>>> diminished benefit from that specific metric.
>>>>
>>>>
>>>> *Idle Time*
>>>>
>>>> I agree, it would be great to rename this. Maybe to "sourceWaitTime" or
>>>> "sourceIdleTime" so as to make clear that this is not exactly the time that
>>>> Flink's processing pipeline is idle, but the time where the source does not
>>>> have new data.
>>>>
>>>> This is not an easy metric to collect, though (except maybe for the
>>>> sources that are only idle while they have no split assigned, like
>>>> continuous file source).
>>>>
>>>> *Source Specific Metrics*
>>>>
>>>> I believe source-specific would only be "sourceIdleTime",
>>>> "numRecordsInErrors", "pendingBytes", and "pendingRecords".
>>>>
>>>>
>>>> *Conclusion*
>>>>
>>>> We can probably add "numBytesIn" and "numRecordsIn" and "eventTimeLag"
>>>> right away, with little complexity.
>>>> I'd suggest starting with these right away.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Sep 8, 2020 at 3:25 PM Becket Qin <becket....@gmail.com> wrote:
>>>>
>>>>> Hey Konstantin,
>>>>>
>>>>> Thanks for the feedback and suggestions. Please see the reply below.
>>>>>
>>>>>> * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
>>>>>> been introduced in https://issues.apache.org/jira/browse/FLINK-16864.
>>>>>> They have a similar name, but different definitions of idleness,
>>>>>> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
>>>>>> backpressured. Can we make it clearer that these two metrics mean
>>>>>> different things?
>>>>>
>>>>>
>>>>> That is a good point. I did not notice this metric earlier. It seems
>>>>> that both metrics are useful to the users. One tells them how busy the
>>>>> source is and how much more throughput the source can handle. The other
>>>>> tells the users how long it has been since the source last saw a record,
>>>>> which is useful for debugging. I'll update the FLIP to make it clear.
>>>>>
>>>>>> * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
>>>>>> is more descriptive/clear. What do others think?
>>>>>
>>>>>
>>>>> I am quite open to ideas on these names. In fact I also feel
>>>>> "current(Fetch)Latency" is not super intuitive. So it would be great if
>>>>> we can have better names.
>>>>>
>>>>>> * Current(Fetch)Latency implies that the timestamps are directly
>>>>>> extracted in the source connector, right? Will this be the default for
>>>>>> FLIP-27 sources anyway?
>>>>>
>>>>>
>>>>> The "currentFetchLatency" will probably be reported by each source
>>>>> implementation, because the data fetching is done by SplitReaders and
>>>>> there is no base implementation. The "currentLatency", on the other hand,
>>>>> can be reported by the SourceReader base implementation. However, since
>>>>> developers can actually implement their own source connector without using
>>>>> our base implementation, this metric guidance is still useful for the
>>>>> connector developers in that case.
>>>>>
>>>>>> * Does FLIP-33 also include the implementation of these metrics (to the
>>>>>> extent possible) for all connectors currently available in Apache
>>>>>> Flink or is the "per-connector implementation" out-of-scope?
>>>>>
>>>>>
>>>>> FLIP-33 itself does not specify any implementation of those metrics.
>>>>> But the connectors we provide in Apache Flink will follow the guidance of
>>>>> FLIP-33 to emit those metrics when applicable. Maybe we can have some
>>>>> public static Strings defined for the metric names to help other connector
>>>>> developers follow the same guidance. I can also add that to the public
>>>>> interface section of the FLIP if we decide to do that.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Tue, Sep 8, 2020 at 9:02 PM Becket Qin <becket....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 8, 2020 at 6:55 PM Konstantin Knauf <kna...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Becket,
>>>>>>>
>>>>>>> Thank you for picking up this FLIP. I have a few questions:
>>>>>>>
>>>>>>> * two thoughts on naming:
>>>>>>>    * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
>>>>>>> been introduced in https://issues.apache.org/jira/browse/FLINK-16864.
>>>>>>> They have a similar name, but different definitions of idleness,
>>>>>>> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
>>>>>>> backpressured. Can we make it clearer that these two metrics mean
>>>>>>> different things?
>>>>>>>
>>>>>>>    * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
>>>>>>> is more descriptive/clear. What do others think?
>>>>>>>
>>>>>>> * Current(Fetch)Latency implies that the timestamps are directly
>>>>>>> extracted in the source connector, right? Will this be the default for
>>>>>>> FLIP-27 sources anyway?
>>>>>>>
>>>>>>> * Does FLIP-33 also include the implementation of these metrics (to the
>>>>>>> extent possible) for all connectors currently available in Apache
>>>>>>> Flink or is the "per-connector implementation" out-of-scope?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 4, 2020 at 4:56 PM Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> > Hi all,
>>>>>>> >
>>>>>>> > To complete the Source refactoring work, I'd like to revive this
>>>>>>> > discussion. Since the mail thread has been dormant for more than a
>>>>>>> > year, just to recap the motivation of the FLIP:
>>>>>>> >
>>>>>>> > 1. The FLIP proposes to standardize the connector metrics by giving
>>>>>>> > guidance on the metric specifications, including the name, type and
>>>>>>> > meaning of the metrics.
>>>>>>> > 2. It is OK for a connector to not emit some of the metrics in the
>>>>>>> > metric guidance, but if a metric of the same semantic is emitted, the
>>>>>>> > implementation should follow the guidance.
>>>>>>> > 3. It is OK for a connector to emit more metrics than what are listed
>>>>>>> > in the FLIP. This includes having an alias for a metric specified in
>>>>>>> > the FLIP.
>>>>>>> > 4. We will implement some of the metrics out of the box in the default
>>>>>>> > implementation of FLIP-27, as long as they are applicable.
>>>>>>> >
>>>>>>> > The FLIP wiki is at the following link:
>>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>>>>>> >
>>>>>>> > Any thoughts?
>>>>>>> > >>>>>>> > Thanks, >>>>>>> > >>>>>>> > Jiangjie (Becket) Qin >>>>>>> > >>>>>>> > >>>>>>> > On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski < >>>>>>> pi...@ververica.com> >>>>>>> > wrote: >>>>>>> > >>>>>>> > > > we will need to revisit the convention list and adjust them >>>>>>> accordingly >>>>>>> > > when FLIP-27 is ready >>>>>>> > > >>>>>>> > > >>>>>>> > > Yes, this sounds good :) >>>>>>> > > >>>>>>> > > Piotrek >>>>>>> > > >>>>>>> > > > On 13 Jun 2019, at 13:35, Becket Qin <becket....@gmail.com> >>>>>>> wrote: >>>>>>> > > > >>>>>>> > > > Hi Piotr, >>>>>>> > > > >>>>>>> > > > That's great to know. Chances are that we will need to revisit >>>>>>> the >>>>>>> > > > convention list and adjust them accordingly when FLIP-27 is >>>>>>> ready, At >>>>>>> > > that >>>>>>> > > > point we can mark some of the metrics as available by default >>>>>>> for >>>>>>> > > > connectors implementing the new interface. >>>>>>> > > > >>>>>>> > > > Thanks, >>>>>>> > > > >>>>>>> > > > Jiangjie (Becket) Qin >>>>>>> > > > >>>>>>> > > > On Thu, Jun 13, 2019 at 3:51 PM Piotr Nowojski < >>>>>>> pi...@ververica.com> >>>>>>> > > wrote: >>>>>>> > > > >>>>>>> > > >> Thanks for driving this. I’ve just noticed one small thing. >>>>>>> With new >>>>>>> > > >> SourceReader interface Flink will be able to provide >>>>>>> `idleTime` metric >>>>>>> > > >> automatically. >>>>>>> > > >> >>>>>>> > > >> Piotrek >>>>>>> > > >> >>>>>>> > > >>> On 13 Jun 2019, at 03:30, Becket Qin <becket....@gmail.com> >>>>>>> wrote: >>>>>>> > > >>> >>>>>>> > > >>> Thanks all for the feedback and discussion. >>>>>>> > > >>> >>>>>>> > > >>> Since there wasn't any concern raised, I've started the >>>>>>> voting thread >>>>>>> > > for >>>>>>> > > >>> this FLIP, but please feel free to continue the discussion >>>>>>> here if >>>>>>> > you >>>>>>> > > >>> think something still needs to be addressed. 
>>>>>>> > > >>> >>>>>>> > > >>> Thanks, >>>>>>> > > >>> >>>>>>> > > >>> Jiangjie (Becket) Qin >>>>>>> > > >>> >>>>>>> > > >>> >>>>>>> > > >>> >>>>>>> > > >>> On Mon, Jun 10, 2019 at 9:10 AM Becket Qin < >>>>>>> becket....@gmail.com> >>>>>>> > > wrote: >>>>>>> > > >>> >>>>>>> > > >>>> Hi Piotr, >>>>>>> > > >>>> >>>>>>> > > >>>> Thanks for the comments. Yes, you are right. Users will >>>>>>> have to look >>>>>>> > > at >>>>>>> > > >>>> other metrics to decide whether the pipeline is healthy or >>>>>>> not in >>>>>>> > the >>>>>>> > > >> first >>>>>>> > > >>>> place before they can use the time-based metric to fix the >>>>>>> > bottleneck. >>>>>>> > > >>>> >>>>>>> > > >>>> I agree that once we have FLIP-27 ready, some of the >>>>>>> metrics can >>>>>>> > just >>>>>>> > > be >>>>>>> > > >>>> reported by the abstract implementation. >>>>>>> > > >>>> >>>>>>> > > >>>> I've updated FLIP-33 wiki page to add the pendingBytes and >>>>>>> > > >> pendingRecords >>>>>>> > > >>>> metric. Please let me know if you have any concern over the >>>>>>> updated >>>>>>> > > >> metric >>>>>>> > > >>>> convention proposal. >>>>>>> > > >>>> >>>>>>> > > >>>> @Chesnay Schepler <ches...@apache.org> @Stephan Ewen >>>>>>> > > >>>> <step...@ververica.com> will you also have time to take a >>>>>>> look at >>>>>>> > the >>>>>>> > > >>>> proposed metric convention? If there is no further concern >>>>>>> I'll >>>>>>> > start >>>>>>> > > a >>>>>>> > > >>>> voting thread for this FLIP in two days. 
>>>>>>> > > >>>> >>>>>>> > > >>>> Thanks, >>>>>>> > > >>>> >>>>>>> > > >>>> Jiangjie (Becket) Qin >>>>>>> > > >>>> >>>>>>> > > >>>> >>>>>>> > > >>>> >>>>>>> > > >>>> On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski < >>>>>>> pi...@ververica.com> >>>>>>> > > >> wrote: >>>>>>> > > >>>> >>>>>>> > > >>>>> Hi Becket, >>>>>>> > > >>>>> >>>>>>> > > >>>>> Thanks for the answer :) >>>>>>> > > >>>>> >>>>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on >>>>>>> > producing >>>>>>> > > >> the >>>>>>> > > >>>>>> record to downstream. For example, a source connector can >>>>>>> report >>>>>>> > > that >>>>>>> > > >>>>> it's >>>>>>> > > >>>>>> spending 80% of time to emit record to downstream >>>>>>> processing >>>>>>> > > pipeline. >>>>>>> > > >>>>> In >>>>>>> > > >>>>>> another case, a sink connector may report that its >>>>>>> spending 30% of >>>>>>> > > >> time >>>>>>> > > >>>>>> producing the records to the external system. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage >>>>>>> metric: >>>>>>> > > >>>>> >>>>>>> > > >>>>>> - 80% of time spent on emitting records to downstream ---> >>>>>>> > > downstream >>>>>>> > > >>>>>> node is bottleneck ---> output buffer is probably full. >>>>>>> > > >>>>>> - 30% of time spent on emitting records to downstream ---> >>>>>>> > > downstream >>>>>>> > > >>>>>> node is not bottleneck ---> output buffer is probably not >>>>>>> full. >>>>>>> > > >>>>> >>>>>>> > > >>>>> If by “time spent on emitting records to downstream” you >>>>>>> understand >>>>>>> > > >>>>> “waiting on back pressure”, then I see your point. And I >>>>>>> agree that >>>>>>> > > >> some >>>>>>> > > >>>>> kind of ratio/time based metric gives you more >>>>>>> information. However >>>>>>> > > >> under >>>>>>> > > >>>>> “time spent on emitting records to downstream” might be >>>>>>> hidden the >>>>>>> > > >>>>> following (extreme) situation: >>>>>>> > > >>>>> >>>>>>> > > >>>>> 1. 
Job is barely able to handle influx of records, there >>>>>>> is 99% >>>>>>> > > >>>>> CPU/resource usage in the cluster, but nobody is >>>>>>> > > >>>>> bottlenecked/backpressured, all output buffers are empty, >>>>>>> everybody >>>>>>> > > is >>>>>>> > > >>>>> waiting in 1% of it’s time for more records to process. >>>>>>> > > >>>>> 2. 80% time can still be spent on "down stream operators”, >>>>>>> because >>>>>>> > > they >>>>>>> > > >>>>> are the CPU intensive operations, but this doesn’t mean >>>>>>> that >>>>>>> > > >> increasing the >>>>>>> > > >>>>> parallelism down the stream will help with anything there. >>>>>>> To the >>>>>>> > > >> contrary, >>>>>>> > > >>>>> increasing parallelism of the source operator might help to >>>>>>> > increase >>>>>>> > > >>>>> resource utilisation up to 100%. >>>>>>> > > >>>>> >>>>>>> > > >>>>> However, this “time based/ratio” approach can be extended >>>>>>> to >>>>>>> > > in/output >>>>>>> > > >>>>> buffer usage. Besides collecting an information that >>>>>>> input/output >>>>>>> > > >> buffer is >>>>>>> > > >>>>> full/empty, we can probe profile how often are buffer >>>>>>> empty/full. >>>>>>> > If >>>>>>> > > >> output >>>>>>> > > >>>>> buffer is full 1% of times, there is almost no back >>>>>>> pressure. If >>>>>>> > it’s >>>>>>> > > >> full >>>>>>> > > >>>>> 80% of times, there is some back pressure, if it’s full >>>>>>> 99.9% of >>>>>>> > > times, >>>>>>> > > >>>>> there is huge back pressure. >>>>>>> > > >>>>> >>>>>>> > > >>>>> Now for autoscaling you could compare the input & output >>>>>>> buffers >>>>>>> > fill >>>>>>> > > >>>>> ratio: >>>>>>> > > >>>>> >>>>>>> > > >>>>> 1. Both are high, the source of bottleneck is down the >>>>>>> stream >>>>>>> > > >>>>> 2. Output is low, input is high, this is the bottleneck >>>>>>> and the >>>>>>> > > higher >>>>>>> > > >>>>> the difference, the bigger source of bottleneck is this is >>>>>>> > > >> operator/task >>>>>>> > > >>>>> 3. 
Output is high, input is low - there was some load >>>>>>> spike that we >>>>>>> > > are >>>>>>> > > >>>>> currently finishing to process >>>>>>> > > >>>>> >>>>>>> > > >>>>> >>>>>>> > > >>>>> >>>>>>> > > >>>>> But long story short, we are probably diverging from the >>>>>>> topic of >>>>>>> > > this >>>>>>> > > >>>>> discussion, and we can discuss this at some later point. >>>>>>> > > >>>>> >>>>>>> > > >>>>> For now, for sources: >>>>>>> > > >>>>> >>>>>>> > > >>>>> as I wrote before, +1 for: >>>>>>> > > >>>>> - pending.bytes, Gauge >>>>>>> > > >>>>> - pending.messages, Gauge >>>>>>> > > >>>>> >>>>>>> > > >>>>> When we will be developing/discussing SourceReader from >>>>>>> FLIP-27 we >>>>>>> > > >> might >>>>>>> > > >>>>> then add: >>>>>>> > > >>>>> >>>>>>> > > >>>>> - in-memory.buffer.usage (0 - 100%) >>>>>>> > > >>>>> >>>>>>> > > >>>>> Which will be estimated automatically by Flink while user >>>>>>> will be >>>>>>> > > able >>>>>>> > > >> to >>>>>>> > > >>>>> override/provide better estimation. >>>>>>> > > >>>>> >>>>>>> > > >>>>> Piotrek >>>>>>> > > >>>>> >>>>>>> > > >>>>>> On 5 Jun 2019, at 05:42, Becket Qin <becket....@gmail.com> >>>>>>> wrote: >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> Hi Piotr, >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> Thanks for the explanation. Please see some >>>>>>> clarifications below. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on >>>>>>> > producing >>>>>>> > > >> the >>>>>>> > > >>>>>> record to downstream. For example, a source connector can >>>>>>> report >>>>>>> > > that >>>>>>> > > >>>>> it's >>>>>>> > > >>>>>> spending 80% of time to emit record to downstream >>>>>>> processing >>>>>>> > > pipeline. >>>>>>> > > >>>>> In >>>>>>> > > >>>>>> another case, a sink connector may report that its >>>>>>> spending 30% of >>>>>>> > > >> time >>>>>>> > > >>>>>> producing the records to the external system. 
>>>>>>> > > >>>>>> >>>>>>> > > >>>>>> This is in some sense equivalent to the buffer usage >>>>>>> metric: >>>>>>> > > >>>>>> - 80% of time spent on emitting records to downstream ---> >>>>>>> > > downstream >>>>>>> > > >>>>>> node is bottleneck ---> output buffer is probably full. >>>>>>> > > >>>>>> - 30% of time spent on emitting records to downstream ---> >>>>>>> > > downstream >>>>>>> > > >>>>>> node is not bottleneck ---> output buffer is probably not >>>>>>> full. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> However, the time-based metric has a few advantages that >>>>>>> the >>>>>>> > buffer >>>>>>> > > >>>>> usage >>>>>>> > > >>>>>> metric may not have. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> 1. Buffer usage metric may not be applicable to all the >>>>>>> connector >>>>>>> > > >>>>>> implementations, while reporting time-based metric are >>>>>>> always >>>>>>> > > doable. >>>>>>> > > >>>>>> Some source connectors may not have any input buffer, or >>>>>>> they may >>>>>>> > > use >>>>>>> > > >>>>> some >>>>>>> > > >>>>>> third party library that does not expose the input buffer >>>>>>> at all. >>>>>>> > > >>>>>> Similarly, for sink connectors, the implementation may >>>>>>> not have >>>>>>> > any >>>>>>> > > >>>>> output >>>>>>> > > >>>>>> buffer, or the third party library does not expose such >>>>>>> buffer. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> 2. Although both type of metrics can detect bottleneck, >>>>>>> time-based >>>>>>> > > >>>>> metrics >>>>>>> > > >>>>>> can be used to generate a more informed action to remove >>>>>>> the >>>>>>> > > >> bottleneck. >>>>>>> > > >>>>>> For example, when the downstream is bottleneck, the >>>>>>> output buffer >>>>>>> > > >> usage >>>>>>> > > >>>>>> metric is likely to be 100%, and the input buffer usage >>>>>>> might be >>>>>>> > 0%. >>>>>>> > > >>>>> That >>>>>>> > > >>>>>> means we don't know what is the suitable parallelism to >>>>>>> lift the >>>>>>> > > >>>>>> bottleneck. 
The time-based metric, on the other hand, >>>>>>> would give >>>>>>> > > >> useful >>>>>>> > > >>>>>> information, e.g. if 80% of time was spent on emitting >>>>>>> records, we >>>>>>> > > can >>>>>>> > > >>>>>> roughly increase the downstream node parallelism by 4 >>>>>>> times. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> Admittedly, the time-based metrics are more expensive >>>>>>> than buffer >>>>>>> > > >>>>> usage. So >>>>>>> > > >>>>>> we may have to do some sampling to reduce the cost. But in >>>>>>> > general, >>>>>>> > > >>>>> using >>>>>>> > > >>>>>> time-based metrics seems worth adding. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> That being said, I don't think buffer usage metric and >>>>>>> time-based >>>>>>> > > >>>>> metrics >>>>>>> > > >>>>>> are mutually exclusive. We can probably have both. It is >>>>>>> just that >>>>>>> > > in >>>>>>> > > >>>>>> practice, features like auto-scaling might prefer >>>>>>> time-based >>>>>>> > metrics >>>>>>> > > >> for >>>>>>> > > >>>>>> the reason stated above. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually >>>>>>> detect >>>>>>> > > >>>>> bottlenecks. >>>>>>> > > >>>>>> As I wrote, we already have them in most of the places, >>>>>>> except of >>>>>>> > > >>>>>> sources/sinks. >>>>>>> > > >>>>>>> 2. Use those metrics, to automatically detect >>>>>>> bottlenecks. >>>>>>> > > Currently >>>>>>> > > >> we >>>>>>> > > >>>>>> are only automatically detecting back pressure and >>>>>>> reporting it to >>>>>>> > > the >>>>>>> > > >>>>> user >>>>>>> > > >>>>>> in web UI (is it exposed as a metric at all?). Detecting >>>>>>> the root >>>>>>> > > >> cause >>>>>>> > > >>>>> of >>>>>>> > > >>>>>> the back pressure (bottleneck) is one step further. >>>>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck >>>>>>> is >>>>>>> > located, >>>>>>> > > >> to >>>>>>> > > >>>>>> try to do something with it. 
>>>>>>> > > >>>>>> >>>>>>> > > >>>>>> As explained above, I think time-based metric also >>>>>>> addresses item >>>>>>> > 1 >>>>>>> > > >> and >>>>>>> > > >>>>>> item 2. >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> Any thoughts? >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> Thanks, >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> >>>>>>> > > >>>>>> On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski < >>>>>>> > pi...@ververica.com> >>>>>>> > > >>>>> wrote: >>>>>>> > > >>>>>> >>>>>>> > > >>>>>>> Hi again :) >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>>> - pending.bytes, Gauge >>>>>>> > > >>>>>>>> - pending.messages, Gauge >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> +1 >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> And true, instead of overloading one of the metric it is >>>>>>> better >>>>>>> > > when >>>>>>> > > >>>>> user >>>>>>> > > >>>>>>> can choose to provide only one of them. >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> Re 2: >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>>> If I understand correctly, this metric along with the >>>>>>> pending >>>>>>> > > >> mesages >>>>>>> > > >>>>> / >>>>>>> > > >>>>>>>> bytes would answer the questions of: >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging >>>>>>> behind + empty >>>>>>> > > >>>>> buffer >>>>>>> > > >>>>>>> = >>>>>>> > > >>>>>>>> cannot consume fast enough. >>>>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + >>>>>>> full >>>>>>> > > buffer >>>>>>> > > >> = >>>>>>> > > >>>>>>>> cannot emit fast enough, i.e. the Flink pipeline is >>>>>>> slow. >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> Yes, exactly. This can also be used to support decisions >>>>>>> like >>>>>>> > > >> changing >>>>>>> > > >>>>> the >>>>>>> > > >>>>>>> parallelism of the sources and/or down stream operators. >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> I’m not sure if I understand your proposal with time >>>>>>> based >>>>>>> > > >>>>> measurements. 
>>>>>>> > > >>>>>>> Maybe I’m missing something, but I do not see how >>>>>>> measuring time >>>>>>> > > >> alone >>>>>>> > > >>>>>>> could answer the problem: where is the bottleneck. Time >>>>>>> spent on >>>>>>> > > the >>>>>>> > > >>>>>>> next/emit might be short or long (depending on how heavy >>>>>>> to >>>>>>> > process >>>>>>> > > >> the >>>>>>> > > >>>>>>> record is) and the source can still be bottlenecked/back >>>>>>> > pressured >>>>>>> > > or >>>>>>> > > >>>>> not. >>>>>>> > > >>>>>>> Usually the easiest and the most reliable way how to >>>>>>> detect >>>>>>> > > >>>>> bottlenecks is >>>>>>> > > >>>>>>> by checking usage of input & output buffers, since when >>>>>>> input >>>>>>> > > buffer >>>>>>> > > >> is >>>>>>> > > >>>>>>> full while output buffer is empty, that’s the definition >>>>>>> of a >>>>>>> > > >>>>> bottleneck. >>>>>>> > > >>>>>>> Also this is usually very easy and cheap to measure (it >>>>>>> works >>>>>>> > > >>>>> effectively >>>>>>> > > >>>>>>> the same way as current’s Flink back pressure >>>>>>> monitoring, but >>>>>>> > more >>>>>>> > > >>>>> cleanly, >>>>>>> > > >>>>>>> without probing thread’s stack traces). >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> Also keep in mind that we are already using the buffer >>>>>>> usage >>>>>>> > > metrics >>>>>>> > > >>>>> for >>>>>>> > > >>>>>>> detecting the bottlenecks in Flink’s internal network >>>>>>> exchanges >>>>>>> > > >> (manual >>>>>>> > > >>>>>>> work). That’s the reason why I wanted to extend this to >>>>>>> > > >> sources/sinks, >>>>>>> > > >>>>>>> since they are currently our blind spot. >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink >>>>>>> > > automatically >>>>>>> > > >>>>>>> relies >>>>>>> > > >>>>>>>> on some metrics answering the same question >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> That would be very helpful feature. 
I think in order to >>>>>>> achieve >>>>>>> > > that >>>>>>> > > >> we >>>>>>> > > >>>>>>> would need to: >>>>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually >>>>>>> detect >>>>>>> > > >>>>> bottlenecks. >>>>>>> > > >>>>>>> As I wrote, we already have them in most of the places, >>>>>>> except of >>>>>>> > > >>>>>>> sources/sinks. >>>>>>> > > >>>>>>> 2. Use those metrics, to automatically detect >>>>>>> bottlenecks. >>>>>>> > > Currently >>>>>>> > > >> we >>>>>>> > > >>>>>>> are only automatically detecting back pressure and >>>>>>> reporting it >>>>>>> > to >>>>>>> > > >> the >>>>>>> > > >>>>> user >>>>>>> > > >>>>>>> in web UI (is it exposed as a metric at all?). Detecting >>>>>>> the root >>>>>>> > > >>>>> cause of >>>>>>> > > >>>>>>> the back pressure (bottleneck) is one step further. >>>>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck >>>>>>> is >>>>>>> > located, >>>>>>> > > >> to >>>>>>> > > >>>>> try >>>>>>> > > >>>>>>> to do something with it. >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> I think you are aiming for point 3., but before we reach >>>>>>> it, we >>>>>>> > are >>>>>>> > > >>>>> still >>>>>>> > > >>>>>>> missing 1. & 2. Also even if we have 3., there is a >>>>>>> value in 1 & >>>>>>> > 2 >>>>>>> > > >> for >>>>>>> > > >>>>>>> manual analysis/dashboards. >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> However, having the knowledge of where the bottleneck >>>>>>> is, doesn’t >>>>>>> > > >>>>>>> necessarily mean that we know what we can do about it. >>>>>>> For >>>>>>> > example >>>>>>> > > >>>>>>> increasing parallelism might or might not help with >>>>>>> anything >>>>>>> > (data >>>>>>> > > >>>>> skew, >>>>>>> > > >>>>>>> bottleneck on some resource that does not scale), but >>>>>>> this remark >>>>>>> > > >>>>> applies >>>>>>> > > >>>>>>> always, regardless of the way how did we detect the >>>>>>> bottleneck. 
>>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>> Piotrek >>>>>>> > > >>>>>>> >>>>>>> > > >>>>>>>> On 3 Jun 2019, at 06:16, Becket Qin < >>>>>>> becket....@gmail.com> >>>>>>> > wrote: >>>>>>> > > >>>>>>>> >>>>>>> > > >>>>>>>> Hi Piotr, >>>>>>> > > >>>>>>>> >>>>>>> > > >>>>>>>> Thanks for the suggestion. Some thoughts below: >>>>>>> > > >>>>>>>> >>>>>>> > > >>>>>>>> Re 1: The pending messages / bytes. >>>>>>> > > >>>>>>>> I completely agree these are very useful metrics and we >>>>>>> should >>>>>>> > > >> expect >>>>>>> > > >>>>> the >>>>>>> > > >>>>>>>> connector to report. WRT the way to expose them, it >>>>>>> seems more >>>>>>> > > >>>>> consistent >>>>>>> > > >>>>>>>> to add two metrics instead of adding a method (unless >>>>>>> there are >>>>>>> > > >> other >>>>>>> > > >>>>> use >>>>>>> > > >>>>>>>> cases other than metric reporting). So we can add the >>>>>>> following >>>>>>> > > two >>>>>>> > > >>>>>>> metrics. >>>>>>> > > >>>>>>>> - pending.bytes, Gauge >>>>>>> > > >>>>>>>> - pending.messages, Gauge >>>>>>> > > >>>>>>>> Applicable connectors can choose to report them. These >>>>>>> two >>>>>>> > metrics >>>>>>> > > >>>>> along >>>>>>> > > >>>>>>>> with latency should be sufficient for users to >>>>>>> understand the >>>>>>> > > >> progress >>>>>>> > > >>>>>>> of a >>>>>>> > > >>>>>>>> connector. >>>>>>> > > >>>>>>>> >>>>>>> > > >>>>>>>> >>>>>>> > > >>>>>>>> Re 2: Number of buffered data in-memory of the connector >>>>>>> > > >>>>>>>> If I understand correctly, this metric along with the >>>>>>> pending >>>>>>> > > >> mesages >>>>>>> > > >>>>> / >>>>>>> > > >>>>>>>> bytes would answer the questions of: >>>>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging >>>>>>> behind + empty >>>>>>> > > >>>>> buffer >>>>>>> > > >>>>>>> = >>>>>>> > > >>>>>>>> cannot consume fast enough. >>>>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + >>>>>>> full >>>>>>> > > buffer >>>>>>> > > >> = >>>>>>> > > >>>>>>>> cannot emit fast enough, i.e. 
the Flink pipeline is slow.
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> One feature we are currently working on to scale Flink automatically
>>>>>>> > > >>>>>>>> relies on some metrics answering the same question. More specifically,
>>>>>>> > > >>>>>>>> we are profiling the time spent in the .next() method (time to consume)
>>>>>>> > > >>>>>>>> and the time spent in the .collect() method (time to emit / process).
>>>>>>> > > >>>>>>>> One advantage of such method-level time cost is that it allows us to
>>>>>>> > > >>>>>>>> calculate the parallelism required to keep up in case there is a lag.
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> However, one concern I have regarding such metrics is that they are
>>>>>>> > > >>>>>>>> implementation specific. Either profiling the method time or reporting
>>>>>>> > > >>>>>>>> buffer usage assumes the connectors are implemented in such a way. A
>>>>>>> > > >>>>>>>> slightly better solution might be to have the following metric:
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> - EmitTimeRatio (or FetchTimeRatio): The time spent on emitting
>>>>>>> > > >>>>>>>> records / Total time elapsed.
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> This assumes that the source connectors have to emit the records to
>>>>>>> > > >>>>>>>> the downstream at some point. The emission may take some time (e.g. go
>>>>>>> > > >>>>>>>> through chained operators). And the rest of the time is spent preparing
>>>>>>> > > >>>>>>>> the record to emit, including time for consuming and format conversion,
>>>>>>> > > >>>>>>>> etc.
>>>>>>> > > >>>>>>>> Ideally, we'd like to see the time spent on record fetch and emit to
>>>>>>> > > >>>>>>>> be about the same, so neither is a bottleneck for the other.
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> The downside of these time based metrics is the additional overhead to
>>>>>>> > > >>>>>>>> get the time, therefore sampling might be needed. But in practice I feel
>>>>>>> > > >>>>>>>> such time based metrics might be more useful if we want to take action.
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> I think we should absolutely add metrics in (1) to the metric
>>>>>>> > > >>>>>>>> convention. We could also add the metrics mentioned in (2) if we reach
>>>>>>> > > >>>>>>>> consensus on that. What do you think?
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> Thanks,
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> Jiangjie (Becket) Qin
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>> On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>> > > >>>>>>>>
>>>>>>> > > >>>>>>>>> Hey Becket,
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> Re 1a) and 1b) +1 from my side.
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> I’ve discussed this issue:
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us to check the
>>>>>>> > > >>>>>>>>>>>> cause of back pressure:
>>>>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean
>>>>>>> > > >>>>>>>>>>>> hasSomething/isEmpty)
>>>>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes?
Or boolean hasSomething/isEmpty)
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> with Nico at some length and he also saw the benefits of them. We
>>>>>>> > > >>>>>>>>> also have a more concrete proposal for that.
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> Actually there are two really useful metrics that we are missing
>>>>>>> > > >>>>>>>>> currently:
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> 1. Number of data/records/bytes in the backlog to process. For
>>>>>>> > > >>>>>>>>> example, remaining number of bytes in unread files. Or pending data in
>>>>>>> > > >>>>>>>>> Kafka topics.
>>>>>>> > > >>>>>>>>> 2. Number of buffered data in-memory of the connector, that are
>>>>>>> > > >>>>>>>>> waiting to be processed/pushed to the Flink pipeline.
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> Re 1:
>>>>>>> > > >>>>>>>>> This would have to be a metric provided directly by a connector. It
>>>>>>> > > >>>>>>>>> could be an undefined `int`:
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> `int backlog` - estimate of pending work.
>>>>>>> > > >>>>>>>>>
>>>>>>> > > >>>>>>>>> “Undefined” meaning that it would be up to a connector to decide
>>>>>>> > > >>>>>>>>> whether it’s measured in bytes, records, pending files or whatever it is
>>>>>>> > > >>>>>>>>> possible to provide by the connector. This is because I assume not every
>>>>>>> > > >>>>>>>>> connector can provide an exact number, and for some of them it might be
>>>>>>> > > >>>>>>>>> impossible to provide a record or byte count.
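
A minimal sketch of the backlog estimate described under "Re 1", for a connector that happens to know how many bytes remain in unread files. All names here (`BacklogGauge`, `FileBacklogTracker`, `onBytesRead`) are hypothetical stand-ins rather than Flink interfaces, and a `long` is used instead of the suggested `int` purely as an assumption to avoid overflow on large files.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Stand-in for a gauge-style metric: one getter, polled by a reporter. */
interface BacklogGauge {
    long getValue();
}

/** Hypothetical tracker for a file source; the unit here is bytes. */
final class FileBacklogTracker implements BacklogGauge {
    private final AtomicLong remainingBytes;

    FileBacklogTracker(long totalBytes) {
        this.remainingBytes = new AtomicLong(totalBytes);
    }

    /** Called by the reader after it has consumed n bytes. */
    void onBytesRead(long n) {
        remainingBytes.addAndGet(-n);
    }

    /** Estimate of pending work; clamped so an over-count never goes negative. */
    @Override
    public long getValue() {
        return Math.max(0L, remainingBytes.get());
    }
}
```

A connector that cannot count bytes could expose pending records or pending files through the same shape, which is the "undefined unit" idea from the message.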
>>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>>>> Re 2: >>>>>>> > > >>>>>>>>> This metric could be either provided by a connector, or >>>>>>> > > calculated >>>>>>> > > >>>>>>> crudely >>>>>>> > > >>>>>>>>> by Flink: >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>>>> `float bufferUsage` - value from [0.0, 1.0] range >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>>>> Percentage of used in memory buffers, like in Kafka’s >>>>>>> handover. >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>>>> It could be crudely implemented by Flink with FLIP-27 >>>>>>> > > >>>>>>>>> SourceReader#isAvailable. If SourceReader is not >>>>>>> available >>>>>>> > > reported >>>>>>> > > >>>>>>>>> `bufferUsage` could be 0.0. If it is available, it >>>>>>> could be >>>>>>> > 1.0. >>>>>>> > > I >>>>>>> > > >>>>> think >>>>>>> > > >>>>>>>>> this would be a good enough estimation for most of the >>>>>>> use >>>>>>> > cases >>>>>>> > > >>>>> (that >>>>>>> > > >>>>>>>>> could be overloaded and implemented better if desired). >>>>>>> > > Especially >>>>>>> > > >>>>>>> since we >>>>>>> > > >>>>>>>>> are reporting only probed values: if probed values are >>>>>>> almost >>>>>>> > > >> always >>>>>>> > > >>>>>>> “1.0”, >>>>>>> > > >>>>>>>>> it would mean that we have a back pressure. If they >>>>>>> are almost >>>>>>> > > >> always >>>>>>> > > >>>>>>>>> “0.0”, there is probably no back pressure at the >>>>>>> sources. >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>>>> What do you think about this? >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>>>> Piotrek >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>>>>> On 30 May 2019, at 11:41, Becket Qin < >>>>>>> becket....@gmail.com> >>>>>>> > > >> wrote: >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> Hi all, >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> Thanks a lot for all the feedback and comments. I'd >>>>>>> like to >>>>>>> > > >> continue >>>>>>> > > >>>>>>> the >>>>>>> > > >>>>>>>>>> discussion on this FLIP. 
>>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> I updated the FLIP-33 wiki to remove all the >>>>>>> histogram metrics >>>>>>> > > >> from >>>>>>> > > >>>>> the >>>>>>> > > >>>>>>>>>> first version of metric convention due to the >>>>>>> performance >>>>>>> > > concern. >>>>>>> > > >>>>> The >>>>>>> > > >>>>>>>>> plan >>>>>>> > > >>>>>>>>>> is to introduce them later when we have a mechanism >>>>>>> to opt >>>>>>> > > in/out >>>>>>> > > >>>>>>>>> metrics. >>>>>>> > > >>>>>>>>>> At that point, users can decide whether they want to >>>>>>> pay the >>>>>>> > > cost >>>>>>> > > >> to >>>>>>> > > >>>>>>> get >>>>>>> > > >>>>>>>>>> the metric or not. >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> As Stephan suggested, for this FLIP, let's first try >>>>>>> to agree >>>>>>> > on >>>>>>> > > >> the >>>>>>> > > >>>>>>>>> small >>>>>>> > > >>>>>>>>>> list of conventional metrics that connectors should >>>>>>> follow. >>>>>>> > > >>>>>>>>>> Just to be clear, the purpose of the convention is >>>>>>> not to >>>>>>> > > enforce >>>>>>> > > >>>>> every >>>>>>> > > >>>>>>>>>> connector to report all these metrics, but to provide >>>>>>> a >>>>>>> > guidance >>>>>>> > > >> in >>>>>>> > > >>>>>>> case >>>>>>> > > >>>>>>>>>> these metrics are reported by some connectors. >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> @ Stephan & Chesnay, >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> Regarding the duplication of `RecordsIn` metric in >>>>>>> operator / >>>>>>> > > task >>>>>>> > > >>>>>>>>>> IOMetricGroups, from what I understand, for source >>>>>>> operator, >>>>>>> > it >>>>>>> > > is >>>>>>> > > >>>>>>>>> actually >>>>>>> > > >>>>>>>>>> the SourceFunction that reports the operator level >>>>>>> > > >>>>>>>>>> RecordsIn/RecordsInPerSecond metric. So they are >>>>>>> essentially >>>>>>> > the >>>>>>> > > >>>>> same >>>>>>> > > >>>>>>>>>> metric in the operator level IOMetricGroup. 
Similarly >>>>>>> for the >>>>>>> > > Sink >>>>>>> > > >>>>>>>>>> operator, the operator level >>>>>>> RecordsOut/RecordsOutPerSecond >>>>>>> > > >> metrics >>>>>>> > > >>>>> are >>>>>>> > > >>>>>>>>>> also reported by the Sink function. I marked them as >>>>>>> existing >>>>>>> > in >>>>>>> > > >> the >>>>>>> > > >>>>>>>>>> FLIP-33 wiki page. Please let me know if I >>>>>>> misunderstood. >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> Thanks, >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>> On Thu, May 30, 2019 at 5:16 PM Becket Qin < >>>>>>> > > becket....@gmail.com> >>>>>>> > > >>>>>>> wrote: >>>>>>> > > >>>>>>>>>> >>>>>>> > > >>>>>>>>>>> Hi Piotr, >>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>> Thanks a lot for the feedback. >>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>> 1a) I guess you are referring to the part that >>>>>>> "original >>>>>>> > system >>>>>>> > > >>>>>>> specific >>>>>>> > > >>>>>>>>>>> metrics should also be reported". The performance >>>>>>> impact >>>>>>> > > depends >>>>>>> > > >> on >>>>>>> > > >>>>>>> the >>>>>>> > > >>>>>>>>>>> implementation. An efficient implementation would >>>>>>> only record >>>>>>> > > the >>>>>>> > > >>>>>>> metric >>>>>>> > > >>>>>>>>>>> once, but report them with two different metric >>>>>>> names. This >>>>>>> > is >>>>>>> > > >>>>>>> unlikely >>>>>>> > > >>>>>>>>> to >>>>>>> > > >>>>>>>>>>> hurt performance. >>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>> 1b) Yes, I agree that we should avoid adding >>>>>>> overhead to the >>>>>>> > > >>>>> critical >>>>>>> > > >>>>>>>>> path >>>>>>> > > >>>>>>>>>>> by all means. This is sometimes a tradeoff, running >>>>>>> blindly >>>>>>> > > >> without >>>>>>> > > >>>>>>> any >>>>>>> > > >>>>>>>>>>> metric gives best performance, but sometimes might be >>>>>>> > > frustrating >>>>>>> > > >>>>> when >>>>>>> > > >>>>>>>>> we >>>>>>> > > >>>>>>>>>>> debug some issues. 
>>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>> 2. The metrics are indeed very useful. Are they >>>>>>> supposed to >>>>>>> > be >>>>>>> > > >>>>>>> reported >>>>>>> > > >>>>>>>>> by >>>>>>> > > >>>>>>>>>>> the connectors or Flink itself? At this point >>>>>>> FLIP-33 is more >>>>>>> > > >>>>> focused >>>>>>> > > >>>>>>> on >>>>>>> > > >>>>>>>>>>> provide a guidance to the connector authors on the >>>>>>> metrics >>>>>>> > > >>>>> reporting. >>>>>>> > > >>>>>>>>> That >>>>>>> > > >>>>>>>>>>> said, after FLIP-27, I think we should absolutely >>>>>>> report >>>>>>> > these >>>>>>> > > >>>>> metrics >>>>>>> > > >>>>>>>>> in >>>>>>> > > >>>>>>>>>>> the abstract implementation. In any case, the metric >>>>>>> > convention >>>>>>> > > >> in >>>>>>> > > >>>>>>> this >>>>>>> > > >>>>>>>>>>> list are expected to evolve over time. >>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>> Thanks, >>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski < >>>>>>> > > >>>>> pi...@ververica.com> >>>>>>> > > >>>>>>>>>>> wrote: >>>>>>> > > >>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> Hi, >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> Thanks for the proposal and driving the effort here >>>>>>> Becket >>>>>>> > :) >>>>>>> > > >> I’ve >>>>>>> > > >>>>>>> read >>>>>>> > > >>>>>>>>>>>> through the FLIP-33 [1], and here are couple of my >>>>>>> thoughts. >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> Big +1 for standardising the metric names between >>>>>>> > connectors, >>>>>>> > > it >>>>>>> > > >>>>> will >>>>>>> > > >>>>>>>>>>>> definitely help us and users a lot. >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> Issues/questions/things to discuss that I’ve >>>>>>> thought of: >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> 1a. If we are about to duplicate some metrics, can >>>>>>> this >>>>>>> > > become a >>>>>>> > > >>>>>>>>>>>> performance issue? >>>>>>> > > >>>>>>>>>>>> 1b. 
Generally speaking, we should make sure that >>>>>>> collecting >>>>>>> > > >> those >>>>>>> > > >>>>>>>>> metrics >>>>>>> > > >>>>>>>>>>>> is as non intrusive as possible, especially that >>>>>>> they will >>>>>>> > > need >>>>>>> > > >>>>> to be >>>>>>> > > >>>>>>>>>>>> updated once per record. (They might be collected >>>>>>> more >>>>>>> > rarely >>>>>>> > > >> with >>>>>>> > > >>>>>>> some >>>>>>> > > >>>>>>>>>>>> overhead, but the hot path of updating it per >>>>>>> record will >>>>>>> > need >>>>>>> > > >> to >>>>>>> > > >>>>> be >>>>>>> > > >>>>>>> as >>>>>>> > > >>>>>>>>>>>> quick as possible). That includes both avoiding >>>>>>> heavy >>>>>>> > > >> computation >>>>>>> > > >>>>> on >>>>>>> > > >>>>>>>>> per >>>>>>> > > >>>>>>>>>>>> record path: histograms?, measuring time for time >>>>>>> based >>>>>>> > > metrics >>>>>>> > > >>>>> (per >>>>>>> > > >>>>>>>>>>>> second) (System.currentTimeMillis() depending on the >>>>>>> > > >>>>> implementation >>>>>>> > > >>>>>>> can >>>>>>> > > >>>>>>>>>>>> invoke a system call) >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us >>>>>>> to check >>>>>>> > > the >>>>>>> > > >>>>> cause >>>>>>> > > >>>>>>>>> of >>>>>>> > > >>>>>>>>>>>> back pressure: >>>>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or >>>>>>> boolean >>>>>>> > > >>>>>>>>>>>> hasSomethingl/isEmpty) >>>>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? 
Or >>>>>>> boolean >>>>>>> > > >>>>>>>>>>>> hasSomething/isEmpty) >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> a) is useful in a scenario when we are processing >>>>>>> backlog of >>>>>>> > > >>>>> records, >>>>>>> > > >>>>>>>>> all >>>>>>> > > >>>>>>>>>>>> of the internal Flink’s input/output network >>>>>>> buffers are >>>>>>> > > empty, >>>>>>> > > >>>>> and >>>>>>> > > >>>>>>> we >>>>>>> > > >>>>>>>>> want >>>>>>> > > >>>>>>>>>>>> to check whether the external source system is the >>>>>>> > bottleneck >>>>>>> > > >>>>>>> (source’s >>>>>>> > > >>>>>>>>>>>> input queue will be empty), or if the Flink’s >>>>>>> connector is >>>>>>> > the >>>>>>> > > >>>>>>>>> bottleneck >>>>>>> > > >>>>>>>>>>>> (source’s input queues will be full). >>>>>>> > > >>>>>>>>>>>> b) similar story. Backlog of records, but this time >>>>>>> all of >>>>>>> > the >>>>>>> > > >>>>>>> internal >>>>>>> > > >>>>>>>>>>>> Flink’s input/ouput network buffers are full, and >>>>>>> we want o >>>>>>> > > >> check >>>>>>> > > >>>>>>>>> whether >>>>>>> > > >>>>>>>>>>>> the external sink system is the bottleneck (sink >>>>>>> output >>>>>>> > queues >>>>>>> > > >> are >>>>>>> > > >>>>>>>>> full), >>>>>>> > > >>>>>>>>>>>> or if the Flink’s connector is the bottleneck >>>>>>> (sink’s output >>>>>>> > > >>>>> queues >>>>>>> > > >>>>>>>>> will be >>>>>>> > > >>>>>>>>>>>> empty) >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> It might be sometimes difficult to provide those >>>>>>> metrics, so >>>>>>> > > >> they >>>>>>> > > >>>>>>> could >>>>>>> > > >>>>>>>>>>>> be optional, but if we could provide them, it would >>>>>>> be >>>>>>> > really >>>>>>> > > >>>>>>> helpful. 
>>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> Piotrek >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> [1] >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>> >>>>>>> > > >>>>> >>>>>>> > > >> >>>>>>> > > >>>>>>> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics >>>>>>> > > >>>>>>>>>>>> < >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>> >>>>>>> > > >>>>>>> >>>>>>> > > >>>>> >>>>>>> > > >> >>>>>>> > > >>>>>>> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>> On 24 Apr 2019, at 13:28, Stephan Ewen < >>>>>>> se...@apache.org> >>>>>>> > > >> wrote: >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>> I think this sounds reasonable. >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>> Let's keep the "reconfiguration without stopping >>>>>>> the job" >>>>>>> > out >>>>>>> > > >> of >>>>>>> > > >>>>>>> this, >>>>>>> > > >>>>>>>>>>>>> because that would be a super big effort and if we >>>>>>> approach >>>>>>> > > >> that, >>>>>>> > > >>>>>>> then >>>>>>> > > >>>>>>>>>>>> in >>>>>>> > > >>>>>>>>>>>>> more generic way rather than specific to connector >>>>>>> metrics. >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>> I would suggest to look at the following things >>>>>>> before >>>>>>> > > starting >>>>>>> > > >>>>> with >>>>>>> > > >>>>>>>>> any >>>>>>> > > >>>>>>>>>>>>> implementation work: >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>> - Try and find a committer to support this, >>>>>>> otherwise it >>>>>>> > will >>>>>>> > > >> be >>>>>>> > > >>>>>>> hard >>>>>>> > > >>>>>>>>>>>> to >>>>>>> > > >>>>>>>>>>>>> make progress >>>>>>> > > >>>>>>>>>>>>> - Start with defining a smaller set of "core >>>>>>> metrics" and >>>>>>> > > >> extend >>>>>>> > > >>>>> the >>>>>>> > > >>>>>>>>>>>> set >>>>>>> > > >>>>>>>>>>>>> later. 
I think that is easier than now blocking on >>>>>>> reaching >>>>>>> > > >>>>>>> consensus >>>>>>> > > >>>>>>>>>>>> on a >>>>>>> > > >>>>>>>>>>>>> large group of metrics. >>>>>>> > > >>>>>>>>>>>>> - Find a solution to the problem Chesnay >>>>>>> mentioned, that >>>>>>> > the >>>>>>> > > >>>>>>> "records >>>>>>> > > >>>>>>>>>>>> in" >>>>>>> > > >>>>>>>>>>>>> metric is somehow overloaded and exists already in >>>>>>> the IO >>>>>>> > > >> Metric >>>>>>> > > >>>>>>>>> group. >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin < >>>>>>> > > >> becket....@gmail.com >>>>>>> > > >>>>>> >>>>>>> > > >>>>>>>>>>>> wrote: >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> Hi Stephan, >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> Thanks a lot for the feedback. All makes sense. >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> It is a good suggestion to simply have an >>>>>>> > onRecord(numBytes, >>>>>>> > > >>>>>>>>> eventTime) >>>>>>> > > >>>>>>>>>>>>>> method for connector writers. It should meet most >>>>>>> of the >>>>>>> > > >>>>>>>>> requirements, >>>>>>> > > >>>>>>>>>>>>>> individual >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> The configurable metrics feature is something >>>>>>> really >>>>>>> > useful, >>>>>>> > > >>>>>>>>>>>> especially if >>>>>>> > > >>>>>>>>>>>>>> we can somehow make it dynamically configurable >>>>>>> without >>>>>>> > > >> stopping >>>>>>> > > >>>>>>> the >>>>>>> > > >>>>>>>>>>>> jobs. >>>>>>> > > >>>>>>>>>>>>>> It might be better to make it a separate >>>>>>> discussion >>>>>>> > because >>>>>>> > > it >>>>>>> > > >>>>> is a >>>>>>> > > >>>>>>>>>>>> more >>>>>>> > > >>>>>>>>>>>>>> generic feature instead of only for connectors. 
>>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> So in order to make some progress, in this FLIP >>>>>>> we can >>>>>>> > limit >>>>>>> > > >> the >>>>>>> > > >>>>>>>>>>>> discussion >>>>>>> > > >>>>>>>>>>>>>> scope to the connector related items: >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> - the standard connector metric names and types. >>>>>>> > > >>>>>>>>>>>>>> - the abstract ConnectorMetricHandler interface >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> I'll start a separate thread to discuss other >>>>>>> general >>>>>>> > metric >>>>>>> > > >>>>>>> related >>>>>>> > > >>>>>>>>>>>>>> enhancement items including: >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> - optional metrics >>>>>>> > > >>>>>>>>>>>>>> - dynamic metric configuration >>>>>>> > > >>>>>>>>>>>>>> - potential combination with rate limiter >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> Does this plan sound reasonable? >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> Thanks, >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen < >>>>>>> > > >> se...@apache.org> >>>>>>> > > >>>>>>>>> wrote: >>>>>>> > > >>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>> Ignoring for a moment implementation details, >>>>>>> this >>>>>>> > > connector >>>>>>> > > >>>>>>> metrics >>>>>>> > > >>>>>>>>>>>> work >>>>>>> > > >>>>>>>>>>>>>>> is a really good thing to do, in my opinion >>>>>>> > > >>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>> The questions "oh, my job seems to be doing >>>>>>> nothing, I am >>>>>>> > > >>>>> looking >>>>>>> > > >>>>>>> at >>>>>>> > > >>>>>>>>>>>> the >>>>>>> > > >>>>>>>>>>>>>> UI >>>>>>> > > >>>>>>>>>>>>>>> and the 'records in' value is still zero" is in >>>>>>> the top >>>>>>> > > three >>>>>>> > > >>>>>>>>> support >>>>>>> > > >>>>>>>>>>>>>>> questions I have been asked personally. 
>>>>>>> > > >>>>>>>>>>>>>>> Introspection into "how far is the consumer lagging behind" (event
>>>>>>> > > >>>>>>>>>>>>>>> time fetch latency) came up many times as well.
>>>>>>> > > >>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>> So big +1 to solving this problem.
>>>>>>> > > >>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>> About the exact design - I would try to go for the following
>>>>>>> > > >>>>>>>>>>>>>>> properties:
>>>>>>> > > >>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>> - keep complexity out of connectors. Ideally the metrics handler
>>>>>>> > > >>>>>>>>>>>>>>> has a single onRecord(numBytes, eventTime) method or so, and
>>>>>>> > > >>>>>>>>>>>>>>> everything else is internal to the handler. That makes it dead simple
>>>>>>> > > >>>>>>>>>>>>>>> for the connector. We can also think of an extensible scheme for
>>>>>>> > > >>>>>>>>>>>>>>> connector specific metrics.
>>>>>>> > > >>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>> - make it configurable on the job or cluster level which metrics
>>>>>>> > > >>>>>>>>>>>>>>> the handler internally creates when that method is invoked.
>>>>>>> > > >>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>> What do you think?
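
The single-method handler idea above might look roughly like the sketch below. It is not the handler from the FLIP or the patch: `SimpleMetricHandler`, the `trackEventTime` switch and the accessor names are assumptions made for illustration, and the "current time" is passed in explicitly so the lag calculation stays deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical handler: the connector only ever calls onRecord(...). */
final class SimpleMetricHandler {
    private final boolean trackEventTime; // stands in for job/cluster level configuration
    private final AtomicLong numRecordsIn = new AtomicLong();
    private final AtomicLong numBytesIn = new AtomicLong();
    private volatile long lastEventTimeMs = Long.MIN_VALUE;

    SimpleMetricHandler(boolean trackEventTime) {
        this.trackEventTime = trackEventTime;
    }

    /** The one call on the connector's per-record path. */
    void onRecord(long numBytes, long eventTimeMs) {
        numRecordsIn.incrementAndGet();
        numBytesIn.addAndGet(numBytes);
        if (trackEventTime) {
            lastEventTimeMs = eventTimeMs;
        }
    }

    long numRecordsIn() {
        return numRecordsIn.get();
    }

    long numBytesIn() {
        return numBytesIn.get();
    }

    /** Lag of the last seen event time behind nowMs, or -1 if disabled or no record yet. */
    long eventTimeLagMs(long nowMs) {
        long last = lastEventTimeMs;
        return (trackEventTime && last != Long.MIN_VALUE) ? nowMs - last : -1L;
    }
}
```

Which metrics the handler maintains is decided in its constructor, so the connector code stays the same whether event-time lag is enabled or not.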
>>>>>>> > > >>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>> Best, >>>>>>> > > >>>>>>>>>>>>>>> Stephan >>>>>>> > > >>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay >>>>>>> Schepler < >>>>>>> > > >>>>>>>>> ches...@apache.org >>>>>>> > > >>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>> wrote: >>>>>>> > > >>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>> As I said before, I believe this to be >>>>>>> over-engineered >>>>>>> > and >>>>>>> > > >>>>> have >>>>>>> > > >>>>>>> no >>>>>>> > > >>>>>>>>>>>>>>>> interest in this implementation. >>>>>>> > > >>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>> There are conceptual issues like defining a >>>>>>> duplicate >>>>>>> > > >>>>>>>>>>>>>> numBytesIn(PerSec) >>>>>>> > > >>>>>>>>>>>>>>>> metric that already exists for each operator. >>>>>>> > > >>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>> On 21.03.2019 06:13, Becket Qin wrote: >>>>>>> > > >>>>>>>>>>>>>>>>> A few updates to the thread. I uploaded a >>>>>>> patch[1] as a >>>>>>> > > >>>>> complete >>>>>>> > > >>>>>>>>>>>>>>>>> example of how users can use the metrics in >>>>>>> the future. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Some thoughts below after taking a look at the >>>>>>> > > >>>>>>> AbstractMetricGroup >>>>>>> > > >>>>>>>>>>>>>> and >>>>>>> > > >>>>>>>>>>>>>>>>> its subclasses. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> This patch intends to provide convenience for >>>>>>> Flink >>>>>>> > > >> connector >>>>>>> > > >>>>>>>>>>>>>>>>> implementations to follow metrics standards >>>>>>> proposed in >>>>>>> > > >>>>> FLIP-33. >>>>>>> > > >>>>>>>>> It >>>>>>> > > >>>>>>>>>>>>>>>>> also try to enhance the metric management in >>>>>>> general >>>>>>> > way >>>>>>> > > to >>>>>>> > > >>>>> help >>>>>>> > > >>>>>>>>>>>>>> users >>>>>>> > > >>>>>>>>>>>>>>>>> with: >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> 1. metric definition >>>>>>> > > >>>>>>>>>>>>>>>>> 2. 
metric dependencies check >>>>>>> > > >>>>>>>>>>>>>>>>> 3. metric validation >>>>>>> > > >>>>>>>>>>>>>>>>> 4. metric control (turn on / off particular >>>>>>> metrics) >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> This patch wraps |MetricGroup| to extend the >>>>>>> > > functionality >>>>>>> > > >> of >>>>>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| and its subclasses. The >>>>>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| mainly focus on the >>>>>>> metric group >>>>>>> > > >>>>>>> hierarchy, >>>>>>> > > >>>>>>>>>>>> but >>>>>>> > > >>>>>>>>>>>>>>>>> does not really manage the metrics other than >>>>>>> keeping >>>>>>> > > them >>>>>>> > > >>>>> in a >>>>>>> > > >>>>>>>>> Map. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Ideally we should only have one entry point >>>>>>> for the >>>>>>> > > >> metrics. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Right now the entry point is >>>>>>> |AbstractMetricGroup|. >>>>>>> > > >> However, >>>>>>> > > >>>>>>>>> besides >>>>>>> > > >>>>>>>>>>>>>>>>> the missing functionality mentioned above, >>>>>>> > > >>>>> |AbstractMetricGroup| >>>>>>> > > >>>>>>>>>>>>>> seems >>>>>>> > > >>>>>>>>>>>>>>>>> deeply rooted in Flink runtime. We could >>>>>>> extract it out >>>>>>> > > to >>>>>>> > > >>>>>>>>>>>>>>>>> flink-metrics in order to use it for generic >>>>>>> purpose. >>>>>>> > > There >>>>>>> > > >>>>> will >>>>>>> > > >>>>>>>>> be >>>>>>> > > >>>>>>>>>>>>>>>>> some work, though. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Another approach is to make |AbstractMetrics| >>>>>>> in this >>>>>>> > > patch >>>>>>> > > >>>>> as >>>>>>> > > >>>>>>> the >>>>>>> > > >>>>>>>>>>>>>>>>> metric entry point. It wraps metric group and >>>>>>> provides >>>>>>> > > the >>>>>>> > > >>>>>>> missing >>>>>>> > > >>>>>>>>>>>>>>>>> functionalities. Then we can roll out this >>>>>>> pattern to >>>>>>> > > >> runtime >>>>>>> > > >>>>>>>>>>>>>>>>> components gradually as well. 
>>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> My first thought is that the latter approach >>>>>>> gives a >>>>>>> > more >>>>>>> > > >>>>> smooth >>>>>>> > > >>>>>>>>>>>>>>>>> migration. But I am also OK with doing a >>>>>>> refactoring on >>>>>>> > > the >>>>>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| family. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Thanks, >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> [1] https://github.com/becketqin/flink/pull/1 >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> On Mon, Feb 25, 2019 at 2:32 PM Becket Qin < >>>>>>> > > >>>>>>> becket....@gmail.com >>>>>>> > > >>>>>>>>>>>>>>>>> <mailto:becket....@gmail.com>> wrote: >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Hi Chesnay, >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> It might be easier to discuss some >>>>>>> implementation >>>>>>> > details >>>>>>> > > >> in >>>>>>> > > >>>>>>> the >>>>>>> > > >>>>>>>>>>>>>>>>> PR review instead of in the FLIP discussion >>>>>>> thread. I >>>>>>> > > have >>>>>>> > > >> a >>>>>>> > > >>>>>>>>>>>>>> patch >>>>>>> > > >>>>>>>>>>>>>>>>> for Kafka connectors ready but haven't >>>>>>> submitted the PR >>>>>>> > > >> yet. >>>>>>> > > >>>>>>>>>>>>>>>>> Hopefully that will help explain a bit more. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> ** Re: metric type binding >>>>>>> > > >>>>>>>>>>>>>>>>> This is a valid point that worths discussing. >>>>>>> If I >>>>>>> > > >> understand >>>>>>> > > >>>>>>>>>>>>>>>>> correctly, there are two points: >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> 1. Metric type / interface does not matter as >>>>>>> long as >>>>>>> > the >>>>>>> > > >>>>>>> metric >>>>>>> > > >>>>>>>>>>>>>>>>> semantic is clearly defined. 
>>>>>>> > > >>>>>>>>>>>>>>>>> Conceptually speaking, I agree that as long as >>>>>>> the >>>>>>> > metric >>>>>>> > > >>>>>>>>>>>>>> semantic >>>>>>> > > >>>>>>>>>>>>>>>>> is defined, metric type does not matter. To >>>>>>> some >>>>>>> > extent, >>>>>>> > > >>>>> Gauge >>>>>>> > > >>>>>>> / >>>>>>> > > >>>>>>>>>>>>>>>>> Counter / Meter / Histogram themselves can be >>>>>>> think of >>>>>>> > as >>>>>>> > > >>>>> some >>>>>>> > > >>>>>>>>>>>>>>>>> well-recognized semantics, if you wish. In >>>>>>> Flink, these >>>>>>> > > >>>>> metric >>>>>>> > > >>>>>>>>>>>>>>>>> semantics have their associated interface >>>>>>> classes. In >>>>>>> > > >>>>> practice, >>>>>>> > > >>>>>>>>>>>>>>>>> such semantic to interface binding seems >>>>>>> necessary for >>>>>>> > > >>>>>>> different >>>>>>> > > >>>>>>>>>>>>>>>>> components to communicate. Simply standardize >>>>>>> the >>>>>>> > > semantic >>>>>>> > > >>>>> of >>>>>>> > > >>>>>>>>>>>>>> the >>>>>>> > > >>>>>>>>>>>>>>>>> connector metrics seems not sufficient for >>>>>>> people to >>>>>>> > > build >>>>>>> > > >>>>>>>>>>>>>>>>> ecosystem on top of. At the end of the day, we >>>>>>> still >>>>>>> > need >>>>>>> > > >> to >>>>>>> > > >>>>>>>>> have >>>>>>> > > >>>>>>>>>>>>>>>>> some embodiment of the metric semantics that >>>>>>> people can >>>>>>> > > >>>>> program >>>>>>> > > >>>>>>>>>>>>>>>>> against. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> 2. Sometimes the same metric semantic can be >>>>>>> exposed >>>>>>> > > using >>>>>>> > > >>>>>>>>>>>>>>>>> different metric types / interfaces. >>>>>>> > > >>>>>>>>>>>>>>>>> This is a good point. Counter and >>>>>>> Gauge-as-a-Counter >>>>>>> > are >>>>>>> > > >>>>> pretty >>>>>>> > > >>>>>>>>>>>>>>>>> much interchangeable. This is more of a >>>>>>> trade-off >>>>>>> > between >>>>>>> > > >> the >>>>>>> > > >>>>>>>>>>>>>> user >>>>>>> > > >>>>>>>>>>>>>>>>> experience of metric producers and consumers. 
>>>>>>> > > >>>>>>>>>>>>>>>>> The metric producers want to use Counter or Gauge depending on whether the counter is already tracked in code, while ideally the metric consumers only want to see a single metric type for each metric. I am leaning towards making the metric producers happy, i.e. allow the Gauge / Counter metric type, and let the metric consumers handle the type variation. The reason is that in practice, there might be more connector implementations than metric reporter implementations. We could also provide some helper method to facilitate reading from such a variable metric type.
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>> Just some quick replies to the comments around implementation details.
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>> 4) single place where metrics are registered except connector-specific ones (which we can't really avoid).
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>> Registering connector-specific ones in a single place is actually something that I want to achieve.
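The helper method mentioned above, for reading a count from a metric that may be either a Counter or a Gauge, might look roughly like the following. This is only a sketch: SketchCounter, SketchGauge and readCount are hypothetical stand-ins written for illustration and are not Flink's own interfaces or any existing Flink helper.

```java
import java.util.OptionalLong;

/** Minimal stand-ins for illustration only; NOT Flink's Counter and Gauge interfaces. */
interface SketchCounter {
    long getCount();
}

interface SketchGauge<T> {
    T getValue();
}

final class MetricReadSketch {

    /**
     * Reads a count from either metric type.
     * Returns an empty result when the metric cannot be read as a number.
     */
    static OptionalLong readCount(Object metric) {
        if (metric instanceof SketchCounter) {
            return OptionalLong.of(((SketchCounter) metric).getCount());
        }
        if (metric instanceof SketchGauge) {
            Object value = ((SketchGauge<?>) metric).getValue();
            if (value instanceof Number) {
                return OptionalLong.of(((Number) value).longValue());
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // The same logical metric exposed in two different ways by two producers.
        SketchCounter asCounter = () -> 42L;
        SketchGauge<Long> asGauge = () -> 42L;

        // A consumer (e.g. a reporter) reads both through one code path.
        System.out.println(readCount(asCounter));
        System.out.println(readCount(asGauge));
    }
}
```

With something along these lines, the type variation stays on the consumer side in one place, which matches the trade-off described above of keeping connector writers (the producers) unconstrained.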
>>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> 2) I'm talking about time-series databases >>>>>>> like >>>>>>> > > >>>>> Prometheus. >>>>>>> > > >>>>>>>>>>>>>> We >>>>>>> > > >>>>>>>>>>>>>>>>> would >>>>>>> > > >>>>>>>>>>>>>>>>> only have a gauge metric exposing the last >>>>>>> > > >>>>>>>>> fetchTime/emitTime >>>>>>> > > >>>>>>>>>>>>>>>>> that is >>>>>>> > > >>>>>>>>>>>>>>>>> regularly reported to the backend >>>>>>> (Prometheus), >>>>>>> > where a >>>>>>> > > >>>>>>> user >>>>>>> > > >>>>>>>>>>>>>>>>> could build >>>>>>> > > >>>>>>>>>>>>>>>>> a histogram of his choosing when/if he wants >>>>>>> it. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Not sure if such downsampling works. As an >>>>>>> example, if >>>>>>> > a >>>>>>> > > >> user >>>>>>> > > >>>>>>>>>>>>>>>>> complains that there are some intermittent >>>>>>> latency >>>>>>> > spikes >>>>>>> > > >>>>>>> (maybe >>>>>>> > > >>>>>>>>>>>>>> a >>>>>>> > > >>>>>>>>>>>>>>>>> few records in 10 seconds) in their processing >>>>>>> system. >>>>>>> > > >>>>> Having a >>>>>>> > > >>>>>>>>>>>>>>>>> Gauge sampling instantaneous latency seems >>>>>>> unlikely >>>>>>> > > useful. >>>>>>> > > >>>>>>>>>>>>>>>>> However by looking at actual 99.9 percentile >>>>>>> latency >>>>>>> > > might >>>>>>> > > >>>>>>> help. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Thanks, >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 9:30 PM Chesnay >>>>>>> Schepler >>>>>>> > > >>>>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org >>>>>>> >> >>>>>>> > wrote: >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> Re: over complication of implementation. 
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>> I think I now understand better what you're shooting for, effectively something like the OperatorIOMetricGroup.
>>>>>>> > > >>>>>>>>>>>>>>>>> But still, re-define setupConnectorMetrics() to accept a set of flags for counters/meters (and _possibly_ histograms) along with a set of well-defined Optional<Gauge<?>>, and return the group.
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>> Solves all issues as far as I can tell:
>>>>>>> > > >>>>>>>>>>>>>>>>> 1) no metrics must be created manually (except Gauges, which are effectively just Suppliers and you can't get around this),
>>>>>>> > > >>>>>>>>>>>>>>>>> 2) additional metrics can be registered on the returned group,
>>>>>>> > > >>>>>>>>>>>>>>>>> 3) see 1),
>>>>>>> > > >>>>>>>>>>>>>>>>> 4) single place where metrics are registered except connector-specific ones (which we can't really avoid).
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>> Re: Histogram
>>>>>>> > > >>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>> 1) As an example, whether "numRecordsIn" is exposed as a Counter or a Gauge should be irrelevant. So far we're using the metric type that is the most convenient at exposing a given value.
If >>>>>>> > there >>>>>>> > > >>>>> is >>>>>>> > > >>>>>>>>>>>>>>>>> some backing >>>>>>> > > >>>>>>>>>>>>>>>>> data-structure that we want to expose some >>>>>>> data from >>>>>>> > we >>>>>>> > > >>>>>>>>>>>>>>>>> typically opt >>>>>>> > > >>>>>>>>>>>>>>>>> for a Gauge, as otherwise we're just mucking >>>>>>> around >>>>>>> > > with >>>>>>> > > >>>>>>> the >>>>>>> > > >>>>>>>>>>>>>>>>> Meter/Counter API to get it to match. >>>>>>> Similarly, if >>>>>>> > we >>>>>>> > > >>>>> want >>>>>>> > > >>>>>>>>>>>>>> to >>>>>>> > > >>>>>>>>>>>>>>>>> count >>>>>>> > > >>>>>>>>>>>>>>>>> something but no current count exists we >>>>>>> typically >>>>>>> > > added >>>>>>> > > >>>>> a >>>>>>> > > >>>>>>>>>>>>>>>>> Counter. >>>>>>> > > >>>>>>>>>>>>>>>>> That's why attaching semantics to metric >>>>>>> types makes >>>>>>> > > >>>>> little >>>>>>> > > >>>>>>>>>>>>>>>>> sense (but >>>>>>> > > >>>>>>>>>>>>>>>>> unfortunately several reporters already do >>>>>>> it); for >>>>>>> > > >>>>>>>>>>>>>>>>> counters/meters >>>>>>> > > >>>>>>>>>>>>>>>>> certainly, but the majority of metrics are >>>>>>> gauges. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> 2) I'm talking about time-series databases >>>>>>> like >>>>>>> > > >>>>> Prometheus. >>>>>>> > > >>>>>>>>>>>>>> We >>>>>>> > > >>>>>>>>>>>>>>>>> would >>>>>>> > > >>>>>>>>>>>>>>>>> only have a gauge metric exposing the last >>>>>>> > > >>>>>>>>> fetchTime/emitTime >>>>>>> > > >>>>>>>>>>>>>>>>> that is >>>>>>> > > >>>>>>>>>>>>>>>>> regularly reported to the backend >>>>>>> (Prometheus), >>>>>>> > where a >>>>>>> > > >>>>>>> user >>>>>>> > > >>>>>>>>>>>>>>>>> could build >>>>>>> > > >>>>>>>>>>>>>>>>> a histogram of his choosing when/if he wants >>>>>>> it. >>>>>>> > > >>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>> On 22.02.2019 13:57, Becket Qin wrote: >>>>>>> > > >>>>>>>>>>>>>>>>>> Hi Chesnay, >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks for the explanation. 
>>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> ** Re: FLIP >>>>>>> > > >>>>>>>>>>>>>>>>>> I might have misunderstood this, but it seems >>>>>>> that >>>>>>> > > "major >>>>>>> > > >>>>>>>>>>>>>>>>> changes" are well >>>>>>> > > >>>>>>>>>>>>>>>>>> defined in FLIP. The full contents is >>>>>>> following: >>>>>>> > > >>>>>>>>>>>>>>>>>> What is considered a "major change" that >>>>>>> needs a FLIP? >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> Any of the following should be considered a >>>>>>> major >>>>>>> > > change: >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> - Any major new feature, subsystem, or piece >>>>>>> of >>>>>>> > > >>>>>>>>>>>>>>>>> functionality >>>>>>> > > >>>>>>>>>>>>>>>>>> - *Any change that impacts the public >>>>>>> interfaces of >>>>>>> > the >>>>>>> > > >>>>>>>>>>>>>>>>> project* >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> What are the "public interfaces" of the >>>>>>> project? >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> *All of the following are public interfaces >>>>>>> *that >>>>>>> > people >>>>>>> > > >>>>>>>>>>>>>>>>> build around: >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> - DataStream and DataSet API, including >>>>>>> classes >>>>>>> > related >>>>>>> > > >>>>>>>>>>>>>>>>> to that, such as >>>>>>> > > >>>>>>>>>>>>>>>>>> StreamExecutionEnvironment >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> - Classes marked with the @Public annotation >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> - On-disk binary formats, such as >>>>>>> > > >>>>>>>>>>>>>> checkpoints/savepoints >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> - User-facing scripts/command-line tools, i.e. 
>>>>>>> > > >>>>>>>>>>>>>>>>> bin/flink, Yarn scripts, >>>>>>> > > >>>>>>>>>>>>>>>>>> Mesos scripts >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> - Configuration settings >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> - *Exposed monitoring information* >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> So any monitoring information change is >>>>>>> considered as >>>>>>> > > >>>>>>>>>>>>>> public >>>>>>> > > >>>>>>>>>>>>>>>>> interface, and >>>>>>> > > >>>>>>>>>>>>>>>>>> any public interface change is considered as >>>>>>> a "major >>>>>>> > > >>>>>>>>>>>>>>> change". >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> ** Re: over complication of implementation. >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> Although this is more of implementation >>>>>>> details that >>>>>>> > is >>>>>>> > > >> not >>>>>>> > > >>>>>>>>>>>>>>>>> covered by the >>>>>>> > > >>>>>>>>>>>>>>>>>> FLIP. But it may be worth discussing. >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> First of all, I completely agree that we >>>>>>> should use >>>>>>> > the >>>>>>> > > >>>>>>>>>>>>>>>>> simplest way to >>>>>>> > > >>>>>>>>>>>>>>>>>> achieve our goal. To me the goal is the >>>>>>> following: >>>>>>> > > >>>>>>>>>>>>>>>>>> 1. Clear connector conventions and interfaces. >>>>>>> > > >>>>>>>>>>>>>>>>>> 2. The easiness of creating a connector. >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> Both of them are important to the prosperity >>>>>>> of the >>>>>>> > > >>>>>>>>>>>>>>>>> connector ecosystem. So >>>>>>> > > >>>>>>>>>>>>>>>>>> I'd rather abstract as much as possible on >>>>>>> our side to >>>>>>> > > >> make >>>>>>> > > >>>>>>>>>>>>>>>>> the connector >>>>>>> > > >>>>>>>>>>>>>>>>>> developer's work lighter. 
>>>>>>> > > >>>>>>>>>>>>>>>>>> Given this goal, a static util method approach might have a few drawbacks:
>>>>>>> > > >>>>>>>>>>>>>>>>>> 1. Users still have to construct the metrics by themselves. (And note that this might be error-prone by itself. For example, a custom wrapper around a dropwizard meter may be used instead of MeterView.)
>>>>>>> > > >>>>>>>>>>>>>>>>>> 2. When connector-specific metrics are added, it is difficult to enforce the scope to be the same as standard metrics.
>>>>>>> > > >>>>>>>>>>>>>>>>>> 3. It seems that a method proliferation is inevitable if we want to apply sanity checks, e.g. the metric of numBytesIn was not registered for a meter.
>>>>>>> > > >>>>>>>>>>>>>>>>>> 4. Metrics are still defined in random places and hard to track.
>>>>>>> > > >>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>> The current PR I had was inspired by the Config system in Kafka, which I found pretty handy. In fact it is not only used by Kafka itself but even some other projects that depend on Kafka. I am not saying this approach is perfect. But I think it is worth it to save the work for connector writers and encourage more systematic implementation.
>>>>>>> That being >>>>>>> > > said, >>>>>>> > > >>>>>>>>>>>>>> I >>>>>>> > > >>>>>>>>>>>>>>>>> am fully open >>>>>>> > > >>>>>>>>>>>>>>>>>> to suggestions. >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> Re: Histogram >>>>>>> > > >>>>>>>>>>>>>>>>>> I think there are two orthogonal questions >>>>>>> around >>>>>>> > those >>>>>>> > > >>>>>>>>>>>>>>>> metrics: >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> 1. Regardless of the metric type, by just >>>>>>> looking at >>>>>>> > the >>>>>>> > > >>>>>>>>>>>>>>>>> meaning of a >>>>>>> > > >>>>>>>>>>>>>>>>>> metric, is generic to all connectors? If the >>>>>>> answer is >>>>>>> > > >> yes, >>>>>>> > > >>>>>>>>>>>>>>>>> we should >>>>>>> > > >>>>>>>>>>>>>>>>>> include the metric into the convention. No >>>>>>> matter >>>>>>> > > whether >>>>>>> > > >>>>>>>>>>>>>> we >>>>>>> > > >>>>>>>>>>>>>>>>> include it >>>>>>> > > >>>>>>>>>>>>>>>>>> into the convention or not, some connector >>>>>>> > > implementations >>>>>>> > > >>>>>>>>>>>>>>>>> will emit such >>>>>>> > > >>>>>>>>>>>>>>>>>> metric. It is better to have a convention >>>>>>> than letting >>>>>>> > > >> each >>>>>>> > > >>>>>>>>>>>>>>>>> connector do >>>>>>> > > >>>>>>>>>>>>>>>>>> random things. >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> 2. If a standard metric is a histogram, what >>>>>>> should we >>>>>>> > > do? >>>>>>> > > >>>>>>>>>>>>>>>>>> I agree that we should make it clear that >>>>>>> using >>>>>>> > > histograms >>>>>>> > > >>>>>>>>>>>>>>>>> will have >>>>>>> > > >>>>>>>>>>>>>>>>>> performance risk. But I do see histogram is >>>>>>> useful in >>>>>>> > > some >>>>>>> > > >>>>>>>>>>>>>>>>> fine-granularity >>>>>>> > > >>>>>>>>>>>>>>>>>> debugging where one do not have the luxury to >>>>>>> stop the >>>>>>> > > >>>>>>>>>>>>>>>>> system and inject >>>>>>> > > >>>>>>>>>>>>>>>>>> more inspection code. 
So the workaround I am >>>>>>> thinking >>>>>>> > is >>>>>>> > > >> to >>>>>>> > > >>>>>>>>>>>>>>>>> provide some >>>>>>> > > >>>>>>>>>>>>>>>>>> implementation suggestions. Assume later on >>>>>>> we have a >>>>>>> > > >>>>>>>>>>>>>>>>> mechanism of >>>>>>> > > >>>>>>>>>>>>>>>>>> selective metrics. In the abstract metrics >>>>>>> class we >>>>>>> > can >>>>>>> > > >>>>>>>>>>>>>>>>> disable those >>>>>>> > > >>>>>>>>>>>>>>>>>> metrics by default individual connector >>>>>>> writers does >>>>>>> > not >>>>>>> > > >>>>>>>>>>>>>>>>> have to do >>>>>>> > > >>>>>>>>>>>>>>>>>> anything (this is another advantage of having >>>>>>> an >>>>>>> > > >>>>>>>>>>>>>>>>> AbstractMetrics instead of >>>>>>> > > >>>>>>>>>>>>>>>>>> static util methods.) >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> I am not sure I fully understand the >>>>>>> histogram in the >>>>>>> > > >>>>>>>>>>>>>>>>> backend approach. Can >>>>>>> > > >>>>>>>>>>>>>>>>>> you explain a bit more? Do you mean emitting >>>>>>> the raw >>>>>>> > > data, >>>>>>> > > >>>>>>>>>>>>>>>>> e.g. fetchTime >>>>>>> > > >>>>>>>>>>>>>>>>>> and emitTime with each record and let the >>>>>>> histogram >>>>>>> > > >>>>>>>>>>>>>>>>> computation happen in >>>>>>> > > >>>>>>>>>>>>>>>>>> the background? Or let the processing thread >>>>>>> putting >>>>>>> > the >>>>>>> > > >>>>>>>>>>>>>>>>> values into a >>>>>>> > > >>>>>>>>>>>>>>>>>> queue and have a separate thread polling from >>>>>>> the >>>>>>> > queue >>>>>>> > > >> and >>>>>>> > > >>>>>>>>>>>>>>>>> add them into >>>>>>> > > >>>>>>>>>>>>>>>>>> the histogram? 
>>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks, >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 4:34 PM Chesnay >>>>>>> Schepler >>>>>>> > > >>>>>>>>>>>>>>>>> <ches...@apache.org <mailto: >>>>>>> ches...@apache.org>> >>>>>>> > > wrote: >>>>>>> > > >>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: Flip >>>>>>> > > >>>>>>>>>>>>>>>>>>> The very first line under both the main >>>>>>> header and >>>>>>> > > >> Purpose >>>>>>> > > >>>>>>>>>>>>>>>>> section >>>>>>> > > >>>>>>>>>>>>>>>>>>> describe Flips as "major changes", which >>>>>>> this isn't. >>>>>>> > > >>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: complication >>>>>>> > > >>>>>>>>>>>>>>>>>>> I'm not arguing against standardization, but >>>>>>> again an >>>>>>> > > >>>>>>>>>>>>>>>>> over-complicated >>>>>>> > > >>>>>>>>>>>>>>>>>>> implementation when a static utility method >>>>>>> would be >>>>>>> > > >>>>>>>>>>>>>>>>> sufficient. >>>>>>> > > >>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>> public static void setupConnectorMetrics( >>>>>>> > > >>>>>>>>>>>>>>>>>>> MetricGroup operatorMetricGroup, >>>>>>> > > >>>>>>>>>>>>>>>>>>> String connectorName, >>>>>>> > > >>>>>>>>>>>>>>>>>>> Optional<Gauge<Long>> numRecordsIn, >>>>>>> > > >>>>>>>>>>>>>>>>>>> ...) 
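As a rough illustration of the static utility approach, the signature above could be fleshed out along these lines. Everything here is an assumption made for the sketch: SketchMetricGroup is a simplified stand-in rather than Flink's MetricGroup, gauges are modelled as plain Suppliers, the numBytesIn parameter stands in for the elided "..." arguments, and the method returns the group (as suggested elsewhere in this thread) instead of void.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Simplified stand-in for illustration only; NOT Flink's MetricGroup. */
final class SketchMetricGroup {
    final String scope;
    final Map<String, Supplier<?>> gauges = new LinkedHashMap<>();
    final Map<String, SketchMetricGroup> children = new LinkedHashMap<>();

    SketchMetricGroup(String scope) {
        this.scope = scope;
    }

    /** Creates (or returns) a child group whose scope extends this group's scope. */
    SketchMetricGroup addGroup(String name) {
        return children.computeIfAbsent(name, n -> new SketchMetricGroup(scope + "." + n));
    }

    void gauge(String name, Supplier<?> gauge) {
        gauges.put(name, gauge);
    }
}

final class ConnectorMetricsSketch {
    // Standardized metric names live in one place.
    static final String NUM_RECORDS_IN = "numRecordsIn";
    static final String NUM_BYTES_IN = "numBytesIn";

    /** Registers only the metrics the connector opted into and returns the connector group. */
    static SketchMetricGroup setupConnectorMetrics(
            SketchMetricGroup operatorMetricGroup,
            String connectorName,
            Optional<Supplier<Long>> numRecordsIn,
            Optional<Supplier<Long>> numBytesIn) {
        SketchMetricGroup group = operatorMetricGroup.addGroup(connectorName);
        numRecordsIn.ifPresent(g -> group.gauge(NUM_RECORDS_IN, g));
        numBytesIn.ifPresent(g -> group.gauge(NUM_BYTES_IN, g));
        return group;
    }

    public static void main(String[] args) {
        SketchMetricGroup operatorGroup = new SketchMetricGroup("operator");
        AtomicLong recordsSeen = new AtomicLong(3);
        Supplier<Long> recordsGauge = recordsSeen::get;

        // This connector opts into numRecordsIn only.
        SketchMetricGroup group = setupConnectorMetrics(
                operatorGroup, "kafka", Optional.of(recordsGauge), Optional.empty());
        System.out.println(group.scope + " -> " + group.gauges.keySet());
    }
}
```

Connector-specific metrics could then be registered on the returned group, so they end up in the same scope as the standard ones.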
>>>>>>> > > >>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>> This gives you all you need: >>>>>>> > > >>>>>>>>>>>>>>>>>>> * a well-defined set of metrics for a >>>>>>> connector to >>>>>>> > > opt-in >>>>>>> > > >>>>>>>>>>>>>>>>>>> * standardized naming schemes for scope and >>>>>>> > individual >>>>>>> > > >>>>>>>>>>>>>>> metrics >>>>>>> > > >>>>>>>>>>>>>>>>>>> * standardize metric types (although >>>>>>> personally I'm >>>>>>> > not >>>>>>> > > >>>>>>>>>>>>>>>>> interested in that >>>>>>> > > >>>>>>>>>>>>>>>>>>> since metric types should be considered >>>>>>> syntactic >>>>>>> > > sugar) >>>>>>> > > >>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>> Re: Configurable Histogram >>>>>>> > > >>>>>>>>>>>>>>>>>>> If anything they _must_ be turned off by >>>>>>> default, but >>>>>>> > > the >>>>>>> > > >>>>>>>>>>>>>>>>> metric system is >>>>>>> > > >>>>>>>>>>>>>>>>>>> already exposing so many options that I'm >>>>>>> not too >>>>>>> > keen >>>>>>> > > on >>>>>>> > > >>>>>>>>>>>>>>>>> adding even more. >>>>>>> > > >>>>>>>>>>>>>>>>>>> You have also only addressed my first >>>>>>> argument >>>>>>> > against >>>>>>> > > >>>>>>>>>>>>>>>>> histograms >>>>>>> > > >>>>>>>>>>>>>>>>>>> (performance), the second one still stands >>>>>>> (calculate >>>>>>> > > >>>>>>>>>>>>>>>>> histogram in metric >>>>>>> > > >>>>>>>>>>>>>>>>>>> backends instead). >>>>>>> > > >>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>> On 21.02.2019 16:27, Becket Qin wrote: >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Hi Chesnay, >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks for the comments. I think this is >>>>>>> worthy of a >>>>>>> > > >> FLIP >>>>>>> > > >>>>>>>>>>>>>>>>> because it is >>>>>>> > > >>>>>>>>>>>>>>>>>>>> public API. 
>>>>>>> > > >>>>>>>>>>>>>>>>>>>> According to the FLIP description, a FLIP is required in case of:
>>>>>>> > > >>>>>>>>>>>>>>>>>>>> - Any change that impacts the public interfaces of the project
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>> and the following entry is found in the definition of "public interface".
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>> - Exposed monitoring information
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Metrics are critical to any production system. So a clear metric definition is important for any serious users. For an organization with a large Flink installation, a change in metrics means a great amount of work. So such changes do need to be fully discussed and documented.
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: Histogram.
>>>>>>> > > >>>>>>>>>>>>>>>>>>>> We can discuss whether there is a better way to expose metrics that are suitable for histograms. My micro-benchmark on various histogram implementations also indicates that they are significantly slower than other metric types.
But I don't think that >>>>>>> means >>>>>>> > never >>>>>>> > > >>>>>>>>>>>>>> use >>>>>>> > > >>>>>>>>>>>>>>>>> histogram, but >>>>>>> > > >>>>>>>>>>>>>>>>>>>> means use it with caution. For example, we >>>>>>> can >>>>>>> > suggest >>>>>>> > > >>>>>>>>>>>>>> the >>>>>>> > > >>>>>>>>>>>>>>>>>>> implementations >>>>>>> > > >>>>>>>>>>>>>>>>>>>> to turn them off by default and only turn >>>>>>> it on for >>>>>>> > a >>>>>>> > > >>>>>>>>>>>>>>>>> small amount of >>>>>>> > > >>>>>>>>>>>>>>>>>>> time >>>>>>> > > >>>>>>>>>>>>>>>>>>>> when performing some micro-debugging. >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: complication: >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Connector conventions are essential for >>>>>>> Flink >>>>>>> > > ecosystem. >>>>>>> > > >>>>>>>>>>>>>>>>> Flink connectors >>>>>>> > > >>>>>>>>>>>>>>>>>>>> pool is probably the most important part of >>>>>>> Flink, >>>>>>> > > just >>>>>>> > > >>>>>>>>>>>>>>>>> like any other >>>>>>> > > >>>>>>>>>>>>>>>>>>> data >>>>>>> > > >>>>>>>>>>>>>>>>>>>> system. Clear conventions of connectors >>>>>>> will help >>>>>>> > > build >>>>>>> > > >>>>>>>>>>>>>>>>> Flink ecosystem >>>>>>> > > >>>>>>>>>>>>>>>>>>> in >>>>>>> > > >>>>>>>>>>>>>>>>>>>> a more organic way. >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Take the metrics convention as an example, >>>>>>> imagine >>>>>>> > > >>>>>>>>>>>>>> someone >>>>>>> > > >>>>>>>>>>>>>>>>> has developed >>>>>>> > > >>>>>>>>>>>>>>>>>>> a >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Flink connector for System foo, and another >>>>>>> > developer >>>>>>> > > >> may >>>>>>> > > >>>>>>>>>>>>>>>>> have developed >>>>>>> > > >>>>>>>>>>>>>>>>>>> a >>>>>>> > > >>>>>>>>>>>>>>>>>>>> monitoring and diagnostic framework for >>>>>>> Flink which >>>>>>> > > >>>>>>>>>>>>>>>>> analyzes the Flink >>>>>>> > > >>>>>>>>>>>>>>>>>>> job >>>>>>> > > >>>>>>>>>>>>>>>>>>>> performance based on metrics. 
With a clear >>>>>>> metric >>>>>>> > > >>>>>>>>>>>>>>>>> convention, those two >>>>>>> > > >>>>>>>>>>>>>>>>>>>> projects could be developed independently. >>>>>>> Once >>>>>>> > users >>>>>>> > > >> put >>>>>>> > > >>>>>>>>>>>>>>>>> them together, >>>>>>> > > >>>>>>>>>>>>>>>>>>>> it would work without additional >>>>>>> modifications. This >>>>>>> > > >>>>>>>>>>>>>>>>> cannot be easily >>>>>>> > > >>>>>>>>>>>>>>>>>>>> achieved by just defining a few constants. >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: selective metrics: >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Sure, we can discuss that in a separate >>>>>>> thread. >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> @Dawid >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: latency / fetchedLatency >>>>>>> > > >>>>>>>>>>>>>>>>>>>> The primary purpose of establish such a >>>>>>> convention >>>>>>> > is >>>>>>> > > to >>>>>>> > > >>>>>>>>>>>>>>>>> help developers >>>>>>> > > >>>>>>>>>>>>>>>>>>>> write connectors in a more compatible way. >>>>>>> The >>>>>>> > > >> convention >>>>>>> > > >>>>>>>>>>>>>>>>> is supposed to >>>>>>> > > >>>>>>>>>>>>>>>>>>> be >>>>>>> > > >>>>>>>>>>>>>>>>>>>> defined more proactively. So when look at >>>>>>> the >>>>>>> > > >> convention, >>>>>>> > > >>>>>>>>>>>>>>>>> it seems more >>>>>>> > > >>>>>>>>>>>>>>>>>>>> important to see if the concept is >>>>>>> applicable to >>>>>>> > > >>>>>>>>>>>>>>>>> connectors in general. >>>>>>> > > >>>>>>>>>>>>>>>>>>> It >>>>>>> > > >>>>>>>>>>>>>>>>>>>> might be true so far only Kafka connector >>>>>>> reports >>>>>>> > > >>>>>>>>>>>>>> latency. 
>>>>>>> > > >>>>>>>>>>>>>>>>> But there >>>>>>> > > >>>>>>>>>>>>>>>>>>> might >>>>>>> > > >>>>>>>>>>>>>>>>>>>> be hundreds of other connector >>>>>>> implementations in >>>>>>> > the >>>>>>> > > >>>>>>>>>>>>>>>>> Flink ecosystem, >>>>>>> > > >>>>>>>>>>>>>>>>>>>> though not in the Flink repo, and some of >>>>>>> them also >>>>>>> > > >> emits >>>>>>> > > >>>>>>>>>>>>>>>>> latency. I >>>>>>> > > >>>>>>>>>>>>>>>>>>> think >>>>>>> > > >>>>>>>>>>>>>>>>>>>> a lot of other sources actually also has an >>>>>>> append >>>>>>> > > >>>>>>>>>>>>>>>>> timestamp. e.g. >>>>>>> > > >>>>>>>>>>>>>>>>>>> database >>>>>>> > > >>>>>>>>>>>>>>>>>>>> bin logs and some K-V stores. So I wouldn't >>>>>>> be >>>>>>> > > surprised >>>>>>> > > >>>>>>>>>>>>>>>>> if some database >>>>>>> > > >>>>>>>>>>>>>>>>>>>> connector can also emit latency metrics. >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 10:14 PM Chesnay >>>>>>> Schepler >>>>>>> > > >>>>>>>>>>>>>>>>> <ches...@apache.org <mailto: >>>>>>> ches...@apache.org>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>> > > >>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Regarding 2) It doesn't make sense to >>>>>>> investigate >>>>>>> > > this >>>>>>> > > >>>>>>>>>>>>>> as >>>>>>> > > >>>>>>>>>>>>>>>>> part of this >>>>>>> > > >>>>>>>>>>>>>>>>>>>>> FLIP. This is something that could be of >>>>>>> interest >>>>>>> > for >>>>>>> > > >>>>>>>>>>>>>> the >>>>>>> > > >>>>>>>>>>>>>>>>> entire metric >>>>>>> > > >>>>>>>>>>>>>>>>>>>>> system, and should be designed for as such. 
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Regarding the proposal as a whole:
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Histogram metrics shall not be added to the core of Flink. They are significantly more expensive than other metrics, and calculating histograms in the application is regarded as an anti-pattern by several metric backends, who instead recommend to expose the raw data and calculate the histogram in the backend.
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Second, this seems overly complicated. Given that we already established that not all connectors will export all metrics we are effectively reducing this down to a consistent naming scheme. We don't need anything sophisticated for that; basically just a few constants that all connectors use.
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> I'm not convinced that this is worthy of a FLIP.
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> On 21.02.2019 14:26, Dawid Wysakowicz wrote:
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Ad 1. In general I understand and I agree.
>>>>>>> But >>>>>>> > those >>>>>>> > > >>>>>>>>>>>>>>>>> particular metrics >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> (latency, fetchLatency), right now would >>>>>>> only be >>>>>>> > > >>>>>>>>>>>>>>>>> reported if user uses >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> KafkaConsumer with internal >>>>>>> timestampAssigner with >>>>>>> > > >>>>>>>>>>>>>>>>> StreamCharacteristic >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> set to EventTime, right? That sounds like >>>>>>> a very >>>>>>> > > >>>>>>>>>>>>>>>>> specific case. I am >>>>>>> > > >>>>>>>>>>>>>>>>>>> not >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> sure if we should introduce a generic >>>>>>> metric that >>>>>>> > > will >>>>>>> > > >>>>>>>>>>>>>> be >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> disabled/absent for most of >>>>>>> implementations. >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Ad.2 That sounds like an orthogonal >>>>>>> issue, that >>>>>>> > > might >>>>>>> > > >>>>>>>>>>>>>>>>> make sense to >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> investigate in the future. >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Best, >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> Dawid >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>> On 21/02/2019 13:20, Becket Qin wrote: >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Hi Dawid, >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. That makes >>>>>>> sense to me. >>>>>>> > > >> There >>>>>>> > > >>>>>>>>>>>>>>>>> are two cases >>>>>>> > > >>>>>>>>>>>>>>>>>>> to >>>>>>> > > >>>>>>>>>>>>>>>>>>>>> be >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> addressed. >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> 1. The metrics are supposed to be a >>>>>>> guidance. It >>>>>>> > is >>>>>>> > > >>>>>>>>>>>>>>>>> likely that a >>>>>>> > > >>>>>>>>>>>>>>>>>>>>> connector >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> only supports some but not all of the >>>>>>> metrics. 
In >>>>>>> > > >> that >>>>>>> > > >>>>>>>>>>>>>>>>> case, each >>>>>>> > > >>>>>>>>>>>>>>>>>>>>> connector >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> implementation should have the freedom >>>>>>> to decide >>>>>>> > > >> which >>>>>>> > > >>>>>>>>>>>>>>>>> metrics are >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> reported. For the metrics that are >>>>>>> supported, the >>>>>>> > > >>>>>>>>>>>>>>>>> guidance should be >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> followed. >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> 2. Sometimes users may want to disable >>>>>>> certain >>>>>>> > > >> metrics >>>>>>> > > >>>>>>>>>>>>>>>>> for some reason >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> (e.g. performance / reprocessing of >>>>>>> data). A >>>>>>> > > generic >>>>>>> > > >>>>>>>>>>>>>>>>> mechanism should >>>>>>> > > >>>>>>>>>>>>>>>>>>> be >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> provided to allow user choose which >>>>>>> metrics are >>>>>>> > > >>>>>>>>>>>>>>>>> reported. This >>>>>>> > > >>>>>>>>>>>>>>>>>>> mechanism >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> should also be honored by the connector >>>>>>> > > >>>>>>>>>>>>>> implementations. >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> >>>>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Does this sound reasonable to you? 
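The two cases above (a connector reports only the metrics it supports, and users can switch individual metrics off) could be combined in a small registry like the one below. This is a hypothetical sketch, not an existing Flink mechanism: the class SelectiveMetricsSketch and the idea of passing a set of disabled metric names are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical registry, not a Flink API: registers only metrics that were not disabled. */
final class SelectiveMetricsSketch {
    private final Set<String> disabledMetrics;
    private final Map<String, Supplier<Long>> registered = new LinkedHashMap<>();

    SelectiveMetricsSketch(Set<String> disabledMetrics) {
        this.disabledMetrics = new HashSet<>(disabledMetrics);
    }

    /**
     * Case 1: the connector calls this only for the metrics it supports.
     * Case 2: names disabled by the user are skipped, so the connector honors the switch.
     */
    boolean register(String name, Supplier<Long> gauge) {
        if (disabledMetrics.contains(name)) {
            return false;
        }
        registered.put(name, gauge);
        return true;
    }

    Set<String> registeredNames() {
        return Collections.unmodifiableSet(registered.keySet());
    }

    public static void main(String[] args) {
        // The user disables fetchLatency, e.g. while reprocessing old data.
        Set<String> disabled = new HashSet<>();
        disabled.add("fetchLatency");

        SelectiveMetricsSketch metrics = new SelectiveMetricsSketch(disabled);
        metrics.register("numRecordsIn", () -> 10L);
        metrics.register("fetchLatency", () -> 5L);
        System.out.println(metrics.registeredNames());
    }
}
```

Keeping the check inside the registry means individual connector implementations would not need their own handling for disabled metrics.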
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Feb 21, 2019 at 4:22 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> Hi,
>
> Generally I like the idea of having a unified, standard set of metrics for
> all connectors. I have some slight concerns about fetchLatency and latency
> though. They are computed based on EventTime which is not a purely
> technical feature. It depends often on some business logic, might be absent
> or defined after source. Those metrics could also behave in a weird way in
> case of replaying backlog. Therefore I am not sure if we should include
> those metrics by default.
> Maybe we could at least introduce a feature switch for them? What do you
> think?
>
> Best,
>
> Dawid
>
> On 21/02/2019 03:13, Becket Qin wrote:
>
> Bump. If there are no objections to the proposed metrics, I'll start a
> voting thread later today.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Feb 11, 2019 at 8:17 PM Becket Qin <becket....@gmail.com> wrote:
>
> Hi folks,
>
> I would like to start the FLIP discussion thread about standardizing the
> connector metrics.
>
> In short, we would like to provide a convention for Flink connector
> metrics. It will help simplify the monitoring and alerting on Flink jobs.
> The FLIP link is as follows:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> --
> Konstantin Knauf
> https://twitter.com/snntrable
> https://github.com/knaufk