Hi Piotr,

Thanks for the suggestion. Some thoughts below:

Re 1: The pending messages / bytes.
I completely agree these are very useful metrics and we should expect the
connector to report. WRT the way to expose them, it seems more consistent
to add two metrics instead of adding a method (unless there are other use
cases other than metric reporting). So we can add the following two metrics.
   - pending.bytes, Gauge
   - pending.messages, Gauge
Applicable connectors can choose to report them. These two metrics along
with latency should be sufficient for users to understand the progress of a
connector.


Re 2: Number of buffered data in-memory of the connector
If I understand correctly, this metric along with the pending mesages /
bytes would answer the questions of:
  - Does the connector consume fast enough? Lagging behind + empty buffer =
cannot consume fast enough.
  - Does the connector emit fast enough? Lagging behind + full buffer =
cannot emit fast enough, i.e. the Flink pipeline is slow.

One feature we are currently working on to scale Flink automatically relies
on some metrics answering the same question, more specifically, we are
profiling the time spent on .next() method (time to consume) and the time
spent on .collect() method (time to emit / process). One advantage of such
method level time cost allows us to calculate the parallelism required to
keep up in case their is a lag.

However, one concern I have regarding such metric is that they are
implementation specific. Either profiling on the method time, or reporting
buffer usage assumes the connector are implemented in such a way. A
slightly better solution might be have the following metric:

     - EmitTimeRatio (or FetchTimeRatio): The time spent on emitting
records / Total time elapsed.

This assumes that the source connectors have to emit the records to the
downstream at some point. The emission may take some time ( e.g. go through
chained operators). And the rest of the time are spent to prepare the
record to emit, including time for consuming and format conversion, etc.
Ideally, we'd like to see the time spent on record fetch and emit to be
about the same, so no one is bottleneck for the other.

The downside of these time based metrics is additional overhead to get the
time, therefore sampling might be needed. But in practice I feel such time
based metric might be more useful if we want to take action.


I think we should absolutely add metrics in (1) to the metric convention.
We could also add the metrics mentioned in (2) if we reach consensus on
that. What do you think?

Thanks,

Jiangjie (Becket) Qin


On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hey Becket,
>
> Re 1a) and 1b) +1 from my side.
>
> I’ve discussed this issue:
>
> >>>
> >>> 2. It would be nice to have metrics, that allow us to check the cause
> of
> >>> back pressure:
> >>> a) for sources, length of input queue (in bytes? Or boolean
> >>> hasSomethingl/isEmpty)
> >>> b) for sinks, length of output queue (in bytes? Or boolean
> >>> hasSomething/isEmpty)
>
> With Nico at some lengths and he also saw the benefits of them. We also
> have more concrete proposal for that.
>
> Actually there are two really useful metrics, that we are missing
> currently:
>
> 1. Number of data/records/bytes in the backlog to process. For example
> remaining number of bytes in unread files. Or pending data in Kafka topics.
> 2. Number of buffered data in-memory of the connector, that are waiting to
> be processed pushed to Flink pipeline.
>
> Re 1:
> This would have to be a metric provided directly by a connector. It could
> be an undefined `int`:
>
> `int backlog` - estimate of pending work.
>
> “Undefined” meaning that it would be up to a connector to decided whether
> it’s measured in bytes, records, pending files or whatever it is possible
> to provide by the connector. This is because I assume not every connector
> can provide exact number and for some of them it might be impossible to
> provide records number of bytes count.
>
> Re 2:
> This metric could be either provided by a connector, or calculated crudely
> by Flink:
>
> `float bufferUsage` - value from [0.0, 1.0] range
>
> Percentage of used in memory buffers, like in Kafka’s handover.
>
> It could be crudely implemented by Flink with FLIP-27
> SourceReader#isAvailable. If SourceReader is not available reported
> `bufferUsage` could be 0.0. If it is available, it could be 1.0. I think
> this would be a good enough estimation for most of the use cases (that
> could be overloaded and implemented better if desired). Especially since we
> are reporting only probed values: if probed values are almost always “1.0”,
> it would mean that we have a back pressure. If they are almost always
> “0.0”, there is probably no back pressure at the sources.
>
> What do you think about this?
>
> Piotrek
>
> > On 30 May 2019, at 11:41, Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi all,
> >
> > Thanks a lot for all the feedback and comments. I'd like to continue the
> > discussion on this FLIP.
> >
> > I updated the FLIP-33 wiki to remove all the histogram metrics from the
> > first version of metric convention due to the performance concern. The
> plan
> > is to introduce them later when we have a mechanism to opt in/out
> metrics.
> > At that point, users can decide whether they want to pay the cost to get
> > the metric or not.
> >
> > As Stephan suggested, for this FLIP, let's first try to agree on the
> small
> > list of conventional metrics that connectors should follow.
> > Just to be clear, the purpose of the convention is not to enforce every
> > connector to report all these metrics, but to provide a guidance in case
> > these metrics are reported by some connectors.
> >
> >
> > @ Stephan & Chesnay,
> >
> > Regarding the duplication of `RecordsIn` metric in operator / task
> > IOMetricGroups, from what I understand, for source operator, it is
> actually
> > the SourceFunction that reports the operator level
> > RecordsIn/RecordsInPerSecond metric. So they are essentially the same
> > metric in the operator level IOMetricGroup. Similarly for the Sink
> > operator, the operator level RecordsOut/RecordsOutPerSecond metrics are
> > also reported by the Sink function. I marked them as existing in the
> > FLIP-33 wiki page. Please let me know if I misunderstood.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Thu, May 30, 2019 at 5:16 PM Becket Qin <becket....@gmail.com> wrote:
> >
> >> Hi Piotr,
> >>
> >> Thanks a lot for the feedback.
> >>
> >> 1a) I guess you are referring to the part that "original system specific
> >> metrics should also be reported". The performance impact depends on the
> >> implementation. An efficient implementation would only record the metric
> >> once, but report them with two different metric names. This is unlikely
> to
> >> hurt performance.
> >>
> >> 1b) Yes, I agree that we should avoid adding overhead to the critical
> path
> >> by all means. This is sometimes a tradeoff, running blindly without any
> >> metric gives best performance, but sometimes might be frustrating when
> we
> >> debug some issues.
> >>
> >> 2. The metrics are indeed very useful. Are they supposed to be reported
> by
> >> the connectors or Flink itself? At this point FLIP-33 is more focused on
> >> provide a guidance to the connector authors on the metrics reporting.
> That
> >> said, after FLIP-27, I think we should absolutely report these metrics
> in
> >> the abstract implementation. In any case, the metric convention in this
> >> list are expected to evolve over time.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski <pi...@ververica.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks for the proposal and driving the effort here Becket :) I’ve read
> >>> through the FLIP-33 [1], and here are couple of my thoughts.
> >>>
> >>> Big +1 for standardising the metric names between connectors, it will
> >>> definitely help us and users a lot.
> >>>
> >>> Issues/questions/things to discuss that I’ve thought of:
> >>>
> >>> 1a. If we are about to duplicate some metrics, can this become a
> >>> performance issue?
> >>> 1b. Generally speaking, we should make sure that collecting those
> metrics
> >>> is as non intrusive as possible, especially that they will need to be
> >>> updated once per record. (They might be collected more rarely with some
> >>> overhead, but the hot path of updating it per record will need to be as
> >>> quick as possible). That includes both avoiding heavy computation on
> per
> >>> record path: histograms?, measuring time for time based metrics (per
> >>> second) (System.currentTimeMillis() depending on the implementation can
> >>> invoke a system call)
> >>>
> >>> 2. It would be nice to have metrics, that allow us to check the cause
> of
> >>> back pressure:
> >>> a) for sources, length of input queue (in bytes? Or boolean
> >>> hasSomethingl/isEmpty)
> >>> b) for sinks, length of output queue (in bytes? Or boolean
> >>> hasSomething/isEmpty)
> >>>
> >>> a) is useful in a scenario when we are processing backlog of records,
> all
> >>> of the internal Flink’s input/output network buffers are empty, and we
> want
> >>> to check whether the external source system is the bottleneck (source’s
> >>> input queue will be empty), or if the Flink’s connector is the
> bottleneck
> >>> (source’s input queues will be full).
> >>> b) similar story. Backlog of records, but this time all of the internal
> >>> Flink’s input/ouput network buffers are full, and we want o check
> whether
> >>> the external sink system is the bottleneck (sink output queues are
> full),
> >>> or if the Flink’s connector is the bottleneck (sink’s output queues
> will be
> >>> empty)
> >>>
> >>> It might be sometimes difficult to provide those metrics, so they could
> >>> be optional, but if we could provide them, it would be really helpful.
> >>>
> >>> Piotrek
> >>>
> >>> [1]
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics
> >>> <
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics
> >>>>
> >>>
> >>>> On 24 Apr 2019, at 13:28, Stephan Ewen <se...@apache.org> wrote:
> >>>>
> >>>> I think this sounds reasonable.
> >>>>
> >>>> Let's keep the "reconfiguration without stopping the job" out of this,
> >>>> because that would be a super big effort and if we approach that, then
> >>> in
> >>>> more generic way rather than specific to connector metrics.
> >>>>
> >>>> I would suggest to look at the following things before starting with
> any
> >>>> implementation work:
> >>>>
> >>>> - Try and find a committer to support this, otherwise it will be hard
> >>> to
> >>>> make progress
> >>>> - Start with defining a smaller set of "core metrics" and extend the
> >>> set
> >>>> later. I think that is easier than now blocking on reaching consensus
> >>> on a
> >>>> large group of metrics.
> >>>> - Find a solution to the problem Chesnay mentioned, that the "records
> >>> in"
> >>>> metric is somehow overloaded and exists already in the IO Metric
> group.
> >>>>
> >>>>
> >>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hi Stephan,
> >>>>>
> >>>>> Thanks a lot for the feedback. All makes sense.
> >>>>>
> >>>>> It is a good suggestion to simply have an onRecord(numBytes,
> eventTime)
> >>>>> method for connector writers. It should meet most of the
> requirements,
> >>>>> individual
> >>>>>
> >>>>> The configurable metrics feature is something really useful,
> >>> especially if
> >>>>> we can somehow make it dynamically configurable without stopping the
> >>> jobs.
> >>>>> It might be better to make it a separate discussion because it is a
> >>> more
> >>>>> generic feature instead of only for connectors.
> >>>>>
> >>>>> So in order to make some progress, in this FLIP we can limit the
> >>> discussion
> >>>>> scope to the connector related items:
> >>>>>
> >>>>> - the standard connector metric names and types.
> >>>>> - the abstract ConnectorMetricHandler interface
> >>>>>
> >>>>> I'll start a separate thread to discuss other general metric related
> >>>>> enhancement items including:
> >>>>>
> >>>>> - optional metrics
> >>>>> - dynamic metric configuration
> >>>>> - potential combination with rate limiter
> >>>>>
> >>>>> Does this plan sound reasonable?
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jiangjie (Becket) Qin
> >>>>>
> >>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen <se...@apache.org>
> wrote:
> >>>>>
> >>>>>> Ignoring for a moment implementation details, this connector metrics
> >>> work
> >>>>>> is a really good thing to do, in my opinion
> >>>>>>
> >>>>>> The questions "oh, my job seems to be doing nothing, I am looking at
> >>> the
> >>>>> UI
> >>>>>> and the 'records in' value is still zero" is in the top three
> support
> >>>>>> questions I have been asked personally.
> >>>>>> Introspection into "how far is the consumer lagging behind" (event
> >>> time
> >>>>>> fetch latency) came up many times as well.
> >>>>>>
> >>>>>> So big +1 to solving this problem.
> >>>>>>
> >>>>>> About the exact design - I would try to go for the following
> >>> properties:
> >>>>>>
> >>>>>> - keep complexity of of connectors. Ideally the metrics handler has
> a
> >>>>>> single onRecord(numBytes, eventTime) method or so, and everything
> >>> else is
> >>>>>> internal to the handler. That makes it dead simple for the
> connector.
> >>> We
> >>>>>> can also think of an extensive scheme for connector specific
> metrics.
> >>>>>>
> >>>>>> - make it configurable on the job it cluster level which metrics the
> >>>>>> handler internally creates when that method is invoked.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Best,
> >>>>>> Stephan
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay Schepler <
> ches...@apache.org
> >>>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> As I said before, I believe this to be over-engineered and have no
> >>>>>>> interest in this implementation.
> >>>>>>>
> >>>>>>> There are conceptual issues like defining a duplicate
> >>>>> numBytesIn(PerSec)
> >>>>>>> metric that already exists for each operator.
> >>>>>>>
> >>>>>>> On 21.03.2019 06:13, Becket Qin wrote:
> >>>>>>>> A few updates to the thread. I uploaded a patch[1] as a complete
> >>>>>>>> example of how users can use the metrics in the future.
> >>>>>>>>
> >>>>>>>> Some thoughts below after taking a look at the AbstractMetricGroup
> >>>>> and
> >>>>>>>> its subclasses.
> >>>>>>>>
> >>>>>>>> This patch intends to provide convenience for Flink connector
> >>>>>>>> implementations to follow metrics standards proposed in FLIP-33.
> It
> >>>>>>>> also try to enhance the metric management in general way to help
> >>>>> users
> >>>>>>>> with:
> >>>>>>>>
> >>>>>>>> 1. metric definition
> >>>>>>>> 2. metric dependencies check
> >>>>>>>> 3. metric validation
> >>>>>>>> 4. metric control (turn on / off particular metrics)
> >>>>>>>>
> >>>>>>>> This patch wraps |MetricGroup| to extend the functionality of
> >>>>>>>> |AbstractMetricGroup| and its subclasses. The
> >>>>>>>> |AbstractMetricGroup| mainly focus on the metric group hierarchy,
> >>> but
> >>>>>>>> does not really manage the metrics other than keeping them in a
> Map.
> >>>>>>>>
> >>>>>>>> Ideally we should only have one entry point for the metrics.
> >>>>>>>>
> >>>>>>>> Right now the entry point is |AbstractMetricGroup|. However,
> besides
> >>>>>>>> the missing functionality mentioned above, |AbstractMetricGroup|
> >>>>> seems
> >>>>>>>> deeply rooted in Flink runtime. We could extract it out to
> >>>>>>>> flink-metrics in order to use it for generic purpose. There will
> be
> >>>>>>>> some work, though.
> >>>>>>>>
> >>>>>>>> Another approach is to make |AbstractMetrics| in this patch as the
> >>>>>>>> metric entry point. It wraps metric group and provides the missing
> >>>>>>>> functionalities. Then we can roll out this pattern to runtime
> >>>>>>>> components gradually as well.
> >>>>>>>>
> >>>>>>>> My first thought is that the latter approach gives a more smooth
> >>>>>>>> migration. But I am also OK with doing a refactoring on the
> >>>>>>>> |AbstractMetricGroup| family.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>
> >>>>>>>> [1] https://github.com/becketqin/flink/pull/1
> >>>>>>>>
> >>>>>>>> On Mon, Feb 25, 2019 at 2:32 PM Becket Qin <becket....@gmail.com
> >>>>>>>> <mailto:becket....@gmail.com>> wrote:
> >>>>>>>>
> >>>>>>>>   Hi Chesnay,
> >>>>>>>>
> >>>>>>>>   It might be easier to discuss some implementation details in the
> >>>>>>>>   PR review instead of in the FLIP discussion thread. I have a
> >>>>> patch
> >>>>>>>>   for Kafka connectors ready but haven't submitted the PR yet.
> >>>>>>>>   Hopefully that will help explain a bit more.
> >>>>>>>>
> >>>>>>>>   ** Re: metric type binding
> >>>>>>>>   This is a valid point that worths discussing. If I understand
> >>>>>>>>   correctly, there are two points:
> >>>>>>>>
> >>>>>>>>   1. Metric type / interface does not matter as long as the metric
> >>>>>>>>   semantic is clearly defined.
> >>>>>>>>   Conceptually speaking, I agree that as long as the metric
> >>>>> semantic
> >>>>>>>>   is defined, metric type does not matter. To some extent, Gauge /
> >>>>>>>>   Counter / Meter / Histogram themselves can be think of as some
> >>>>>>>>   well-recognized semantics, if you wish. In Flink, these metric
> >>>>>>>>   semantics have their associated interface classes. In practice,
> >>>>>>>>   such semantic to interface binding seems necessary for different
> >>>>>>>>   components to communicate.  Simply standardize the semantic of
> >>>>> the
> >>>>>>>>   connector metrics seems not sufficient for people to build
> >>>>>>>>   ecosystem on top of. At the end of the day, we still need to
> have
> >>>>>>>>   some embodiment of the metric semantics that people can program
> >>>>>>>>   against.
> >>>>>>>>
> >>>>>>>>   2. Sometimes the same metric semantic can be exposed using
> >>>>>>>>   different metric types / interfaces.
> >>>>>>>>   This is a good point. Counter and Gauge-as-a-Counter are pretty
> >>>>>>>>   much interchangeable. This is more of a trade-off between the
> >>>>> user
> >>>>>>>>   experience of metric producers and consumers. The metric
> >>>>> producers
> >>>>>>>>   want to use Counter or Gauge depending on whether the counter is
> >>>>>>>>   already tracked in code, while ideally the metric consumers only
> >>>>>>>>   want to see a single metric type for each metric. I am leaning
> >>>>>>>>   towards to make the metric producers happy, i.e. allow Gauge /
> >>>>>>>>   Counter metric type, and the the metric consumers handle the
> type
> >>>>>>>>   variation. The reason is that in practice, there might be more
> >>>>>>>>   connector implementations than metric reporter implementations.
> >>>>> We
> >>>>>>>>   could also provide some helper method to facilitate reading from
> >>>>>>>>   such variable metric type.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>   Just some quick replies to the comments around implementation
> >>>>>>> details.
> >>>>>>>>
> >>>>>>>>       4) single place where metrics are registered except
> >>>>>>>>       connector-specific
> >>>>>>>>       ones (which we can't really avoid).
> >>>>>>>>
> >>>>>>>>   Register connector specific ones in a single place is actually
> >>>>>>>>   something that I want to achieve.
> >>>>>>>>
> >>>>>>>>       2) I'm talking about time-series databases like Prometheus.
> >>>>> We
> >>>>>>>>       would
> >>>>>>>>       only have a gauge metric exposing the last
> fetchTime/emitTime
> >>>>>>>>       that is
> >>>>>>>>       regularly reported to the backend (Prometheus), where a user
> >>>>>>>>       could build
> >>>>>>>>       a histogram of his choosing when/if he wants it.
> >>>>>>>>
> >>>>>>>>   Not sure if such downsampling works. As an example, if a user
> >>>>>>>>   complains that there are some intermittent latency spikes (maybe
> >>>>> a
> >>>>>>>>   few records in 10 seconds) in their processing system. Having a
> >>>>>>>>   Gauge sampling instantaneous latency seems unlikely useful.
> >>>>>>>>   However by looking at actual 99.9 percentile latency might help.
> >>>>>>>>
> >>>>>>>>   Thanks,
> >>>>>>>>
> >>>>>>>>   Jiangjie (Becket) Qin
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>   On Fri, Feb 22, 2019 at 9:30 PM Chesnay Schepler
> >>>>>>>>   <ches...@apache.org <mailto:ches...@apache.org>> wrote:
> >>>>>>>>
> >>>>>>>>       Re: over complication of implementation.
> >>>>>>>>
> >>>>>>>>       I think I get understand better know what you're shooting
> >>>>> for,
> >>>>>>>>       effectively something like the OperatorIOMetricGroup.
> >>>>>>>>       But still, re-define setupConnectorMetrics() to accept a set
> >>>>>>>>       of flags
> >>>>>>>>       for counters/meters(ans _possibly_ histograms) along with a
> >>>>>>>>       set of
> >>>>>>>>       well-defined Optional<Gauge<?>>, and return the group.
> >>>>>>>>
> >>>>>>>>       Solves all issues as far as i can tell:
> >>>>>>>>       1) no metrics must be created manually (except Gauges, which
> >>>>>> are
> >>>>>>>>       effectively just Suppliers and you can't get around this),
> >>>>>>>>       2) additional metrics can be registered on the returned
> >>>>> group,
> >>>>>>>>       3) see 1),
> >>>>>>>>       4) single place where metrics are registered except
> >>>>>>>>       connector-specific
> >>>>>>>>       ones (which we can't really avoid).
> >>>>>>>>
> >>>>>>>>       Re: Histogram
> >>>>>>>>
> >>>>>>>>       1) As an example, whether "numRecordsIn" is exposed as a
> >>>>>>>>       Counter or a
> >>>>>>>>       Gauge should be irrelevant. So far we're using the metric
> >>>>> type
> >>>>>>>>       that is
> >>>>>>>>       the most convenient at exposing a given value. If there is
> >>>>>>>>       some backing
> >>>>>>>>       data-structure that we want to expose some data from we
> >>>>>>>>       typically opt
> >>>>>>>>       for a Gauge, as otherwise we're just mucking around with the
> >>>>>>>>       Meter/Counter API to get it to match. Similarly, if we want
> >>>>> to
> >>>>>>>>       count
> >>>>>>>>       something but no current count exists we typically added a
> >>>>>>>>       Counter.
> >>>>>>>>       That's why attaching semantics to metric types makes little
> >>>>>>>>       sense (but
> >>>>>>>>       unfortunately several reporters already do it); for
> >>>>>>>>       counters/meters
> >>>>>>>>       certainly, but the majority of metrics are gauges.
> >>>>>>>>
> >>>>>>>>       2) I'm talking about time-series databases like Prometheus.
> >>>>> We
> >>>>>>>>       would
> >>>>>>>>       only have a gauge metric exposing the last
> fetchTime/emitTime
> >>>>>>>>       that is
> >>>>>>>>       regularly reported to the backend (Prometheus), where a user
> >>>>>>>>       could build
> >>>>>>>>       a histogram of his choosing when/if he wants it.
> >>>>>>>>
> >>>>>>>>       On 22.02.2019 13:57, Becket Qin wrote:
> >>>>>>>>> Hi Chesnay,
> >>>>>>>>>
> >>>>>>>>> Thanks for the explanation.
> >>>>>>>>>
> >>>>>>>>> ** Re: FLIP
> >>>>>>>>> I might have misunderstood this, but it seems that "major
> >>>>>>>>       changes" are well
> >>>>>>>>> defined in FLIP. The full contents is following:
> >>>>>>>>> What is considered a "major change" that needs a FLIP?
> >>>>>>>>>
> >>>>>>>>> Any of the following should be considered a major change:
> >>>>>>>>>
> >>>>>>>>>   - Any major new feature, subsystem, or piece of
> >>>>>>>>       functionality
> >>>>>>>>>   - *Any change that impacts the public interfaces of the
> >>>>>>>>       project*
> >>>>>>>>>
> >>>>>>>>> What are the "public interfaces" of the project?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *All of the following are public interfaces *that people
> >>>>>>>>       build around:
> >>>>>>>>>
> >>>>>>>>>   - DataStream and DataSet API, including classes related
> >>>>>>>>       to that, such as
> >>>>>>>>>   StreamExecutionEnvironment
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>   - Classes marked with the @Public annotation
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>   - On-disk binary formats, such as
> >>>>> checkpoints/savepoints
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>   - User-facing scripts/command-line tools, i.e.
> >>>>>>>>       bin/flink, Yarn scripts,
> >>>>>>>>>   Mesos scripts
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>   - Configuration settings
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>   - *Exposed monitoring information*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> So any monitoring information change is considered as
> >>>>> public
> >>>>>>>>       interface, and
> >>>>>>>>> any public interface change is considered as a "major
> >>>>>> change".
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> ** Re: over complication of implementation.
> >>>>>>>>>
> >>>>>>>>> Although this is more of implementation details that is not
> >>>>>>>>       covered by the
> >>>>>>>>> FLIP. But it may be worth discussing.
> >>>>>>>>>
> >>>>>>>>> First of all, I completely agree that we should use the
> >>>>>>>>       simplest way to
> >>>>>>>>> achieve our goal. To me the goal is the following:
> >>>>>>>>> 1. Clear connector conventions and interfaces.
> >>>>>>>>> 2. The easiness of creating a connector.
> >>>>>>>>>
> >>>>>>>>> Both of them are important to the prosperity of the
> >>>>>>>>       connector ecosystem. So
> >>>>>>>>> I'd rather abstract as much as possible on our side to make
> >>>>>>>>       the connector
> >>>>>>>>> developer's work lighter. Given this goal, a static util
> >>>>>>>>       method approach
> >>>>>>>>> might have a few drawbacks:
> >>>>>>>>> 1. Users still have to construct the metrics by themselves.
> >>>>>>>>       (And note that
> >>>>>>>>> this might be erroneous by itself. For example, a customer
> >>>>>>>>       wrapper around
> >>>>>>>>> dropwizard meter maybe used instead of MeterView).
> >>>>>>>>> 2. When connector specific metrics are added, it is
> >>>>>>>>       difficult to enforce
> >>>>>>>>> the scope to be the same as standard metrics.
> >>>>>>>>> 3. It seems that a method proliferation is inevitable if we
> >>>>>>>>       want to apply
> >>>>>>>>> sanity checks. e.g. The metric of numBytesIn was not
> >>>>>>>>       registered for a meter.
> >>>>>>>>> 4. Metrics are still defined in random places and hard to
> >>>>>>> track.
> >>>>>>>>>
> >>>>>>>>> The current PR I had was inspired by the Config system in
> >>>>>>>>       Kafka, which I
> >>>>>>>>> found pretty handy. In fact it is not only used by Kafka
> >>>>>>>>       itself but even
> >>>>>>>>> some other projects that depend on Kafka. I am not saying
> >>>>>>>>       this approach is
> >>>>>>>>> perfect. But I think it worths to save the work for
> >>>>>>>>       connector writers and
> >>>>>>>>> encourage more systematic implementation. That being said,
> >>>>> I
> >>>>>>>>       am fully open
> >>>>>>>>> to suggestions.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Re: Histogram
> >>>>>>>>> I think there are two orthogonal questions around those
> >>>>>>> metrics:
> >>>>>>>>>
> >>>>>>>>> 1. Regardless of the metric type, by just looking at the
> >>>>>>>>       meaning of a
> >>>>>>>>> metric, is generic to all connectors? If the answer is yes,
> >>>>>>>>       we should
> >>>>>>>>> include the metric into the convention. No matter whether
> >>>>> we
> >>>>>>>>       include it
> >>>>>>>>> into the convention or not, some connector implementations
> >>>>>>>>       will emit such
> >>>>>>>>> metric. It is better to have a convention than letting each
> >>>>>>>>       connector do
> >>>>>>>>> random things.
> >>>>>>>>>
> >>>>>>>>> 2. If a standard metric is a histogram, what should we do?
> >>>>>>>>> I agree that we should make it clear that using histograms
> >>>>>>>>       will have
> >>>>>>>>> performance risk. But I do see histogram is useful in some
> >>>>>>>>       fine-granularity
> >>>>>>>>> debugging where one do not have the luxury to stop the
> >>>>>>>>       system and inject
> >>>>>>>>> more inspection code. So the workaround I am thinking is to
> >>>>>>>>       provide some
> >>>>>>>>> implementation suggestions. Assume later on we have a
> >>>>>>>>       mechanism of
> >>>>>>>>> selective metrics. In the abstract metrics class we can
> >>>>>>>>       disable those
> >>>>>>>>> metrics by default individual connector writers does not
> >>>>>>>>       have to do
> >>>>>>>>> anything (this is another advantage of having an
> >>>>>>>>       AbstractMetrics instead of
> >>>>>>>>> static util methods.)
> >>>>>>>>>
> >>>>>>>>> I am not sure I fully understand the histogram in the
> >>>>>>>>       backend approach. Can
> >>>>>>>>> you explain a bit more? Do you mean emitting the raw data,
> >>>>>>>>       e.g. fetchTime
> >>>>>>>>> and emitTime with each record and let the histogram
> >>>>>>>>       computation happen in
> >>>>>>>>> the background? Or let the processing thread putting the
> >>>>>>>>       values into a
> >>>>>>>>> queue and have a separate thread polling from the queue and
> >>>>>>>>       add them into
> >>>>>>>>> the histogram?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, Feb 22, 2019 at 4:34 PM Chesnay Schepler
> >>>>>>>>       <ches...@apache.org <mailto:ches...@apache.org>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Re: Flip
> >>>>>>>>>> The very first line under both the main header and Purpose
> >>>>>>>>       section
> >>>>>>>>>> describe Flips as "major changes", which this isn't.
> >>>>>>>>>>
> >>>>>>>>>> Re: complication
> >>>>>>>>>> I'm not arguing against standardization, but again an
> >>>>>>>>       over-complicated
> >>>>>>>>>> implementation when a static utility method would be
> >>>>>>>>       sufficient.
> >>>>>>>>>>
> >>>>>>>>>> public static void setupConnectorMetrics(
> >>>>>>>>>> MetricGroup operatorMetricGroup,
> >>>>>>>>>> String connectorName,
> >>>>>>>>>> Optional<Gauge<Long>> numRecordsIn,
> >>>>>>>>>> ...)
> >>>>>>>>>>
> >>>>>>>>>> This gives you all you need:
> >>>>>>>>>> * a well-defined set of metrics for a connector to opt-in
> >>>>>>>>>> * standardized naming schemes for scope and individual
> >>>>>> metrics
> >>>>>>>>>> * standardize metric types (although personally I'm not
> >>>>>>>>       interested in that
> >>>>>>>>>> since metric types should be considered syntactic sugar)
> >>>>>>>>>>
> >>>>>>>>>> Re: Configurable Histogram
> >>>>>>>>>> If anything they _must_ be turned off by default, but the
> >>>>>>>>       metric system is
> >>>>>>>>>> already exposing so many options that I'm not too keen on
> >>>>>>>>       adding even more.
> >>>>>>>>>> You have also only addressed my first argument against
> >>>>>>>>       histograms
> >>>>>>>>>> (performance), the second one still stands (calculate
> >>>>>>>>       histogram in metric
> >>>>>>>>>> backends instead).
> >>>>>>>>>>
> >>>>>>>>>> On 21.02.2019 16:27, Becket Qin wrote:
> >>>>>>>>>>> Hi Chesnay,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the comments. I think this is worthy of a FLIP
> >>>>>>>>       because it is
> >>>>>>>>>>> public API. According to the FLIP description a FlIP is
> >>>>>>>>       required in case
> >>>>>>>>>> of:
> >>>>>>>>>>>    - Any change that impacts the public interfaces of
> >>>>>>>>       the project
> >>>>>>>>>>>
> >>>>>>>>>>> and the following entry is found in the definition of
> >>>>>>>>       "public interface".
> >>>>>>>>>>>
> >>>>>>>>>>>    - Exposed monitoring information
> >>>>>>>>>>>
> >>>>>>>>>>> Metrics are critical to any production system. So a clear
> >>>>>>>>       metric
> >>>>>>>>>> definition
> >>>>>>>>>>> is important for any serious users. For an organization
> >>>>>>>>       with large Flink
> >>>>>>>>>>> installation, change in metrics means great amount of
> >>>>>>>>       work. So such
> >>>>>>>>>> changes
> >>>>>>>>>>> do need to be fully discussed and documented.
> >>>>>>>>>>>
> >>>>>>>>>>> ** Re: Histogram.
> >>>>>>>>>>> We can discuss whether there is a better way to expose
> >>>>>>>>       metrics that are
> >>>>>>>>>>> suitable for histograms. My micro-benchmark on various
> >>>>>>>>       histogram
> >>>>>>>>>>> implementations also indicates that they are
> >>>>> significantly
> >>>>>>>>       slower than
> >>>>>>>>>>> other metric types. But I don't think that means never
> >>>>> use
> >>>>>>>>       histogram, but
> >>>>>>>>>>> means use it with caution. For example, we can suggest
> >>>>> the
> >>>>>>>>>> implementations
> >>>>>>>>>>> to turn them off by default and only turn it on for a
> >>>>>>>>       small amount of
> >>>>>>>>>> time
> >>>>>>>>>>> when performing some micro-debugging.
> >>>>>>>>>>>
> >>>>>>>>>>> ** Re: complication:
> >>>>>>>>>>> Connector conventions are essential for Flink ecosystem.
> >>>>>>>>       Flink connectors
> >>>>>>>>>>> pool is probably the most important part of Flink, just
> >>>>>>>>       like any other
> >>>>>>>>>> data
> >>>>>>>>>>> system. Clear conventions of connectors will help build
> >>>>>>>>       Flink ecosystem
> >>>>>>>>>> in
> >>>>>>>>>>> a more organic way.
> >>>>>>>>>>> Take the metrics convention as an example, imagine
> >>>>> someone
> >>>>>>>>       has developed
> >>>>>>>>>> a
> >>>>>>>>>>> Flink connector for System foo, and another developer may
> >>>>>>>>       have developed
> >>>>>>>>>> a
> >>>>>>>>>>> monitoring and diagnostic framework for Flink which
> >>>>>>>>       analyzes the Flink
> >>>>>>>>>> job
> >>>>>>>>>>> performance based on metrics. With a clear metric
> >>>>>>>>       convention, those two
> >>>>>>>>>>> projects could be developed independently. Once users put
> >>>>>>>>       them together,
> >>>>>>>>>>> it would work without additional modifications. This
> >>>>>>>>       cannot be easily
> >>>>>>>>>>> achieved by just defining a few constants.
> >>>>>>>>>>>
> >>>>>>>>>>> ** Re: selective metrics:
> >>>>>>>>>>> Sure, we can discuss that in a separate thread.
> >>>>>>>>>>>
> >>>>>>>>>>> @Dawid
> >>>>>>>>>>>
> >>>>>>>>>>> ** Re: latency / fetchedLatency
> >>>>>>>>>>> The primary purpose of establish such a convention is to
> >>>>>>>>       help developers
> >>>>>>>>>>> write connectors in a more compatible way. The convention
> >>>>>>>>       is supposed to
> >>>>>>>>>> be
> >>>>>>>>>>> defined more proactively. So when look at the convention,
> >>>>>>>>       it seems more
> >>>>>>>>>>> important to see if the concept is applicable to
> >>>>>>>>       connectors in general.
> >>>>>>>>>> It
> >>>>>>>>>>> might be true so far only Kafka connector reports
> >>>>> latency.
> >>>>>>>>       But there
> >>>>>>>>>> might
> >>>>>>>>>>> be hundreds of other connector implementations in the
> >>>>>>>>       Flink ecosystem,
> >>>>>>>>>>> though not in the Flink repo, and some of them also emits
> >>>>>>>>       latency. I
> >>>>>>>>>> think
> >>>>>>>>>>> a lot of other sources actually also has an append
> >>>>>>>>       timestamp. e.g.
> >>>>>>>>>> database
> >>>>>>>>>>> bin logs and some K-V stores. So I wouldn't be surprised
> >>>>>>>>       if some database
> >>>>>>>>>>> connector can also emit latency metrics.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Feb 21, 2019 at 10:14 PM Chesnay Schepler
> >>>>>>>>       <ches...@apache.org <mailto:ches...@apache.org>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Regarding 2) It doesn't make sense to investigate this
> >>>>> as
> >>>>>>>>       part of this
> >>>>>>>>>>>> FLIP. This is something that could be of interest for
> >>>>> the
> >>>>>>>>       entire metric
> >>>>>>>>>>>> system, and should be designed for as such.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding the proposal as a whole:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Histogram metrics shall not be added to the core of
> >>>>>>>>       Flink. They are
> >>>>>>>>>>>> significantly more expensive than other metrics, and
> >>>>>>>>       calculating
> >>>>>>>>>>>> histograms in the application is regarded as an
> >>>>>>>>       anti-pattern by several
> >>>>>>>>>>>> metric backends, who instead recommend to expose the raw
> >>>>>>>>       data and
> >>>>>>>>>>>> calculate the histogram in the backend.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Second, this seems overly complicated. Given that we
> >>>>>>>>       already established
> >>>>>>>>>>>> that not all connectors will export all metrics we are
> >>>>>>>>       effectively
> >>>>>>>>>>>> reducing this down to a consistent naming scheme. We
> >>>>>>>>       don't need anything
> >>>>>>>>>>>> sophisticated for that; basically just a few constants
> >>>>>>>>       that all
> >>>>>>>>>>>> connectors use.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm not convinced that this is worthy of a FLIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 21.02.2019 14:26, Dawid Wysakowicz wrote:
> >>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Ad 1. In general I undestand and I agree. But those
> >>>>>>>>       particular metrics
> >>>>>>>>>>>>> (latency, fetchLatency), right now would only be
> >>>>>>>>       reported if user uses
> >>>>>>>>>>>>> KafkaConsumer with internal timestampAssigner with
> >>>>>>>>       StreamCharacteristic
> >>>>>>>>>>>>> set to EventTime, right? That sounds like a very
> >>>>>>>>       specific case. I am
> >>>>>>>>>> not
> >>>>>>>>>>>>> sure if we should introduce a generic metric that will
> >>>>> be
> >>>>>>>>>>>>> disabled/absent for most of implementations.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Ad.2 That sounds like an orthogonal issue, that might
> >>>>>>>>       make sense to
> >>>>>>>>>>>>> investigate in the future.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 21/02/2019 13:20, Becket Qin wrote:
> >>>>>>>>>>>>>> Hi Dawid,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the feedback. That makes sense to me. There
> >>>>>>>>       are two cases
> >>>>>>>>>> to
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>> addressed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. The metrics are supposed to be a guidance. It is
> >>>>>>>>       likely that a
> >>>>>>>>>>>> connector
> >>>>>>>>>>>>>> only supports some but not all of the metrics. In that
> >>>>>>>>       case, each
> >>>>>>>>>>>> connector
> >>>>>>>>>>>>>> implementation should have the freedom to decide which
> >>>>>>>>       metrics are
> >>>>>>>>>>>>>> reported. For the metrics that are supported, the
> >>>>>>>>       guidance should be
> >>>>>>>>>>>>>> followed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Sometimes users may want to disable certain metrics
> >>>>>>>>       for some reason
> >>>>>>>>>>>>>> (e.g. performance / reprocessing of data). A generic
> >>>>>>>>       mechanism should
> >>>>>>>>>> be
> >>>>>>>>>>>>>> provided to allow user choose which metrics are
> >>>>>>>>       reported. This
> >>>>>>>>>> mechanism
> >>>>>>>>>>>>>> should also be honored by the connector
> >>>>> implementations.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Does this sound reasonable to you?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 4:22 PM Dawid Wysakowicz <
> >>>>>>>>>>>> dwysakow...@apache.org <mailto:dwysakow...@apache.org>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Generally I like the idea of having a unified,
> >>>>>>>>       standard set of
> >>>>>>>>>> metrics
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>> all connectors. I have some slight concerns about
> >>>>>>>>       fetchLatency and
> >>>>>>>>>>>>>>> latency though. They are computed based on EventTime
> >>>>>>>>       which is not a
> >>>>>>>>>>>> purely
> >>>>>>>>>>>>>>> technical feature. It depends often on some business
> >>>>>>>>       logic, might be
> >>>>>>>>>>>> absent
> >>>>>>>>>>>>>>> or defined after source. Those metrics could also
> >>>>>>>>       behave in a weird
> >>>>>>>>>>>> way in
> >>>>>>>>>>>>>>> case of replaying backlog. Therefore I am not sure if
> >>>>>>>>       we should
> >>>>>>>>>> include
> >>>>>>>>>>>>>>> those metrics by default. Maybe we could at least
> >>>>>>>>       introduce a feature
> >>>>>>>>>>>>>>> switch for them? What do you think?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>> On 21/02/2019 03:13, Becket Qin wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Bump. If there is no objections to the proposed
> >>>>>>>>       metrics. I'll start a
> >>>>>>>>>>>>>>> voting thread later toady.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Feb 11, 2019 at 8:17 PM Becket Qin
> >>>>>>>>       <becket....@gmail.com <mailto:becket....@gmail.com>> <
> >>>>>>>>>>>> becket....@gmail.com <mailto:becket....@gmail.com>>
> >>>>>> wrote:
> >>>>>>>>>>>>>>> Hi folks,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would like to start the FLIP discussion thread
> >>>>> about
> >>>>>>>>       standardize
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> connector metrics.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In short, we would like to provide a convention of
> >>>>>>>>       Flink connector
> >>>>>>>>>>>>>>> metrics. It will help simplify the monitoring and
> >>>>>>>>       alerting on Flink
> >>>>>>>>>>>> jobs.
> >>>>>>>>>>>>>>> The FLIP link is following:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>>
>
>
>

Reply via email to