Hi Stephan,

Thanks for the input. Just a few more clarifications / questions.

*Num Bytes / Records Metrics*

1. At this point, the *numRecordsIn(Rate)* metrics exist in both
OperatorIOMetricGroup and TaskIOMetricGroup. I did not find
*numRecordsIn(Rate)* in the TaskIOMetricGroup updated anywhere other than
in the unit tests. Am I missing something?

2. The *numBytesIn(Rate)* metrics only exist in TaskIOMetricGroup. At this
point, the SourceReaders only have access to a SourceReaderContext, which
provides an OperatorMetricGroup. So it seems that connector developers
are not able to update *numBytesIn(Rate)*. With the multiple Source
chaining support, it is possible that there are multiple Sources in the
same task. So it looks like we need to add *numBytesIn(Rate)* to the
operator metrics as well.

*Current (Fetch) Latency*

*currentFetchLatency* helps clearly tell whether the latency is caused by
Flink or not. Backpressure is not the only reason that we see fetch
latency. Even if there is no backpressure, the records may have passed
through a long pipeline before they entered Flink. For example, say the
*currentLatency* is 10 seconds and there is no backpressure. Does that
mean the record spent 10 seconds in the Source operator? If not, how much
did Flink contribute to that 10 seconds of latency? These questions are
frequently asked and hard to answer without the fetch latency.

> For "currentFetchLatency", we would need to understand timestamps before
> the records are decoded. That is only possible for some sources, where the
> client gives us the records in a (partially) decoded form already (like
> Kafka). Then, some work has been done between the fetch time and the time
> we update the metric already, so it is already a bit closer to the
> "currentFetchLatency". I think following this train of thought, there is
> diminished benefit from that specific metric.

We may not have to report the fetch latency before records are decoded.
One solution is to remember the *FetchTime* when the encoded records are
fetched, and report the fetch latency after the records are decoded by
computing (*FetchTime - EventTime*). An approximate implementation would be
adding a *FetchTime* field to *RecordsWithSplitIds*, assuming that all
the records in that data structure are fetched at the same time.

Thoughts?

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 9, 2020 at 12:42 AM Stephan Ewen <step...@ververica.com> wrote:

> Thanks for reviving this, Becket!
>
> I think Konstantin's comments are great. I'd add these points:
>
> *Num Bytes / Records Metrics*
>
> For "numBytesIn" and "numRecordsIn", we should reuse the OperatorIOMetric
> group, then it also gets reported to the overview page in the Web UI.
>
> The "numBytesInPerSecond" and "numRecordsInPerSecond" are automatically
> derived metrics, no need to do anything once we populate the above two
> metrics
>
>
> *Current (Fetch) Latency*
>
> I would really go for "eventTimeLag" rather than "fetchLatency". I think
> "eventTimeLag" is a term that has some adoption in the Flink community and
> beyond.
>
> I am not so sure that I see the benefit between "currentLatency" and
> "currentFetchLatency", (or event time lag before/after) as this only is
> different by the time it takes to emit a batch.
>     - In a non-backpressured case, these should be virtually identical
> (and both dominated by watermark lag, not the actual time it takes the
> fetch to be emitted)
>     - In a backpressured case, why do you care about when data was
> fetched, as opposed to emitted? Emitted time is relevant for application
> semantics and checkpoints. Fetch time seems to be an implementation detail
> (how much does the source buffer).
>
> The "currentLatency" (eventTimeLagAfter) can be computed out-of-the-box,
> independent of a source implementation, so that is also a good argument to
> make it the main metric.
> We know timestamps and watermarks in the source.
> Except for cases where no watermarks have been defined at all (batch jobs
> or pure processing time jobs), in which case this metric should probably be
> "Infinite".
>
> For "currentFetchLatency", we would need to understand timestamps before
> the records are decoded. That is only possible for some sources, where the
> client gives us the records in a (partially) decoded form already (like
> Kafka). Then, some work has been done between the fetch time and the time
> we update the metric already, so it is already a bit closer to the
> "currentFetchLatency". I think following this train of thought, there is
> diminished benefit from that specific metric.
>
>
> *Idle Time*
>
> I agree, it would be great to rename this. Maybe to "sourceWaitTime" or
> "sourceIdleTime" so to make clear that this is not exactly the time that
> Flink's processing pipeline is idle, but the time where the source does not
> have new data.
>
> This is not an easy metric to collect, though (except maybe for the
> sources that are only idle while they have no split assigned, like
> continuous file source).
>
> *Source Specific Metrics*
>
> I believe source-specific would only be "sourceIdleTime",
> "numRecordsInErrors", "pendingBytes", and "pendingRecords".
>
>
> *Conclusion*
>
> We can probably add "numBytesIn" and "numRecordsIn" and "eventTimeLag"
> right away, with little complexity.
> I'd suggest to start with these right away.
>
> Best,
> Stephan
>
>
> On Tue, Sep 8, 2020 at 3:25 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hey Konstantin,
>>
>> Thanks for the feedback and suggestions. Please see the reply below.
>>
>>> * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
>>> been introduced in https://issues.apache.org/jira/browse/FLINK-16864.
>>> They have a similar name, but different definitions of idleness,
>>> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
>>> backpressured.
>>> Can we make it clearer that these two metrics mean different things?
>>
>>
>> That is a good point. I did not notice this metric earlier. It seems that
>> both metrics are useful to the users. One tells them how busy the source is
>> and how much more throughput the source can handle. The other tells the
>> users how long it has been since the source saw the last record, which is
>> useful for debugging. I'll update the FLIP to make it clear.
>>
>>> * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
>>> is more descriptive/clear. What do others think?
>>
>>
>> I am quite open to the ideas on these names. In fact I also feel
>> "current(Fetch)Latency" is not super intuitive. So it would be great if we
>> can have better names.
>>
>>> * Current(Fetch)Latency implies that the timestamps are directly
>>> extracted in the source connector, right? Will this be the default for
>>> FLIP-27 sources anyway?
>>
>>
>> The "currentFetchLatency" will probably be reported by each source
>> implementation, because the data fetching is done by SplitReaders and there
>> is no base implementation. The "currentLatency", on the other hand, can be
>> reported by the SourceReader base implementation. However, since developers
>> can actually implement their own source connector without using our base
>> implementation, this metric guidance is still useful for the connector
>> developers in that case.
>>
>>> * Does FLIP-33 also include the implementation of these metrics (to the
>>> extent possible) for all connectors currently available in Apache Flink
>>> or is the "per-connector implementation" out-of-scope?
>>
>>
>> FLIP-33 itself does not specify any implementation of those metrics. But
>> the connectors we provide in Apache Flink will follow the guidance of
>> FLIP-33 to emit those metrics when applicable. Maybe we can have some
>> public static Strings defined for the metric names to help other connector
>> developers follow the same guidance.
>> I can also add that to the public interface section of the FLIP if we
>> decide to do that.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Sep 8, 2020 at 9:02 PM Becket Qin <becket....@gmail.com> wrote:
>>
>>>
>>>
>>> On Tue, Sep 8, 2020 at 6:55 PM Konstantin Knauf <kna...@apache.org>
>>> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> Thank you for picking up this FLIP. I have a few questions:
>>>>
>>>> * two thoughts on naming:
>>>>    * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
>>>> been introduced in https://issues.apache.org/jira/browse/FLINK-16864.
>>>> They have a similar name, but different definitions of idleness,
>>>> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
>>>> backpressured. Can we make it clearer that these two metrics mean
>>>> different things?
>>>>
>>>>    * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
>>>> is more descriptive/clear. What do others think?
>>>>
>>>>    * Current(Fetch)Latency implies that the timestamps are directly
>>>> extracted in the source connector, right? Will this be the default for
>>>> FLIP-27 sources anyway?
>>>>
>>>> * Does FLIP-33 also include the implementation of these metrics (to the
>>>> extent possible) for all connectors currently available in Apache Flink
>>>> or is the "per-connector implementation" out-of-scope?
>>>>
>>>> Thanks,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On Fri, Sep 4, 2020 at 4:56 PM Becket Qin <becket....@gmail.com> wrote:
>>>>
>>>> > Hi all,
>>>> >
>>>> > To complete the Source refactoring work, I'd like to revive this
>>>> > discussion. Since the mail thread has been dormant for more than a
>>>> > year, just to recap the motivation of the FLIP:
>>>> >
>>>> > 1. The FLIP proposes to standardize the connector metrics by giving
>>>> > guidance on the metric specifications, including the name, type and
>>>> > meaning of the metrics.
>>>> > 2. It is OK for a connector to not emit some of the metrics in the
>>>> > metric guidance, but if a metric of the same semantic is emitted, the
>>>> > implementation should follow the guidance.
>>>> > 3. It is OK for a connector to emit more metrics than what are listed
>>>> > in the FLIP. This includes having an alias for a metric specified in
>>>> > the FLIP.
>>>> > 4. We will implement some of the metrics out of the box in the default
>>>> > implementation of FLIP-27, as long as it is applicable.
>>>> >
>>>> > The FLIP wiki is as follows:
>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>>> >
>>>> > Any thoughts?
>>>> >
>>>> > Thanks,
>>>> >
>>>> > Jiangjie (Becket) Qin
>>>> >
>>>> >
>>>> > On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski <pi...@ververica.com>
>>>> > wrote:
>>>> >
>>>> > > > we will need to revisit the convention list and adjust them
>>>> > > > accordingly when FLIP-27 is ready
>>>> > >
>>>> > >
>>>> > > Yes, this sounds good :)
>>>> > >
>>>> > > Piotrek
>>>> > >
>>>> > > > On 13 Jun 2019, at 13:35, Becket Qin <becket....@gmail.com> wrote:
>>>> > > >
>>>> > > > Hi Piotr,
>>>> > > >
>>>> > > > That's great to know. Chances are that we will need to revisit the
>>>> > > > convention list and adjust them accordingly when FLIP-27 is ready.
>>>> > > > At that point we can mark some of the metrics as available by
>>>> > > > default for connectors implementing the new interface.
>>>> > > >
>>>> > > > Thanks,
>>>> > > >
>>>> > > > Jiangjie (Becket) Qin
>>>> > > >
>>>> > > > On Thu, Jun 13, 2019 at 3:51 PM Piotr Nowojski <pi...@ververica.com>
>>>> > > > wrote:
>>>> > > >
>>>> > > >> Thanks for driving this. I’ve just noticed one small thing. With the
>>>> > > >> new SourceReader interface Flink will be able to provide the
>>>> > > >> `idleTime` metric automatically.
>>>> > > >>
>>>> > > >> Piotrek
>>>> > > >>
>>>> > > >>> On 13 Jun 2019, at 03:30, Becket Qin <becket....@gmail.com> wrote:
>>>> > > >>>
>>>> > > >>> Thanks all for the feedback and discussion.
>>>> > > >>>
>>>> > > >>> Since there wasn't any concern raised, I've started the voting
>>>> > > >>> thread for this FLIP, but please feel free to continue the
>>>> > > >>> discussion here if you think something still needs to be addressed.
>>>> > > >>>
>>>> > > >>> Thanks,
>>>> > > >>>
>>>> > > >>> Jiangjie (Becket) Qin
>>>> > > >>>
>>>> > > >>>
>>>> > > >>>
>>>> > > >>> On Mon, Jun 10, 2019 at 9:10 AM Becket Qin <becket....@gmail.com>
>>>> > > >>> wrote:
>>>> > > >>>
>>>> > > >>>> Hi Piotr,
>>>> > > >>>>
>>>> > > >>>> Thanks for the comments. Yes, you are right. Users will have to
>>>> > > >>>> look at other metrics to decide whether the pipeline is healthy or
>>>> > > >>>> not in the first place before they can use the time-based metric
>>>> > > >>>> to fix the bottleneck.
>>>> > > >>>>
>>>> > > >>>> I agree that once we have FLIP-27 ready, some of the metrics can
>>>> > > >>>> just be reported by the abstract implementation.
>>>> > > >>>>
>>>> > > >>>> I've updated FLIP-33 wiki page to add the pendingBytes and
>>>> > > >>>> pendingRecords metric. Please let me know if you have any concern
>>>> > > >>>> over the updated metric convention proposal.
>>>> > > >>>>
>>>> > > >>>> @Chesnay Schepler <ches...@apache.org> @Stephan Ewen
>>>> > > >>>> <step...@ververica.com> will you also have time to take a look at
>>>> > > >>>> the proposed metric convention? If there is no further concern
>>>> > > >>>> I'll start a voting thread for this FLIP in two days.
>>>> > > >>>>
>>>> > > >>>> Thanks,
>>>> > > >>>>
>>>> > > >>>> Jiangjie (Becket) Qin
>>>> > > >>>>
>>>> > > >>>>
>>>> > > >>>>
>>>> > > >>>> On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski <pi...@ververica.com>
>>>> > > >>>> wrote:
>>>> > > >>>>
>>>> > > >>>>> Hi Becket,
>>>> > > >>>>>
>>>> > > >>>>> Thanks for the answer :)
>>>> > > >>>>>
>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on
>>>> > > >>>>>> producing the record to downstream. For example, a source
>>>> > > >>>>>> connector can report that it's spending 80% of time to emit
>>>> > > >>>>>> record to downstream processing pipeline. In another case, a sink
>>>> > > >>>>>> connector may report that it's spending 30% of time producing the
>>>> > > >>>>>> records to the external system.
>>>> > > >>>>>>
>>>> > > >>>>>> This is in some sense equivalent to the buffer usage metric:
>>>> > > >>>>>
>>>> > > >>>>>> - 80% of time spent on emitting records to downstream --->
>>>> > > >>>>>> downstream node is bottleneck ---> output buffer is probably full.
>>>> > > >>>>>> - 30% of time spent on emitting records to downstream --->
>>>> > > >>>>>> downstream node is not bottleneck ---> output buffer is probably
>>>> > > >>>>>> not full.
>>>> > > >>>>>
>>>> > > >>>>> If by “time spent on emitting records to downstream” you understand
>>>> > > >>>>> “waiting on back pressure”, then I see your point. And I agree that
>>>> > > >>>>> some kind of ratio/time based metric gives you more information.
>>>> > > >>>>> However, under “time spent on emitting records to downstream” the
>>>> > > >>>>> following (extreme) situation might be hidden:
>>>> > > >>>>>
>>>> > > >>>>> 1. Job is barely able to handle influx of records, there is 99%
>>>> > > >>>>> CPU/resource usage in the cluster, but nobody is
>>>> > > >>>>> bottlenecked/backpressured, all output buffers are empty, everybody
>>>> > > >>>>> is waiting in 1% of its time for more records to process.
>>>> > > >>>>> 2. 80% time can still be spent on "down stream operators”, because
>>>> > > >>>>> they are the CPU intensive operations, but this doesn’t mean that
>>>> > > >>>>> increasing the parallelism down the stream will help with anything
>>>> > > >>>>> there. To the contrary, increasing parallelism of the source
>>>> > > >>>>> operator might help to increase resource utilisation up to 100%.
>>>> > > >>>>>
>>>> > > >>>>> However, this “time based/ratio” approach can be extended to
>>>> > > >>>>> in/output buffer usage. Besides collecting the information that an
>>>> > > >>>>> input/output buffer is full/empty, we can probe/profile how often
>>>> > > >>>>> the buffers are empty/full. If output buffer is full 1% of times,
>>>> > > >>>>> there is almost no back pressure. If it’s full 80% of times, there
>>>> > > >>>>> is some back pressure, if it’s full 99.9% of times, there is huge
>>>> > > >>>>> back pressure.
>>>> > > >>>>>
>>>> > > >>>>> Now for autoscaling you could compare the input & output buffers
>>>> > > >>>>> fill ratio:
>>>> > > >>>>>
>>>> > > >>>>> 1. Both are high, the source of bottleneck is down the stream
>>>> > > >>>>> 2. Output is low, input is high, this is the bottleneck and the
>>>> > > >>>>> higher the difference, the bigger source of bottleneck this
>>>> > > >>>>> operator/task is
>>>> > > >>>>> 3. Output is high, input is low - there was some load spike that we
>>>> > > >>>>> are currently finishing to process
>>>> > > >>>>>
>>>> > > >>>>>
>>>> > > >>>>>
>>>> > > >>>>> But long story short, we are probably diverging from the topic of
>>>> > > >>>>> this discussion, and we can discuss this at some later point.
>>>> > > >>>>>
>>>> > > >>>>> For now, for sources:
>>>> > > >>>>>
>>>> > > >>>>> as I wrote before, +1 for:
>>>> > > >>>>> - pending.bytes, Gauge
>>>> > > >>>>> - pending.messages, Gauge
>>>> > > >>>>>
>>>> > > >>>>> When we will be developing/discussing SourceReader from FLIP-27 we
>>>> > > >>>>> might then add:
>>>> > > >>>>>
>>>> > > >>>>> - in-memory.buffer.usage (0 - 100%)
>>>> > > >>>>>
>>>> > > >>>>> Which will be estimated automatically by Flink while user will be
>>>> > > >>>>> able to override/provide better estimation.
>>>> > > >>>>>
>>>> > > >>>>> Piotrek
>>>> > > >>>>>
>>>> > > >>>>>> On 5 Jun 2019, at 05:42, Becket Qin <becket....@gmail.com> wrote:
>>>> > > >>>>>>
>>>> > > >>>>>> Hi Piotr,
>>>> > > >>>>>>
>>>> > > >>>>>> Thanks for the explanation. Please see some clarifications below.
>>>> > > >>>>>>
>>>> > > >>>>>> By time-based metric, I meant the portion of time spent on
>>>> > > >>>>>> producing the record to downstream. For example, a source
>>>> > > >>>>>> connector can report that it's spending 80% of time to emit
>>>> > > >>>>>> record to downstream processing pipeline. In another case, a sink
>>>> > > >>>>>> connector may report that it's spending 30% of time producing the
>>>> > > >>>>>> records to the external system.
>>>> > > >>>>>>
>>>> > > >>>>>> This is in some sense equivalent to the buffer usage metric:
>>>> > > >>>>>> - 80% of time spent on emitting records to downstream --->
>>>> > > >>>>>> downstream node is bottleneck ---> output buffer is probably full.
>>>> > > >>>>>> - 30% of time spent on emitting records to downstream --->
>>>> > > >>>>>> downstream node is not bottleneck ---> output buffer is probably
>>>> > > >>>>>> not full.
>>>> > > >>>>>>
>>>> > > >>>>>> However, the time-based metric has a few advantages that the
>>>> > > >>>>>> buffer usage metric may not have.
>>>> > > >>>>>>
>>>> > > >>>>>> 1. Buffer usage metric may not be applicable to all the connector
>>>> > > >>>>>> implementations, while reporting time-based metrics is always
>>>> > > >>>>>> doable. Some source connectors may not have any input buffer, or
>>>> > > >>>>>> they may use some third party library that does not expose the
>>>> > > >>>>>> input buffer at all. Similarly, for sink connectors, the
>>>> > > >>>>>> implementation may not have any output buffer, or the third party
>>>> > > >>>>>> library does not expose such buffer.
>>>> > > >>>>>>
>>>> > > >>>>>> 2. Although both types of metrics can detect a bottleneck,
>>>> > > >>>>>> time-based metrics can be used to generate a more informed action
>>>> > > >>>>>> to remove the bottleneck. For example, when the downstream is the
>>>> > > >>>>>> bottleneck, the output buffer usage metric is likely to be 100%,
>>>> > > >>>>>> and the input buffer usage might be 0%. That means we don't know
>>>> > > >>>>>> what is the suitable parallelism to lift the bottleneck. The
>>>> > > >>>>>> time-based metric, on the other hand, would give useful
>>>> > > >>>>>> information, e.g. if 80% of time was spent on emitting records, we
>>>> > > >>>>>> can roughly increase the downstream node parallelism by 4 times.
>>>> > > >>>>>>
>>>> > > >>>>>> Admittedly, the time-based metrics are more expensive than buffer
>>>> > > >>>>>> usage. So we may have to do some sampling to reduce the cost. But
>>>> > > >>>>>> in general, time-based metrics seem worth adding.
>>>> > > >>>>>>
>>>> > > >>>>>> That being said, I don't think buffer usage metric and time-based
>>>> > > >>>>>> metrics are mutually exclusive. We can probably have both. It is
>>>> > > >>>>>> just that in practice, features like auto-scaling might prefer
>>>> > > >>>>>> time-based metrics for the reason stated above.
>>>> > > >>>>>>
>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually detect
>>>> > > >>>>>>> bottlenecks. As I wrote, we already have them in most of the
>>>> > > >>>>>>> places, except of sources/sinks.
>>>> > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks.
>>>> > > >>>>>>> Currently we are only automatically detecting back pressure and
>>>> > > >>>>>>> reporting it to the user in web UI (is it exposed as a metric at
>>>> > > >>>>>>> all?). Detecting the root cause of the back pressure (bottleneck)
>>>> > > >>>>>>> is one step further.
>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is
>>>> > > >>>>>>> located, to try to do something with it.
>>>> > > >>>>>>
>>>> > > >>>>>> As explained above, I think time-based metric also addresses item
>>>> > > >>>>>> 1 and item 2.
>>>> > > >>>>>>
>>>> > > >>>>>> Any thoughts?
>>>> > > >>>>>>
>>>> > > >>>>>> Thanks,
>>>> > > >>>>>>
>>>> > > >>>>>> Jiangjie (Becket) Qin
>>>> > > >>>>>>
>>>> > > >>>>>>
>>>> > > >>>>>>
>>>> > > >>>>>> On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski
>>>> > > >>>>>> <pi...@ververica.com> wrote:
>>>> > > >>>>>>
>>>> > > >>>>>>> Hi again :)
>>>> > > >>>>>>>
>>>> > > >>>>>>>> - pending.bytes, Gauge
>>>> > > >>>>>>>> - pending.messages, Gauge
>>>> > > >>>>>>>
>>>> > > >>>>>>>
>>>> > > >>>>>>> +1
>>>> > > >>>>>>>
>>>> > > >>>>>>> And true, instead of overloading one of the metrics it is better
>>>> > > >>>>>>> when user can choose to provide only one of them.
>>>> > > >>>>>>>
>>>> > > >>>>>>> Re 2:
>>>> > > >>>>>>>
>>>> > > >>>>>>>> If I understand correctly, this metric along with the pending
>>>> > > >>>>>>>> messages / bytes would answer the questions of:
>>>> > > >>>>>>>
>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging behind + empty
>>>> > > >>>>>>>> buffer = cannot consume fast enough.
>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + full
>>>> > > >>>>>>>> buffer = cannot emit fast enough, i.e. the Flink pipeline is
>>>> > > >>>>>>>> slow.
>>>> > > >>>>>>>
>>>> > > >>>>>>> Yes, exactly. This can also be used to support decisions like
>>>> > > >>>>>>> changing the parallelism of the sources and/or down stream
>>>> > > >>>>>>> operators.
>>>> > > >>>>>>>
>>>> > > >>>>>>> I’m not sure if I understand your proposal with time based
>>>> > > >>>>>>> measurements. Maybe I’m missing something, but I do not see how
>>>> > > >>>>>>> measuring time alone could answer the problem: where is the
>>>> > > >>>>>>> bottleneck.
>>>> > > >>>>>>> Time spent on the next/emit might be short or long (depending on
>>>> > > >>>>>>> how heavy to process the record is) and the source can still be
>>>> > > >>>>>>> bottlenecked/back pressured or not. Usually the easiest and the
>>>> > > >>>>>>> most reliable way to detect bottlenecks is by checking usage of
>>>> > > >>>>>>> input & output buffers, since when input buffer is full while
>>>> > > >>>>>>> output buffer is empty, that’s the definition of a bottleneck.
>>>> > > >>>>>>> Also this is usually very easy and cheap to measure (it works
>>>> > > >>>>>>> effectively the same way as Flink’s current back pressure
>>>> > > >>>>>>> monitoring, but more cleanly, without probing thread’s stack
>>>> > > >>>>>>> traces).
>>>> > > >>>>>>>
>>>> > > >>>>>>> Also keep in mind that we are already using the buffer usage
>>>> > > >>>>>>> metrics for detecting the bottlenecks in Flink’s internal network
>>>> > > >>>>>>> exchanges (manual work). That’s the reason why I wanted to extend
>>>> > > >>>>>>> this to sources/sinks, since they are currently our blind spot.
>>>> > > >>>>>>>
>>>> > > >>>>>>>> One feature we are currently working on to scale Flink
>>>> > > >>>>>>>> automatically relies on some metrics answering the same question
>>>> > > >>>>>>>
>>>> > > >>>>>>> That would be a very helpful feature. I think in order to achieve
>>>> > > >>>>>>> that we would need to:
>>>> > > >>>>>>> 1. Define the metrics that would allow us to manually detect
>>>> > > >>>>>>> bottlenecks. As I wrote, we already have them in most of the
>>>> > > >>>>>>> places, except of sources/sinks.
>>>> > > >>>>>>> 2. Use those metrics, to automatically detect bottlenecks.
>>>> > > >>>>>>> Currently we are only automatically detecting back pressure and
>>>> > > >>>>>>> reporting it to the user in web UI (is it exposed as a metric at
>>>> > > >>>>>>> all?). Detecting the root cause of the back pressure (bottleneck)
>>>> > > >>>>>>> is one step further.
>>>> > > >>>>>>> 3. Use the knowledge about where exactly the bottleneck is
>>>> > > >>>>>>> located, to try to do something with it.
>>>> > > >>>>>>>
>>>> > > >>>>>>> I think you are aiming for point 3., but before we reach it, we
>>>> > > >>>>>>> are still missing 1. & 2. Also even if we have 3., there is a
>>>> > > >>>>>>> value in 1 & 2 for manual analysis/dashboards.
>>>> > > >>>>>>>
>>>> > > >>>>>>> However, having the knowledge of where the bottleneck is, doesn’t
>>>> > > >>>>>>> necessarily mean that we know what we can do about it. For
>>>> > > >>>>>>> example increasing parallelism might or might not help with
>>>> > > >>>>>>> anything (data skew, bottleneck on some resource that does not
>>>> > > >>>>>>> scale), but this remark applies always, regardless of how we
>>>> > > >>>>>>> detected the bottleneck.
>>>> > > >>>>>>>
>>>> > > >>>>>>> Piotrek
>>>> > > >>>>>>>
>>>> > > >>>>>>>> On 3 Jun 2019, at 06:16, Becket Qin <becket....@gmail.com>
>>>> > > >>>>>>>> wrote:
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> Hi Piotr,
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> Thanks for the suggestion. Some thoughts below:
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> Re 1: The pending messages / bytes.
>>>> > > >>>>>>>> I completely agree these are very useful metrics and we should
>>>> > > >>>>>>>> expect the connector to report.
>>>> > > >>>>>>>> WRT the way to expose them, it seems more consistent to add two
>>>> > > >>>>>>>> metrics instead of adding a method (unless there are other use
>>>> > > >>>>>>>> cases other than metric reporting). So we can add the following
>>>> > > >>>>>>>> two metrics.
>>>> > > >>>>>>>> - pending.bytes, Gauge
>>>> > > >>>>>>>> - pending.messages, Gauge
>>>> > > >>>>>>>> Applicable connectors can choose to report them. These two
>>>> > > >>>>>>>> metrics along with latency should be sufficient for users to
>>>> > > >>>>>>>> understand the progress of a connector.
>>>> > > >>>>>>>>
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> Re 2: Number of buffered data in-memory of the connector
>>>> > > >>>>>>>> If I understand correctly, this metric along with the pending
>>>> > > >>>>>>>> messages / bytes would answer the questions of:
>>>> > > >>>>>>>> - Does the connector consume fast enough? Lagging behind + empty
>>>> > > >>>>>>>> buffer = cannot consume fast enough.
>>>> > > >>>>>>>> - Does the connector emit fast enough? Lagging behind + full
>>>> > > >>>>>>>> buffer = cannot emit fast enough, i.e. the Flink pipeline is
>>>> > > >>>>>>>> slow.
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> One feature we are currently working on to scale Flink
>>>> > > >>>>>>>> automatically relies on some metrics answering the same
>>>> > > >>>>>>>> question, more specifically, we are profiling the time spent on
>>>> > > >>>>>>>> .next() method (time to consume) and the time spent on
>>>> > > >>>>>>>> .collect() method (time to emit / process). One advantage of
>>>> > > >>>>>>>> such method level time cost is that it allows us to calculate
>>>> > > >>>>>>>> the parallelism required to keep up in case there is a lag.
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> However, one concern I have regarding such metrics is that they
>>>> > > >>>>>>>> are implementation specific. Either profiling on the method
>>>> > > >>>>>>>> time, or reporting buffer usage assumes the connectors are
>>>> > > >>>>>>>> implemented in such a way. A slightly better solution might be
>>>> > > >>>>>>>> to have the following metric:
>>>> > > >>>>>>>>
>>>> > > >>>>>>>>    - EmitTimeRatio (or FetchTimeRatio): The time spent on
>>>> > > >>>>>>>> emitting records / Total time elapsed.
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> This assumes that the source connectors have to emit the records
>>>> > > >>>>>>>> to the downstream at some point. The emission may take some time
>>>> > > >>>>>>>> (e.g. go through chained operators). And the rest of the time is
>>>> > > >>>>>>>> spent to prepare the record to emit, including time for
>>>> > > >>>>>>>> consuming and format conversion, etc. Ideally, we'd like to see
>>>> > > >>>>>>>> the time spent on record fetch and emit to be about the same, so
>>>> > > >>>>>>>> no one is bottleneck for the other.
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> The downside of these time based metrics is additional overhead
>>>> > > >>>>>>>> to get the time, therefore sampling might be needed. But in
>>>> > > >>>>>>>> practice I feel such time based metric might be more useful if
>>>> > > >>>>>>>> we want to take action.
>>>> > > >>>>>>>>
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> I think we should absolutely add metrics in (1) to the metric
>>>> > > >>>>>>>> convention. We could also add the metrics mentioned in (2) if we
>>>> > > >>>>>>>> reach consensus on that. What do you think?
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> Thanks,
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> Jiangjie (Becket) Qin
>>>> > > >>>>>>>>
>>>> > > >>>>>>>>
>>>> > > >>>>>>>> On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>> > > >>>>>>>>
>>>> > > >>>>>>>>> Hey Becket,
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> Re 1a) and 1b) +1 from my side.
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> I’ve discussed this issue:
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>>>>>
>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us to check the cause of back pressure:
>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean hasSomething/isEmpty)
>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean hasSomething/isEmpty)
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> with Nico at some length, and he also saw the benefits of them. We also have a more concrete proposal for that.
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> Actually there are two really useful metrics that we are missing currently:
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> 1. Number of data/records/bytes in the backlog to process. For example, the remaining number of bytes in unread files, or pending data in Kafka topics.
>>>> > > >>>>>>>>> 2. Number of buffered data in-memory of the connector that are waiting to be processed / pushed to the Flink pipeline.
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> Re 1:
>>>> > > >>>>>>>>> This would have to be a metric provided directly by a connector.
>>>> > > >>>>>>>>> It could be an undefined `int`:
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> `int backlog` - estimate of pending work.
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> “Undefined” meaning that it would be up to a connector to decide whether it’s measured in bytes, records, pending files or whatever the connector is able to provide. This is because I assume not every connector can provide an exact number, and for some of them it might be impossible to provide a record count or a byte count.
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> Re 2:
>>>> > > >>>>>>>>> This metric could be either provided by a connector, or calculated crudely by Flink:
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> `float bufferUsage` - value from [0.0, 1.0] range
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> Percentage of used in-memory buffers, like in Kafka’s handover.
>>>> > > >>>>>>>>>
>>>> > > >>>>>>>>> It could be crudely implemented by Flink with FLIP-27 SourceReader#isAvailable. If the SourceReader is not available, the reported `bufferUsage` could be 0.0. If it is available, it could be 1.0. I think this would be a good enough estimation for most of the use cases (that could be overloaded and implemented better if desired). Especially since we are reporting only probed values: if probed values are almost always “1.0”, it would mean that we have back pressure. If they are almost always “0.0”, there is probably no back pressure at the sources.
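The crude, probe-based `bufferUsage` described above could look roughly like the following sketch. `ProbedBufferUsage` is a hypothetical class, and the availability check is stood in for by a plain `BooleanSupplier`; the actual FLIP-27 SourceReader API may expose availability differently, so treat the wiring as an assumption.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: turns a boolean availability probe into a [0.0, 1.0] usage estimate. */
final class ProbedBufferUsage {
    private final BooleanSupplier isAvailable; // stand-in for the reader's availability check
    private long probes;
    private long availableProbes;

    ProbedBufferUsage(BooleanSupplier isAvailable) {
        this.isAvailable = isAvailable;
    }

    /** One probe, as a gauge would report it: 1.0 if data is buffered and ready, else 0.0. */
    float probe() {
        boolean available = isAvailable.getAsBoolean();
        probes++;
        if (available) {
            availableProbes++;
        }
        return available ? 1.0f : 0.0f;
    }

    /** Fraction of probes so far that found data available; 0.0 if nothing was probed yet. */
    float averageUsage() {
        return probes == 0 ? 0.0f : (float) availableProbes / probes;
    }
}
```

An average that stays near 1.0 would match the "almost always 1.0" back-pressure case from the mail, and an average near 0.0 the case with no back pressure at the source.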
>>>> > > >>>>>>>>> >>>> > > >>>>>>>>> What do you think about this? >>>> > > >>>>>>>>> >>>> > > >>>>>>>>> Piotrek >>>> > > >>>>>>>>> >>>> > > >>>>>>>>>> On 30 May 2019, at 11:41, Becket Qin < >>>> becket....@gmail.com> >>>> > > >> wrote: >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> Hi all, >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> Thanks a lot for all the feedback and comments. I'd like >>>> to >>>> > > >> continue >>>> > > >>>>>>> the >>>> > > >>>>>>>>>> discussion on this FLIP. >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> I updated the FLIP-33 wiki to remove all the histogram >>>> metrics >>>> > > >> from >>>> > > >>>>> the >>>> > > >>>>>>>>>> first version of metric convention due to the performance >>>> > > concern. >>>> > > >>>>> The >>>> > > >>>>>>>>> plan >>>> > > >>>>>>>>>> is to introduce them later when we have a mechanism to >>>> opt >>>> > > in/out >>>> > > >>>>>>>>> metrics. >>>> > > >>>>>>>>>> At that point, users can decide whether they want to pay >>>> the >>>> > > cost >>>> > > >> to >>>> > > >>>>>>> get >>>> > > >>>>>>>>>> the metric or not. >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> As Stephan suggested, for this FLIP, let's first try to >>>> agree >>>> > on >>>> > > >> the >>>> > > >>>>>>>>> small >>>> > > >>>>>>>>>> list of conventional metrics that connectors should >>>> follow. >>>> > > >>>>>>>>>> Just to be clear, the purpose of the convention is not to >>>> > > enforce >>>> > > >>>>> every >>>> > > >>>>>>>>>> connector to report all these metrics, but to provide a >>>> > guidance >>>> > > >> in >>>> > > >>>>>>> case >>>> > > >>>>>>>>>> these metrics are reported by some connectors. 
>>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> @ Stephan & Chesnay, >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> Regarding the duplication of `RecordsIn` metric in >>>> operator / >>>> > > task >>>> > > >>>>>>>>>> IOMetricGroups, from what I understand, for source >>>> operator, >>>> > it >>>> > > is >>>> > > >>>>>>>>> actually >>>> > > >>>>>>>>>> the SourceFunction that reports the operator level >>>> > > >>>>>>>>>> RecordsIn/RecordsInPerSecond metric. So they are >>>> essentially >>>> > the >>>> > > >>>>> same >>>> > > >>>>>>>>>> metric in the operator level IOMetricGroup. Similarly >>>> for the >>>> > > Sink >>>> > > >>>>>>>>>> operator, the operator level >>>> RecordsOut/RecordsOutPerSecond >>>> > > >> metrics >>>> > > >>>>> are >>>> > > >>>>>>>>>> also reported by the Sink function. I marked them as >>>> existing >>>> > in >>>> > > >> the >>>> > > >>>>>>>>>> FLIP-33 wiki page. Please let me know if I misunderstood. >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> Thanks, >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> Jiangjie (Becket) Qin >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>> On Thu, May 30, 2019 at 5:16 PM Becket Qin < >>>> > > becket....@gmail.com> >>>> > > >>>>>>> wrote: >>>> > > >>>>>>>>>> >>>> > > >>>>>>>>>>> Hi Piotr, >>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>> Thanks a lot for the feedback. >>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>> 1a) I guess you are referring to the part that "original >>>> > system >>>> > > >>>>>>> specific >>>> > > >>>>>>>>>>> metrics should also be reported". The performance impact >>>> > > depends >>>> > > >> on >>>> > > >>>>>>> the >>>> > > >>>>>>>>>>> implementation. An efficient implementation would only >>>> record >>>> > > the >>>> > > >>>>>>> metric >>>> > > >>>>>>>>>>> once, but report them with two different metric names. >>>> This >>>> > is >>>> > > >>>>>>> unlikely >>>> > > >>>>>>>>> to >>>> > > >>>>>>>>>>> hurt performance. 
>>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>> 1b) Yes, I agree that we should avoid adding overhead >>>> to the >>>> > > >>>>> critical >>>> > > >>>>>>>>> path >>>> > > >>>>>>>>>>> by all means. This is sometimes a tradeoff, running >>>> blindly >>>> > > >> without >>>> > > >>>>>>> any >>>> > > >>>>>>>>>>> metric gives best performance, but sometimes might be >>>> > > frustrating >>>> > > >>>>> when >>>> > > >>>>>>>>> we >>>> > > >>>>>>>>>>> debug some issues. >>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>> 2. The metrics are indeed very useful. Are they >>>> supposed to >>>> > be >>>> > > >>>>>>> reported >>>> > > >>>>>>>>> by >>>> > > >>>>>>>>>>> the connectors or Flink itself? At this point FLIP-33 >>>> is more >>>> > > >>>>> focused >>>> > > >>>>>>> on >>>> > > >>>>>>>>>>> provide a guidance to the connector authors on the >>>> metrics >>>> > > >>>>> reporting. >>>> > > >>>>>>>>> That >>>> > > >>>>>>>>>>> said, after FLIP-27, I think we should absolutely report >>>> > these >>>> > > >>>>> metrics >>>> > > >>>>>>>>> in >>>> > > >>>>>>>>>>> the abstract implementation. In any case, the metric >>>> > convention >>>> > > >> in >>>> > > >>>>>>> this >>>> > > >>>>>>>>>>> list are expected to evolve over time. >>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>> Thanks, >>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>> Jiangjie (Becket) Qin >>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski < >>>> > > >>>>> pi...@ververica.com> >>>> > > >>>>>>>>>>> wrote: >>>> > > >>>>>>>>>>> >>>> > > >>>>>>>>>>>> Hi, >>>> > > >>>>>>>>>>>> >>>> > > >>>>>>>>>>>> Thanks for the proposal and driving the effort here >>>> Becket >>>> > :) >>>> > > >> I’ve >>>> > > >>>>>>> read >>>> > > >>>>>>>>>>>> through the FLIP-33 [1], and here are couple of my >>>> thoughts. >>>> > > >>>>>>>>>>>> >>>> > > >>>>>>>>>>>> Big +1 for standardising the metric names between >>>> > connectors, >>>> > > it >>>> > > >>>>> will >>>> > > >>>>>>>>>>>> definitely help us and users a lot. 
>>>> > > >>>>>>>>>>>>
>>>> > > >>>>>>>>>>>> Issues/questions/things to discuss that I’ve thought of:
>>>> > > >>>>>>>>>>>>
>>>> > > >>>>>>>>>>>> 1a. If we are about to duplicate some metrics, can this become a performance issue?
>>>> > > >>>>>>>>>>>> 1b. Generally speaking, we should make sure that collecting those metrics is as non-intrusive as possible, especially since they will need to be updated once per record. (They might be collected more rarely with some overhead, but the hot path of updating them per record will need to be as quick as possible.) That includes avoiding heavy computation on the per-record path (histograms?) as well as measuring time for time-based metrics (per second); System.currentTimeMillis(), depending on the implementation, can invoke a system call.
>>>> > > >>>>>>>>>>>>
>>>> > > >>>>>>>>>>>> 2. It would be nice to have metrics, that allow us to check the cause of back pressure:
>>>> > > >>>>>>>>>>>> a) for sources, length of input queue (in bytes? Or boolean hasSomething/isEmpty)
>>>> > > >>>>>>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean hasSomething/isEmpty)
>>>> > > >>>>>>>>>>>>
>>>> > > >>>>>>>>>>>> a) is useful in a scenario when we are processing a backlog of records, all of Flink’s internal input/output network buffers are empty, and we want to check whether the external source system is the bottleneck (source’s input queue will be empty), or if Flink’s connector is the bottleneck (source’s input queues will be full).
>>>> > > >>>>>>>>>>>> b) similar story. Backlog of records, but this time all of Flink’s internal input/output network buffers are full, and we want to check whether the external sink system is the bottleneck (sink output queues are full), or if Flink’s connector is the bottleneck (sink’s output queues will be empty).
>>>> > > >>>>>>>>>>>>
>>>> > > >>>>>>>>>>>> It might sometimes be difficult to provide those metrics, so they could be optional, but if we could provide them, it would be really helpful.
>>>> > > >>>>>>>>>>>> >>>> > > >>>>>>>>>>>> Piotrek >>>> > > >>>>>>>>>>>> >>>> > > >>>>>>>>>>>> [1] >>>> > > >>>>>>>>>>>> >>>> > > >>>>>>>>> >>>> > > >>>>>>> >>>> > > >>>>> >>>> > > >> >>>> > > >>>> > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics >>>> > > >>>>>>>>>>>> < >>>> > > >>>>>>>>>>>> >>>> > > >>>>>>>>> >>>> > > >>>>>>> >>>> > > >>>>> >>>> > > >> >>>> > > >>>> > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>> >>>> > > >>>>>>>>>>>>> On 24 Apr 2019, at 13:28, Stephan Ewen < >>>> se...@apache.org> >>>> > > >> wrote: >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>> I think this sounds reasonable. >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>> Let's keep the "reconfiguration without stopping the >>>> job" >>>> > out >>>> > > >> of >>>> > > >>>>>>> this, >>>> > > >>>>>>>>>>>>> because that would be a super big effort and if we >>>> approach >>>> > > >> that, >>>> > > >>>>>>> then >>>> > > >>>>>>>>>>>> in >>>> > > >>>>>>>>>>>>> more generic way rather than specific to connector >>>> metrics. >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>> I would suggest to look at the following things before >>>> > > starting >>>> > > >>>>> with >>>> > > >>>>>>>>> any >>>> > > >>>>>>>>>>>>> implementation work: >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>> - Try and find a committer to support this, otherwise >>>> it >>>> > will >>>> > > >> be >>>> > > >>>>>>> hard >>>> > > >>>>>>>>>>>> to >>>> > > >>>>>>>>>>>>> make progress >>>> > > >>>>>>>>>>>>> - Start with defining a smaller set of "core metrics" >>>> and >>>> > > >> extend >>>> > > >>>>> the >>>> > > >>>>>>>>>>>> set >>>> > > >>>>>>>>>>>>> later. I think that is easier than now blocking on >>>> reaching >>>> > > >>>>>>> consensus >>>> > > >>>>>>>>>>>> on a >>>> > > >>>>>>>>>>>>> large group of metrics. 
>>>> > > >>>>>>>>>>>>> - Find a solution to the problem Chesnay mentioned, >>>> that >>>> > the >>>> > > >>>>>>> "records >>>> > > >>>>>>>>>>>> in" >>>> > > >>>>>>>>>>>>> metric is somehow overloaded and exists already in >>>> the IO >>>> > > >> Metric >>>> > > >>>>>>>>> group. >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin < >>>> > > >> becket....@gmail.com >>>> > > >>>>>> >>>> > > >>>>>>>>>>>> wrote: >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> Hi Stephan, >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> Thanks a lot for the feedback. All makes sense. >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> It is a good suggestion to simply have an >>>> > onRecord(numBytes, >>>> > > >>>>>>>>> eventTime) >>>> > > >>>>>>>>>>>>>> method for connector writers. It should meet most of >>>> the >>>> > > >>>>>>>>> requirements, >>>> > > >>>>>>>>>>>>>> individual >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> The configurable metrics feature is something really >>>> > useful, >>>> > > >>>>>>>>>>>> especially if >>>> > > >>>>>>>>>>>>>> we can somehow make it dynamically configurable >>>> without >>>> > > >> stopping >>>> > > >>>>>>> the >>>> > > >>>>>>>>>>>> jobs. >>>> > > >>>>>>>>>>>>>> It might be better to make it a separate discussion >>>> > because >>>> > > it >>>> > > >>>>> is a >>>> > > >>>>>>>>>>>> more >>>> > > >>>>>>>>>>>>>> generic feature instead of only for connectors. >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> So in order to make some progress, in this FLIP we >>>> can >>>> > limit >>>> > > >> the >>>> > > >>>>>>>>>>>> discussion >>>> > > >>>>>>>>>>>>>> scope to the connector related items: >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> - the standard connector metric names and types. 
>>>> > > >>>>>>>>>>>>>> - the abstract ConnectorMetricHandler interface >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> I'll start a separate thread to discuss other general >>>> > metric >>>> > > >>>>>>> related >>>> > > >>>>>>>>>>>>>> enhancement items including: >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> - optional metrics >>>> > > >>>>>>>>>>>>>> - dynamic metric configuration >>>> > > >>>>>>>>>>>>>> - potential combination with rate limiter >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> Does this plan sound reasonable? >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> Thanks, >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen < >>>> > > >> se...@apache.org> >>>> > > >>>>>>>>> wrote: >>>> > > >>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> Ignoring for a moment implementation details, this >>>> > > connector >>>> > > >>>>>>> metrics >>>> > > >>>>>>>>>>>> work >>>> > > >>>>>>>>>>>>>>> is a really good thing to do, in my opinion >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> The questions "oh, my job seems to be doing >>>> nothing, I am >>>> > > >>>>> looking >>>> > > >>>>>>> at >>>> > > >>>>>>>>>>>> the >>>> > > >>>>>>>>>>>>>> UI >>>> > > >>>>>>>>>>>>>>> and the 'records in' value is still zero" is in the >>>> top >>>> > > three >>>> > > >>>>>>>>> support >>>> > > >>>>>>>>>>>>>>> questions I have been asked personally. >>>> > > >>>>>>>>>>>>>>> Introspection into "how far is the consumer lagging >>>> > behind" >>>> > > >>>>> (event >>>> > > >>>>>>>>>>>> time >>>> > > >>>>>>>>>>>>>>> fetch latency) came up many times as well. >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> So big +1 to solving this problem. >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> About the exact design - I would try to go for the >>>> > > following >>>> > > >>>>>>>>>>>> properties: >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> - keep complexity of of connectors. 
Ideally the >>>> metrics >>>> > > >> handler >>>> > > >>>>>>> has >>>> > > >>>>>>>>> a >>>> > > >>>>>>>>>>>>>>> single onRecord(numBytes, eventTime) method or so, >>>> and >>>> > > >>>>> everything >>>> > > >>>>>>>>>>>> else is >>>> > > >>>>>>>>>>>>>>> internal to the handler. That makes it dead simple >>>> for >>>> > the >>>> > > >>>>>>>>> connector. >>>> > > >>>>>>>>>>>> We >>>> > > >>>>>>>>>>>>>>> can also think of an extensive scheme for connector >>>> > > specific >>>> > > >>>>>>>>> metrics. >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> - make it configurable on the job it cluster level >>>> which >>>> > > >>>>> metrics >>>> > > >>>>>>> the >>>> > > >>>>>>>>>>>>>>> handler internally creates when that method is >>>> invoked. >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> What do you think? >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> Best, >>>> > > >>>>>>>>>>>>>>> Stephan >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay Schepler < >>>> > > >>>>>>>>> ches...@apache.org >>>> > > >>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>> wrote: >>>> > > >>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>> As I said before, I believe this to be >>>> over-engineered >>>> > and >>>> > > >>>>> have >>>> > > >>>>>>> no >>>> > > >>>>>>>>>>>>>>>> interest in this implementation. >>>> > > >>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>> There are conceptual issues like defining a >>>> duplicate >>>> > > >>>>>>>>>>>>>> numBytesIn(PerSec) >>>> > > >>>>>>>>>>>>>>>> metric that already exists for each operator. >>>> > > >>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>> On 21.03.2019 06:13, Becket Qin wrote: >>>> > > >>>>>>>>>>>>>>>>> A few updates to the thread. I uploaded a >>>> patch[1] as a >>>> > > >>>>> complete >>>> > > >>>>>>>>>>>>>>>>> example of how users can use the metrics in the >>>> future. 
>>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Some thoughts below after taking a look at the >>>> > > >>>>>>> AbstractMetricGroup >>>> > > >>>>>>>>>>>>>> and >>>> > > >>>>>>>>>>>>>>>>> its subclasses. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> This patch intends to provide convenience for >>>> Flink >>>> > > >> connector >>>> > > >>>>>>>>>>>>>>>>> implementations to follow metrics standards >>>> proposed in >>>> > > >>>>> FLIP-33. >>>> > > >>>>>>>>> It >>>> > > >>>>>>>>>>>>>>>>> also try to enhance the metric management in >>>> general >>>> > way >>>> > > to >>>> > > >>>>> help >>>> > > >>>>>>>>>>>>>> users >>>> > > >>>>>>>>>>>>>>>>> with: >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> 1. metric definition >>>> > > >>>>>>>>>>>>>>>>> 2. metric dependencies check >>>> > > >>>>>>>>>>>>>>>>> 3. metric validation >>>> > > >>>>>>>>>>>>>>>>> 4. metric control (turn on / off particular >>>> metrics) >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> This patch wraps |MetricGroup| to extend the >>>> > > functionality >>>> > > >> of >>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| and its subclasses. The >>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| mainly focus on the metric >>>> group >>>> > > >>>>>>> hierarchy, >>>> > > >>>>>>>>>>>> but >>>> > > >>>>>>>>>>>>>>>>> does not really manage the metrics other than >>>> keeping >>>> > > them >>>> > > >>>>> in a >>>> > > >>>>>>>>> Map. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Ideally we should only have one entry point for >>>> the >>>> > > >> metrics. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Right now the entry point is >>>> |AbstractMetricGroup|. >>>> > > >> However, >>>> > > >>>>>>>>> besides >>>> > > >>>>>>>>>>>>>>>>> the missing functionality mentioned above, >>>> > > >>>>> |AbstractMetricGroup| >>>> > > >>>>>>>>>>>>>> seems >>>> > > >>>>>>>>>>>>>>>>> deeply rooted in Flink runtime. 
We could extract >>>> it out >>>> > > to >>>> > > >>>>>>>>>>>>>>>>> flink-metrics in order to use it for generic >>>> purpose. >>>> > > There >>>> > > >>>>> will >>>> > > >>>>>>>>> be >>>> > > >>>>>>>>>>>>>>>>> some work, though. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Another approach is to make |AbstractMetrics| in >>>> this >>>> > > patch >>>> > > >>>>> as >>>> > > >>>>>>> the >>>> > > >>>>>>>>>>>>>>>>> metric entry point. It wraps metric group and >>>> provides >>>> > > the >>>> > > >>>>>>> missing >>>> > > >>>>>>>>>>>>>>>>> functionalities. Then we can roll out this >>>> pattern to >>>> > > >> runtime >>>> > > >>>>>>>>>>>>>>>>> components gradually as well. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> My first thought is that the latter approach >>>> gives a >>>> > more >>>> > > >>>>> smooth >>>> > > >>>>>>>>>>>>>>>>> migration. But I am also OK with doing a >>>> refactoring on >>>> > > the >>>> > > >>>>>>>>>>>>>>>>> |AbstractMetricGroup| family. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Thanks, >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> [1] https://github.com/becketqin/flink/pull/1 >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> On Mon, Feb 25, 2019 at 2:32 PM Becket Qin < >>>> > > >>>>>>> becket....@gmail.com >>>> > > >>>>>>>>>>>>>>>>> <mailto:becket....@gmail.com>> wrote: >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Hi Chesnay, >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> It might be easier to discuss some implementation >>>> > details >>>> > > >> in >>>> > > >>>>>>> the >>>> > > >>>>>>>>>>>>>>>>> PR review instead of in the FLIP discussion >>>> thread. I >>>> > > have >>>> > > >> a >>>> > > >>>>>>>>>>>>>> patch >>>> > > >>>>>>>>>>>>>>>>> for Kafka connectors ready but haven't submitted >>>> the PR >>>> > > >> yet. >>>> > > >>>>>>>>>>>>>>>>> Hopefully that will help explain a bit more. 
>>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> ** Re: metric type binding >>>> > > >>>>>>>>>>>>>>>>> This is a valid point that worths discussing. If I >>>> > > >> understand >>>> > > >>>>>>>>>>>>>>>>> correctly, there are two points: >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> 1. Metric type / interface does not matter as >>>> long as >>>> > the >>>> > > >>>>>>> metric >>>> > > >>>>>>>>>>>>>>>>> semantic is clearly defined. >>>> > > >>>>>>>>>>>>>>>>> Conceptually speaking, I agree that as long as the >>>> > metric >>>> > > >>>>>>>>>>>>>> semantic >>>> > > >>>>>>>>>>>>>>>>> is defined, metric type does not matter. To some >>>> > extent, >>>> > > >>>>> Gauge >>>> > > >>>>>>> / >>>> > > >>>>>>>>>>>>>>>>> Counter / Meter / Histogram themselves can be >>>> think of >>>> > as >>>> > > >>>>> some >>>> > > >>>>>>>>>>>>>>>>> well-recognized semantics, if you wish. In Flink, >>>> these >>>> > > >>>>> metric >>>> > > >>>>>>>>>>>>>>>>> semantics have their associated interface >>>> classes. In >>>> > > >>>>> practice, >>>> > > >>>>>>>>>>>>>>>>> such semantic to interface binding seems >>>> necessary for >>>> > > >>>>>>> different >>>> > > >>>>>>>>>>>>>>>>> components to communicate. Simply standardize the >>>> > > semantic >>>> > > >>>>> of >>>> > > >>>>>>>>>>>>>> the >>>> > > >>>>>>>>>>>>>>>>> connector metrics seems not sufficient for people >>>> to >>>> > > build >>>> > > >>>>>>>>>>>>>>>>> ecosystem on top of. At the end of the day, we >>>> still >>>> > need >>>> > > >> to >>>> > > >>>>>>>>> have >>>> > > >>>>>>>>>>>>>>>>> some embodiment of the metric semantics that >>>> people can >>>> > > >>>>> program >>>> > > >>>>>>>>>>>>>>>>> against. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> 2. Sometimes the same metric semantic can be >>>> exposed >>>> > > using >>>> > > >>>>>>>>>>>>>>>>> different metric types / interfaces. >>>> > > >>>>>>>>>>>>>>>>> This is a good point. 
Counter and >>>> Gauge-as-a-Counter >>>> > are >>>> > > >>>>> pretty >>>> > > >>>>>>>>>>>>>>>>> much interchangeable. This is more of a trade-off >>>> > between >>>> > > >> the >>>> > > >>>>>>>>>>>>>> user >>>> > > >>>>>>>>>>>>>>>>> experience of metric producers and consumers. The >>>> > metric >>>> > > >>>>>>>>>>>>>> producers >>>> > > >>>>>>>>>>>>>>>>> want to use Counter or Gauge depending on whether >>>> the >>>> > > >> counter >>>> > > >>>>>>> is >>>> > > >>>>>>>>>>>>>>>>> already tracked in code, while ideally the metric >>>> > > consumers >>>> > > >>>>>>> only >>>> > > >>>>>>>>>>>>>>>>> want to see a single metric type for each metric. >>>> I am >>>> > > >>>>> leaning >>>> > > >>>>>>>>>>>>>>>>> towards to make the metric producers happy, i.e. >>>> allow >>>> > > >> Gauge >>>> > > >>>>> / >>>> > > >>>>>>>>>>>>>>>>> Counter metric type, and the the metric consumers >>>> > handle >>>> > > >> the >>>> > > >>>>>>>>> type >>>> > > >>>>>>>>>>>>>>>>> variation. The reason is that in practice, there >>>> might >>>> > be >>>> > > >>>>> more >>>> > > >>>>>>>>>>>>>>>>> connector implementations than metric reporter >>>> > > >>>>> implementations. >>>> > > >>>>>>>>>>>>>> We >>>> > > >>>>>>>>>>>>>>>>> could also provide some helper method to >>>> facilitate >>>> > > reading >>>> > > >>>>>>> from >>>> > > >>>>>>>>>>>>>>>>> such variable metric type. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Just some quick replies to the comments around >>>> > > >> implementation >>>> > > >>>>>>>>>>>>>>>> details. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> 4) single place where metrics are registered >>>> except >>>> > > >>>>>>>>>>>>>>>>> connector-specific >>>> > > >>>>>>>>>>>>>>>>> ones (which we can't really avoid). >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Register connector specific ones in a single >>>> place is >>>> > > >>>>> actually >>>> > > >>>>>>>>>>>>>>>>> something that I want to achieve. 
>>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> 2) I'm talking about time-series databases like >>>> > > >>>>> Prometheus. >>>> > > >>>>>>>>>>>>>> We >>>> > > >>>>>>>>>>>>>>>>> would >>>> > > >>>>>>>>>>>>>>>>> only have a gauge metric exposing the last >>>> > > >>>>>>>>> fetchTime/emitTime >>>> > > >>>>>>>>>>>>>>>>> that is >>>> > > >>>>>>>>>>>>>>>>> regularly reported to the backend (Prometheus), >>>> > where a >>>> > > >>>>>>> user >>>> > > >>>>>>>>>>>>>>>>> could build >>>> > > >>>>>>>>>>>>>>>>> a histogram of his choosing when/if he wants it. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Not sure if such downsampling works. As an >>>> example, if >>>> > a >>>> > > >> user >>>> > > >>>>>>>>>>>>>>>>> complains that there are some intermittent latency >>>> > spikes >>>> > > >>>>>>> (maybe >>>> > > >>>>>>>>>>>>>> a >>>> > > >>>>>>>>>>>>>>>>> few records in 10 seconds) in their processing >>>> system. >>>> > > >>>>> Having a >>>> > > >>>>>>>>>>>>>>>>> Gauge sampling instantaneous latency seems >>>> unlikely >>>> > > useful. >>>> > > >>>>>>>>>>>>>>>>> However by looking at actual 99.9 percentile >>>> latency >>>> > > might >>>> > > >>>>>>> help. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Thanks, >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 9:30 PM Chesnay Schepler >>>> > > >>>>>>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org>> >>>> > wrote: >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Re: over complication of implementation. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> I think I get understand better know what you're >>>> > > >> shooting >>>> > > >>>>>>>>>>>>>> for, >>>> > > >>>>>>>>>>>>>>>>> effectively something like the >>>> OperatorIOMetricGroup. 
>>>> > > >>>>>>>>>>>>>>>>> But still, re-define setupConnectorMetrics() to >>>> > accept >>>> > > a >>>> > > >>>>>>> set >>>> > > >>>>>>>>>>>>>>>>> of flags >>>> > > >>>>>>>>>>>>>>>>> for counters/meters(ans _possibly_ histograms) >>>> along >>>> > > >>>>> with a >>>> > > >>>>>>>>>>>>>>>>> set of >>>> > > >>>>>>>>>>>>>>>>> well-defined Optional<Gauge<?>>, and return the >>>> > group. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Solves all issues as far as i can tell: >>>> > > >>>>>>>>>>>>>>>>> 1) no metrics must be created manually (except >>>> > Gauges, >>>> > > >>>>>>> which >>>> > > >>>>>>>>>>>>>>> are >>>> > > >>>>>>>>>>>>>>>>> effectively just Suppliers and you can't get >>>> around >>>> > > >>>>> this), >>>> > > >>>>>>>>>>>>>>>>> 2) additional metrics can be registered on the >>>> > returned >>>> > > >>>>>>>>>>>>>> group, >>>> > > >>>>>>>>>>>>>>>>> 3) see 1), >>>> > > >>>>>>>>>>>>>>>>> 4) single place where metrics are registered >>>> except >>>> > > >>>>>>>>>>>>>>>>> connector-specific >>>> > > >>>>>>>>>>>>>>>>> ones (which we can't really avoid). >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> Re: Histogram >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> 1) As an example, whether "numRecordsIn" is >>>> exposed >>>> > as >>>> > > a >>>> > > >>>>>>>>>>>>>>>>> Counter or a >>>> > > >>>>>>>>>>>>>>>>> Gauge should be irrelevant. So far we're using >>>> the >>>> > > >> metric >>>> > > >>>>>>>>>>>>>> type >>>> > > >>>>>>>>>>>>>>>>> that is >>>> > > >>>>>>>>>>>>>>>>> the most convenient at exposing a given value. >>>> If >>>> > there >>>> > > >>>>> is >>>> > > >>>>>>>>>>>>>>>>> some backing >>>> > > >>>>>>>>>>>>>>>>> data-structure that we want to expose some data >>>> from >>>> > we >>>> > > >>>>>>>>>>>>>>>>> typically opt >>>> > > >>>>>>>>>>>>>>>>> for a Gauge, as otherwise we're just mucking >>>> around >>>> > > with >>>> > > >>>>>>> the >>>> > > >>>>>>>>>>>>>>>>> Meter/Counter API to get it to match. 
>>>> Similarly, if >>>> > we >>>> > > >>>>> want >>>> > > >>>>>>>>>>>>>> to >>>> > > >>>>>>>>>>>>>>>>> count >>>> > > >>>>>>>>>>>>>>>>> something but no current count exists we >>>> typically >>>> > > added >>>> > > >>>>> a >>>> > > >>>>>>>>>>>>>>>>> Counter. >>>> > > >>>>>>>>>>>>>>>>> That's why attaching semantics to metric types >>>> makes >>>> > > >>>>> little >>>> > > >>>>>>>>>>>>>>>>> sense (but >>>> > > >>>>>>>>>>>>>>>>> unfortunately several reporters already do it); >>>> for >>>> > > >>>>>>>>>>>>>>>>> counters/meters >>>> > > >>>>>>>>>>>>>>>>> certainly, but the majority of metrics are >>>> gauges. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> 2) I'm talking about time-series databases like >>>> > > >>>>> Prometheus. >>>> > > >>>>>>>>>>>>>> We >>>> > > >>>>>>>>>>>>>>>>> would >>>> > > >>>>>>>>>>>>>>>>> only have a gauge metric exposing the last >>>> > > >>>>>>>>> fetchTime/emitTime >>>> > > >>>>>>>>>>>>>>>>> that is >>>> > > >>>>>>>>>>>>>>>>> regularly reported to the backend (Prometheus), >>>> > where a >>>> > > >>>>>>> user >>>> > > >>>>>>>>>>>>>>>>> could build >>>> > > >>>>>>>>>>>>>>>>> a histogram of his choosing when/if he wants it. >>>> > > >>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>> On 22.02.2019 13:57, Becket Qin wrote: >>>> > > >>>>>>>>>>>>>>>>>> Hi Chesnay, >>>> > > >>>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>>> Thanks for the explanation. >>>> > > >>>>>>>>>>>>>>>>>> >>>> > > >>>>>>>>>>>>>>>>>> ** Re: FLIP >>>> > > >>>>>>>>>>>>>>>>>> I might have misunderstood this, but it seems >>>> that >>>> > > "major >>>> > > >>>>>>>>>>>>>>>>> changes" are well >>>> > > >>>>>>>>>>>>>>>>>> defined in FLIP. The full contents is following: >>>> > > >>>>>>>>>>>>>>>>>> What is considered a "major change" that needs a >>>> FLIP? 
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> Any of the following should be considered a major change:
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> - Any major new feature, subsystem, or piece of functionality
>>>> > > >>>>>>>>>>>>>>>>>> - *Any change that impacts the public interfaces of the project*
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> What are the "public interfaces" of the project?
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> *All of the following are public interfaces* that people build around:
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> - DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
>>>> > > >>>>>>>>>>>>>>>>>> - Classes marked with the @Public annotation
>>>> > > >>>>>>>>>>>>>>>>>> - On-disk binary formats, such as checkpoints/savepoints
>>>> > > >>>>>>>>>>>>>>>>>> - User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
>>>> > > >>>>>>>>>>>>>>>>>> - Configuration settings
>>>> > > >>>>>>>>>>>>>>>>>> - *Exposed monitoring information*
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> So any monitoring information change is considered a public interface change, and any public interface change is considered a "major change".
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> ** Re: over complication of implementation.
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> This is more of an implementation detail that is not covered by the FLIP, but it may be worth discussing.
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> First of all, I completely agree that we should use the simplest way to achieve our goal. To me the goal is the following:
>>>> > > >>>>>>>>>>>>>>>>>> 1. Clear connector conventions and interfaces.
>>>> > > >>>>>>>>>>>>>>>>>> 2. The easiness of creating a connector.
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> Both of them are important to the prosperity of the connector ecosystem. So I'd rather abstract as much as possible on our side to make the connector developer's work lighter. Given this goal, a static util method approach might have a few drawbacks:
>>>> > > >>>>>>>>>>>>>>>>>> 1. Users still have to construct the metrics by themselves. (And note that this might be error-prone by itself. For example, a custom wrapper around a Dropwizard meter may be used instead of MeterView.)
>>>> > > >>>>>>>>>>>>>>>>>> 2. When connector-specific metrics are added, it is difficult to enforce the scope to be the same as standard metrics.
>>>> > > >>>>>>>>>>>>>>>>>> 3. It seems that a method proliferation is inevitable if we want to apply sanity checks, e.g. that the metric numBytesIn was not registered for a meter.
>>>> > > >>>>>>>>>>>>>>>>>> 4. Metrics are still defined in random places and hard to track.
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> The current PR I had was inspired by the Config system in Kafka, which I found pretty handy. In fact it is not only used by Kafka itself but even some other projects that depend on Kafka. I am not saying this approach is perfect. But I think it is worth it to save work for connector writers and encourage a more systematic implementation. That being said, I am fully open to suggestions.
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> Re: Histogram
>>>> > > >>>>>>>>>>>>>>>>>> I think there are two orthogonal questions around those metrics:
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> 1. Regardless of the metric type, by just looking at the meaning of a metric, is it generic to all connectors? If the answer is yes, we should include the metric in the convention. No matter whether we include it in the convention or not, some connector implementations will emit such a metric. It is better to have a convention than letting each connector do random things.
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> 2. If a standard metric is a histogram, what should we do?
>>>> > > >>>>>>>>>>>>>>>>>> I agree that we should make it clear that using histograms carries a performance risk. But I do see that a histogram is useful in some fine-granularity debugging where one does not have the luxury to stop the system and inject more inspection code. So the workaround I am thinking of is to provide some implementation suggestions. Assume later on we have a mechanism of selective metrics.
>>>> > > >>>>>>>>>>>>>>>>>> In the abstract metrics class we can disable those metrics by default, so individual connector writers do not have to do anything (this is another advantage of having an AbstractMetrics instead of static util methods).
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> I am not sure I fully understand the histogram in the backend approach. Can you explain a bit more? Do you mean emitting the raw data, e.g. fetchTime and emitTime, with each record and letting the histogram computation happen in the background? Or having the processing thread put the values into a queue and a separate thread poll from the queue and add them to the histogram?
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> Thanks,
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 4:34 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>> > > >>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>> Re: Flip
>>>> > > >>>>>>>>>>>>>>>>>>> The very first line under both the main header and Purpose section describe Flips as "major changes", which this isn't.
>>>> > > >>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>> Re: complication
>>>> > > >>>>>>>>>>>>>>>>>>> I'm not arguing against standardization, but against an over-complicated implementation when a static utility method would be sufficient.
>>>> > > >>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>> public static void setupConnectorMetrics(
>>>> > > >>>>>>>>>>>>>>>>>>>     MetricGroup operatorMetricGroup,
>>>> > > >>>>>>>>>>>>>>>>>>>     String connectorName,
>>>> > > >>>>>>>>>>>>>>>>>>>     Optional<Gauge<Long>> numRecordsIn,
>>>> > > >>>>>>>>>>>>>>>>>>>     ...)
>>>> > > >>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>> This gives you all you need:
>>>> > > >>>>>>>>>>>>>>>>>>> * a well-defined set of metrics for a connector to opt-in
>>>> > > >>>>>>>>>>>>>>>>>>> * standardized naming schemes for scope and individual metrics
>>>> > > >>>>>>>>>>>>>>>>>>> * standardized metric types (although personally I'm not interested in that since metric types should be considered syntactic sugar)
>>>> > > >>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>> Re: Configurable Histogram
>>>> > > >>>>>>>>>>>>>>>>>>> If anything they _must_ be turned off by default, but the metric system is already exposing so many options that I'm not too keen on adding even more. You have also only addressed my first argument against histograms (performance), the second one still stands (calculate histogram in metric backends instead).
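
For readers following the static-utility proposal quoted above, here is a minimal, self-contained sketch of what such a method could look like. It is an illustration only: `SimpleGroup` and the nested `Gauge` are hypothetical stand-ins written for this example, not Flink's `MetricGroup` or `Gauge`; the `numBytesIn` parameter fills in for the elided "..." purely as an assumption; and returning the group (rather than `void`) is a choice made here so that connector-specific metrics can be registered under the same scope.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Sketch only: the types below are stand-ins, not Flink's metric API. */
final class ConnectorMetricsSketch {

    /** Stand-in gauge: effectively just a supplier of the current value. */
    public interface Gauge<T> extends Supplier<T> {}

    /** Stand-in metric group: a scope name plus the gauges registered on it. */
    public static final class SimpleGroup {
        public final String scope;
        public final Map<String, Gauge<?>> gauges = new LinkedHashMap<>();

        public SimpleGroup(String scope) {
            this.scope = scope;
        }

        /** Creates a child group whose scope extends this group's scope. */
        public SimpleGroup addGroup(String name) {
            return new SimpleGroup(scope + "." + name);
        }

        public void gauge(String name, Gauge<?> gauge) {
            gauges.put(name, gauge);
        }
    }

    // Standardized metric names live in exactly one place.
    public static final String NUM_RECORDS_IN = "numRecordsIn";
    public static final String NUM_BYTES_IN = "numBytesIn";

    /**
     * Registers the standard metrics a connector opts into and returns the
     * connector's group so connector-specific metrics share the same scope.
     */
    public static SimpleGroup setupConnectorMetrics(
            SimpleGroup operatorMetricGroup,
            String connectorName,
            Optional<Gauge<Long>> numRecordsIn,
            Optional<Gauge<Long>> numBytesIn) {
        SimpleGroup group = operatorMetricGroup.addGroup(connectorName);
        numRecordsIn.ifPresent(g -> group.gauge(NUM_RECORDS_IN, g));
        numBytesIn.ifPresent(g -> group.gauge(NUM_BYTES_IN, g));
        return group;
    }

    public static void main(String[] args) {
        long[] records = {0L};
        Gauge<Long> recordsGauge = () -> records[0];

        // This connector opts into numRecordsIn only.
        SimpleGroup group = setupConnectorMetrics(
                new SimpleGroup("operator"), "kafka",
                Optional.of(recordsGauge), Optional.empty());

        // A connector-specific metric (hypothetical name) on the returned group.
        group.gauge("kafkaSpecificMetric", () -> 1L);

        // prints "operator.kafka [numRecordsIn, kafkaSpecificMetric]"
        System.out.println(group.scope + " " + group.gauges.keySet());
    }
}
```

In this sketch the opt-in is expressed through `Optional` arguments, so a metric the connector does not support is simply never registered, while the names and scope stay identical across connectors.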
>>>> > > >>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>> On 21.02.2019 16:27, Becket Qin wrote:
>>>> > > >>>>>>>>>>>>>>>>>>>> Hi Chesnay,
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks for the comments. I think this is worthy of a FLIP because it is public API. According to the FLIP description a FLIP is required in case of:
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> - Any change that impacts the public interfaces of the project
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> and the following entry is found in the definition of "public interface".
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> - Exposed monitoring information
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> Metrics are critical to any production system. So a clear metric definition is important for any serious user. For an organization with a large Flink installation, a change in metrics means a great amount of work. So such changes do need to be fully discussed and documented.
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: Histogram.
>>>> > > >>>>>>>>>>>>>>>>>>>> We can discuss whether there is a better way to expose metrics that are suitable for histograms.
>>>> > > >>>>>>>>>>>>>>>>>>>> My micro-benchmark on various histogram implementations also indicates that they are significantly slower than other metric types. But I don't think that means never use histograms; it means use them with caution. For example, we can suggest that implementations turn them off by default and only turn them on for a small amount of time when performing some micro-debugging.
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: complication:
>>>> > > >>>>>>>>>>>>>>>>>>>> Connector conventions are essential for the Flink ecosystem. The Flink connector pool is probably the most important part of Flink, just like in any other data system. Clear conventions for connectors will help build the Flink ecosystem in a more organic way.
>>>> > > >>>>>>>>>>>>>>>>>>>> Take the metrics convention as an example: imagine someone has developed a Flink connector for System foo, and another developer has developed a monitoring and diagnostic framework for Flink which analyzes Flink job performance based on metrics. With a clear metric convention, those two projects could be developed independently. Once users put them together, it would work without additional modifications. This cannot be easily achieved by just defining a few constants.
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: selective metrics:
>>>> > > >>>>>>>>>>>>>>>>>>>> Sure, we can discuss that in a separate thread.
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> @Dawid
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> ** Re: latency / fetchedLatency
>>>> > > >>>>>>>>>>>>>>>>>>>> The primary purpose of establishing such a convention is to help developers write connectors in a more compatible way. The convention is supposed to be defined more proactively. So when looking at the convention, it seems more important to see if the concept is applicable to connectors in general.
>>>> > > >>>>>>>>>>>>>>>>>>>> It might be true that so far only the Kafka connector reports latency. But there might be hundreds of other connector implementations in the Flink ecosystem, though not in the Flink repo, and some of them also emit latency. I think a lot of other sources actually also have an append timestamp, e.g. database bin logs and some K-V stores. So I wouldn't be surprised if some database connector can also emit latency metrics.
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> Thanks,
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 10:14 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>> > > >>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>> Regarding 2) It doesn't make sense to investigate this as part of this FLIP. This is something that could be of interest for the entire metric system, and should be designed for as such.
>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>> Regarding the proposal as a whole:
>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>> Histogram metrics shall not be added to the core of Flink. They are significantly more expensive than other metrics, and calculating histograms in the application is regarded as an anti-pattern by several metric backends, who instead recommend to expose the raw data and calculate the histogram in the backend.
>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>> Second, this seems overly complicated. Given that we already established that not all connectors will export all metrics we are effectively reducing this down to a consistent naming scheme. We don't need anything sophisticated for that; basically just a few constants that all connectors use.
>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>> I'm not convinced that this is worthy of a FLIP.
>>>> > > >>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>> On 21.02.2019 14:26, Dawid Wysakowicz wrote:
>>>> > > >>>>>>>>>>>>>>>>>>>>>> Hi,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>> Ad 1. In general I understand and I agree.
>>>> > > >>>>>>>>>>>>>>>>>>>>>> But those particular metrics (latency, fetchLatency) right now would only be reported if the user uses KafkaConsumer with an internal timestampAssigner with StreamCharacteristic set to EventTime, right? That sounds like a very specific case. I am not sure if we should introduce a generic metric that will be disabled/absent for most implementations.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>> Ad 2. That sounds like an orthogonal issue that might make sense to investigate in the future.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>> Best,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>> Dawid
>>>> > > >>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>> On 21/02/2019 13:20, Becket Qin wrote:
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Hi Dawid,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. That makes sense to me. There are two cases to be addressed.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> 1. The metrics are supposed to be guidance. It is likely that a connector only supports some but not all of the metrics.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> In that case, each connector implementation should have the freedom to decide which metrics are reported. For the metrics that are supported, the guidance should be followed.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> 2. Sometimes users may want to disable certain metrics for some reason (e.g. performance / reprocessing of data). A generic mechanism should be provided to allow users to choose which metrics are reported. This mechanism should also be honored by the connector implementations.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Does this sound reasonable to you?
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 4:22 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Generally I like the idea of having a unified, standard set of metrics for all connectors. I have some slight concerns about fetchLatency and latency though. They are computed based on EventTime, which is not a purely technical feature. It often depends on some business logic, and might be absent or defined after the source. Those metrics could also behave in a weird way in case of replaying a backlog. Therefore I am not sure if we should include those metrics by default. Maybe we could at least introduce a feature switch for them? What do you think?
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> On 21/02/2019 03:13, Becket Qin wrote:
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Bump. If there are no objections to the proposed metrics, I'll start a voting thread later today.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> On Mon, Feb 11, 2019 at 8:17 PM Becket Qin <becket....@gmail.com> wrote:
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Hi folks,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> I would like to start the FLIP discussion thread about standardizing the connector metrics.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> In short, we would like to provide a convention for Flink connector metrics. It will help simplify the monitoring and alerting on Flink jobs.
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> The FLIP link is as follows:
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>>
>>>> > > >>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>
>>>> --
>>>>
>>>> Konstantin Knauf
>>>>
>>>> https://twitter.com/snntrable
>>>>
>>>> https://github.com/knaufk