Hi Arvid,

I think it is OK to leave eventTimeFetchLag out of the scope of this FLIP
given that it may involve additional API changes.

> 5. RecordMetadata is currently not simplifying any code. By the current
> design RecordMetadata is a read-only data structure that is constant for
> all records in a batch. So in Kafka, we still need to pass Tuple3 because
> offset and timestamp are per record.

Does this depend on whether we will get the RecordMetadata per record or
per batch? We can define the semantics of RecordsWithSplitIds.metadata() to be
the metadata associated with the last record returned by
RecordsWithSplitIds.nextRecordFromSplit(). In this case, individual
implementations can decide whether to return different metadata for each
record or not. In case of Kafka, the Tuple3 can be replaced with three
lists of records, timestamps and offsets respectively. It probably saves
some object instantiation, assuming the RecordMetadata object itself can be
reused.
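
To make the idea concrete, here is a rough sketch of what I have in mind. It
is not the actual Flink API: KafkaLikeBatch, its add() method, and the
timestamp()/offset() accessors on RecordMetadata are names made up for
illustration, and only nextRecordFromSplit() mirrors an existing method name.
The batch keeps three parallel lists and hands out a single reusable metadata
view that always reflects the last record returned.

```java
import java.util.ArrayList;
import java.util.List;

public class PerRecordMetadataSketch {

    /** Hypothetical read-only metadata view; not the interface from the PR. */
    interface RecordMetadata {
        long timestamp();
        long offset();
    }

    /** Hypothetical, simplified stand-in for a Kafka RecordsWithSplitIds batch. */
    static final class KafkaLikeBatch<E> {
        // Three parallel lists replace a per-record Tuple3<record, offset, timestamp>.
        private final List<E> records = new ArrayList<>();
        private final List<Long> timestamps = new ArrayList<>();
        private final List<Long> offsets = new ArrayList<>();
        private int position = -1; // index of the last record returned

        // One view object is reused for every record in the batch; it reads the
        // lists at the current position instead of copying values.
        private final RecordMetadata view = new RecordMetadata() {
            @Override public long timestamp() { return timestamps.get(position); }
            @Override public long offset() { return offsets.get(position); }
        };

        void add(E record, long timestamp, long offset) {
            records.add(record);
            timestamps.add(timestamp);
            offsets.add(offset);
        }

        /** Returns the next record, or null when the batch is exhausted. */
        E nextRecordFromSplit() {
            if (position + 1 >= records.size()) {
                return null;
            }
            position++;
            return records.get(position);
        }

        /** Metadata of the last record returned by nextRecordFromSplit(). */
        RecordMetadata metadataOfCurrentRecord() {
            if (position < 0) {
                throw new IllegalStateException("No record has been returned yet");
            }
            return view;
        }
    }

    public static void main(String[] args) {
        KafkaLikeBatch<String> batch = new KafkaLikeBatch<>();
        batch.add("a", 100L, 0L);
        batch.add("b", 200L, 1L);
        String record;
        while ((record = batch.nextRecordFromSplit()) != null) {
            RecordMetadata meta = batch.metadataOfCurrentRecord();
            System.out.println(record + " ts=" + meta.timestamp() + " offset=" + meta.offset());
        }
    }
}
```

The caveat with a reused view is that callers must not hold on to it across
calls to nextRecordFromSplit(), since its values change with the position.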

> 6. We might rename and change the semantics into
>
> public interface RecordsWithSplitIds<E> {
>     /**
>      * Returns the record metadata. The metadata is shared for all
>      * records in the current split.
>      */
>     @Nullable
>     default RecordMetadata metadataOfCurrentSplit() {
>         return null;
>     }
>     ...
> }

Maybe we can move one step further to make it "metadataOfCurrentRecord()"
as I mentioned above.

Thanks,

Jiangjie (Becket) Qin

On Fri, Jul 30, 2021 at 3:00 PM Arvid Heise <ar...@apache.org> wrote:

> Hi folks,
>
> To move on with the FLIP, I will cut eventTimeFetchLag out of scope and
> go ahead with the remainder.
>
> I will open a VOTE later today.
>
> Best,
>
> Arvid
>
> On Wed, Jul 28, 2021 at 8:44 AM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Becket,
> >
> > I have updated the PR according to your suggestion (note that this commit
> > contains the removal of the previous approach) [1]. Here are my
> > observations:
> > 1. Adding the type of RecordMetadata to emitRecord would require adding
> > another type parameter to RecordEmitter and SourceReaderBase. So I left
> > that out for now as it would break things completely.
> > 2. RecordEmitter implementations that want to pass it to SourceOutput need
> > to be changed in a boilerplate fashion (passing the metadata to the
> > SourceOutput).
> > 3. RecordMetadata as an interface (as in the commit) probably requires
> > boilerplate implementations in using sources as well.
> > 4. SourceOutput would also require an additional collect method:
> >
> > default void collect(T record, RecordMetadata metadata) {
> >     collect(record, TimestampAssigner.NO_TIMESTAMP, metadata);
> > }
> >
> > 5. RecordMetadata is currently not simplifying any code. By the current
> > design RecordMetadata is a read-only data structure that is constant for
> > all records in a batch. So in Kafka, we still need to pass Tuple3 because
> > offset and timestamp are per record.
> > 6. RecordMetadata is currently the same for all splits in
> > RecordsWithSplitIds.
> >
> > Some ideas for the above points:
> > 3. We should accompany it with a default implementation to avoid trivial
> > POJO implementations such as the KafkaRecordMetadata of my commit. Can we skip
> > the interface and just have RecordMetadata as a base class?
> > 1.,2.,4. We could also set the metadata only once in an orthogonal method
> > that needs to be called before collect, like SourceOutput#setRecordMetadata.
> > Then we can implement it entirely in SourceReaderBase without changing any
> > code. The clear downside is that it introduces some implicit state in
> > SourceOutput (which we implement) and is harder to use in
> > non-SourceReaderBase classes: Source devs need to remember to call
> > setRecordMetadata before collect for a respective record.
> > 6. We might rename and change the semantics into
> >
> > public interface RecordsWithSplitIds<E> {
> >     /**
> >      * Returns the record metadata. The metadata is shared for all
> >      * records in the current split.
> >      */
> >     @Nullable
> >     default RecordMetadata metadataOfCurrentSplit() {
> >         return null;
> >     }
> > ...
> > }
> >
> >
> > Re global variable
> >
> >> To explain a bit more on the metric being a global variable, I think in
> >> general there are two ways to pass a value from one code block to
> another.
> >> The first way is direct passing. That means the variable is explicitly
> >> passed from one code block to another via arguments, be them in the
> >> constructor or methods. Another way is indirect passing through context,
> >> that means the information is stored in some kind of context or
> >> environment, and everyone can have access to it. And there is no
> explicit
> >> value passing from one code block to another because everyone just reads
> >> from/writes to the context or environment. This is basically the "global
> >> variable" pattern I am talking about.
> >>
> >> In general people would avoid having a mutable global value shared
> across
> >> code blocks, because it is usually less deterministic and therefore more
> >> difficult to understand or debug.
> >>
> > Since the first approach was using a Gauge, it's a callback and not a
> > global value. The actual value is passed when invoking the callback. It's
> > the same as a supplier. However, the gauge itself is stored in the
> context,
> > so your argument holds on that level.
> >
> >
> >> Moreover, generally speaking, the Metrics in systems are usually
> perceived
> >> as a reporting mechanism. People usually think of it as a way to expose
> >> some internal values to the external system, and don't expect the
> program
> >> itself to read the reported values again in the main logic, which is
> >> essentially using the MetricGroup as a context to pass values across
> code
> >> block, i.e. the "global variable" pattern. Instead, people would usually
> >> use the "direct passing" to do this.
> >>
> > Here I still don't see a difference on how we calculate the meter values
> > from the byteIn/Out counters. We also need to read the counters
> > periodically and calculate a secondary metric. So it can't be that
> > unexpected to users.
> >
> > [1]
> >
> https://github.com/apache/flink/commit/71212e6baf2906444987253d0cf13b5a5978a43b
> >
> > On Tue, Jul 27, 2021 at 3:19 AM Becket Qin <becket....@gmail.com> wrote:
> >
> >> Hi Arvid,
> >>
> >> Thanks for the patient discussion.
> >>
> >> To explain a bit more on the metric being a global variable, I think in
> >> general there are two ways to pass a value from one code block to
> another.
> >> The first way is direct passing. That means the variable is explicitly
> >> passed from one code block to another via arguments, be them in the
> >> constructor or methods. Another way is indirect passing through context,
> >> that means the information is stored in some kind of context or
> >> environment, and everyone can have access to it. And there is no
> explicit
> >> value passing from one code block to another because everyone just reads
> >> from/writes to the context or environment. This is basically the "global
> >> variable" pattern I am talking about.
> >>
> >> In general people would avoid having a mutable global value shared
> across
> >> code blocks, because it is usually less deterministic and therefore more
> >> difficult to understand or debug.
> >>
> >> Moreover, generally speaking, the Metrics in systems are usually
> perceived
> >> as a reporting mechanism. People usually think of it as a way to expose
> >> some internal values to the external system, and don't expect the
> program
> >> itself to read the reported values again in the main logic, which is
> >> essentially using the MetricGroup as a context to pass values across
> code
> >> block, i.e. the "global variable" pattern. Instead, people would usually
> >> use the "direct passing" to do this.
> >>
> >> >Can we think of other use cases for the fetchTime parameter beyond
> >> metrics
> >> in the future? If so, it would make an even stronger case.
> >> At this point, I cannot think of other use cases for fetchTime, but I
> can
> >> see use cases where people want to get a per split fetch lag. So I am
> >> wondering if it makes sense to generalize the API a little bit by
> >> introducing collect(T Record, Long timestamp, Metadata metadata). This
> >> also
> >> makes a natural alignment because the RecordsWithSplitIds also returns a
> >> Metadata associated with the record, which can be used by RecordEmitter
> as
> >> well as the SourceOutput.
> >>
> >> What do you think?
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >> On Fri, Jul 23, 2021 at 7:58 PM Arvid Heise <ar...@apache.org> wrote:
> >>
> >> > Hi Becket,
> >> >
> >> > I still can't follow your view on the metric being a global variable
> or
> >> > your concern that it is confusing to users. Nevertheless, I like your
> >> > proposal with having an additional collect method.
> >> >
> >> > I was thinking that
> >> > > SourceOutput is going to have an additional method of collect(T
> >> Record,
> >> > > Long timestamp, Long fetchTime). So people can just pass in the
> fetch
> >> > time
> >> > > directly when they emit a record, regardless of using
> >> SourceReaderBase or
> >> > > not.
> >> > >
> >> >
> >> > Can we think of other use cases for the fetchTime parameter beyond
> >> metrics
> >> > in the future? If so, it would make an even stronger case.
> >> >
> >> > I'll update the PR with your proposals.
> >> >
> >> > Best,
> >> >
> >> > Arvid
> >> >
> >> > On Fri, Jul 23, 2021 at 12:08 PM Becket Qin <becket....@gmail.com>
> >> wrote:
> >> >
> >> > > Regarding the generic type v.s. class/subclasses of Metadata.
> >> > >
> >> > > I think generic types usually make sense if the framework/abstract
> >> class
> >> > > itself does not look into the instances, but just pass them from one
> >> user
> >> > > logic to another. Otherwise, interfaces or class/subclasses would be
> >> > > preferred.
> >> > >
> >> > > In our case, it depends on whether we expect the SourceReaderBase to
> >> look
> >> > > into the MetaData. At this point, it does not. But it seems possible
> >> that
> >> > > in the future it may look into MetaData. Therefore I think the
> class /
> >> > > subclass pattern would be better.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > >
> >> > > On Fri, Jul 23, 2021 at 5:54 PM Becket Qin <becket....@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hi Arvid,
> >> > > >
> >> > > > > I'm not sure if I follow the global variable argument, could you
> >> > > > elaborate? Are you referring specifically to the SettableGauge?
> How
> >> is
> >> > > that
> >> > > > different from a Counter or Meter?
> >> > > > What I meant is that the fetch lag computing logic can either get
> >> the
> >> > > > information required from method argument or something like a
> static
> >> > > global
> >> > > > variable. We are essentially trying to reuse the metric as a
> static
> >> > > global
> >> > > > variable. It seems not a common pattern in most systems. It is a
> >> little
> >> > > > counterintuitive that a gauge reported to the metric system would
> be
> >> > used
> >> > > > by the program main logic later on as a variable.
> >> > > >
> >> > > > > We could do that. That would remove the gauge from the
> >> MetricGroup,
> >> > > > right? The main downside is that sources that do not use
> >> > SourceReaderBase
> >> > > > cannot set the metric anymore. So I'd rather keep the current way
> >> and
> >> > > > extend it with the metadata extension.
> >> > > > Yes, that would remove the gauge from the MetricGroup. I was
> >> thinking
> >> > > that
> >> > > > SourceOutput is going to have an additional method of collect(T
> >> Record,
> >> > > > Long timestamp, Long fetchTime). So people can just pass in the
> >> fetch
> >> > > time
> >> > > > directly when they emit a record, regardless of using
> >> SourceReaderBase
> >> > or
> >> > > > not.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Jiangjie (Becket) Qin
> >> > > >
> >> > > > On Thu, Jul 22, 2021 at 3:46 PM Chesnay Schepler <
> >> ches...@apache.org>
> >> > > > wrote:
> >> > > >
> >> > > >> The only histogram implementation available to use are those by
> >> > > >> dropwizard, and they do some lock-free synchronization stuff that
> >> so
> >> > > far we
> >> > > >> wanted to keep out of hot paths (this applies to both reading and
> >> > > writing);
> >> > > >> we have however never made benchmarks.
> >> > > >> But it is reasonable to assume that they are way more expensive
> >> than
> >> > the
> >> > > >> alternatives (in the ideal case just being a getter).
> >> > > >> You'd pay that cost irrespective of whether a reporter is enabled
> >> or
> >> > > not,
> >> > > >> which is another thing we so far wanted to prevent.
> >> > > >> Finally, histograms are problematic because they are 10x more
> >> > expensive
> >> > > >> on the metric backend (because they are effectively 10 metrics),
> >> and
> >> > > should
> >> > > >> be used with extreme caution, in particular because we lack any
> >> switch
> >> > > to
> >> > > >> disable/enable metrics (and I think we are getting to a point
> where
> >> > the
> >> > > >> metric system becomes unusable for heavy users because of that,
> >> where
> >> > > >> another histogram isn't helping).
> >> > > >>
> >> > > >> Overall, at this time I'm against using Histograms.
> >> > > >> Furthermore, I believe that we should be able to derive a
> Histogram
> >> > from
> >> > > >> that supplier if we later on decide differently. We'd just need
> to
> >> > poll
> >> > > >> the supplier more often.
> >> > > >>
> >> > > >> On 22/07/2021 09:36, Arvid Heise wrote:
> >> > > >>
> >> > > >> Hi all,
> >> > > >>
> >> > > >> @Steven Wu <stevenz...@gmail.com>
> >> > > >>
> >> > > >>> Regarding "lastFetchTime" latency metric, I found Gauge to be
> less
> >> > > >>> informative as it only captures the last sampling value for each
> >> > metric
> >> > > >>> publish interval (e.g. 60s).
> >> > > >>> * Can we make it a histogram? Histograms are more expensive
> >> though.
> >> > > >>> * Timer [1, 2] is cheaper as it just tracks min, max, avg,
> count.
> >> but
> >> > > >>> there
> >> > > >>> is no such metric type in Flink
> >> > > >>> * Summary metric type [3] (from Prometheus) would be nice too
> >> > > >>>
> >> > > >> I'd also think that a histogram is much more expressive but the
> >> > original
> >> > > >> FLIP-33 decided against it because of its cost. @Chesnay
> Schepler
> >> > > >> <ches...@apache.org> could you shed some light on how much more
> >> > > >> expensive it is in comparison to a simple gauge? Does it depend
> on
> >> > > whether
> >> > > >> a reporter is actually using the metric?
> >> > > >> The current interface of this FLIP-179 would actually allow to
> >> switch
> >> > > the
> >> > > >> type of the metric later. But since the metric type is
> >> user-facing, we
> >> > > need
> >> > > >> to have an agreement now.
> >> > > >>
> >> > > >> @Becket Qin <becket....@gmail.com>
> >> > > >>
> >> > > >>> In that case, do we still need the metric here? It seems we are
> >> > > creating
> >> > > >>> a
> >> > > >>> "global variable" which users may potentially use. I am
> wondering
> >> how
> >> > > >>> much
> >> > > >>> additional convenience it provides because it seems easy for
> >> people
> >> > to
> >> > > >>> simply pass the fetch time by themselves if they have decided to
> >> not
> >> > > use
> >> > > >>> SourceReaderBase. Also, it looks like we do not have an API
> >> pattern
> >> > > that
> >> > > >>> lets users get the value of a metric and derive another metric.
> >> So I
> >> > > >>> think
> >> > > >>> it is easier for people to understand if LastFetchTimeGauge() is
> >> just
> >> > > an
> >> > > >>> independent metric by itself, instead of being a part of the
> >> > > >>> eventTimeFetchLag computation.
> >> > > >>>
> >> > > >> I'm not sure if I follow the global variable argument, could you
> >> > > >> elaborate? Are you referring specifically to the SettableGauge?
> >> How is
> >> > > that
> >> > > >> different from a Counter or Meter?
> >> > > >>
> >> > > >> With the current design, we could very well add a LastFetchTime
> >> > metric.
> >> > > >> The key point of the current abstraction is that a user gets the
> >> much
> >> > > >> harder eventTimeFetchLag metric for free, since we already need
> to
> >> > > extract
> >> > > >> the event time for other metrics. I think the JavaDoc makes it
> >> clear
> >> > > what
> >> > > >> the intent of the LastFetchTimeGauge is and if not we can improve
> >> it.
> >> > > >> Btw we have derived metrics already. For example, we have Meters
> >> for
> >> > > >> byteIn/Out and recordIn/Out. That's already part of FLIP-33.
> >> > > >>
> >> > > >> Would it make sense to have a more generic metadata type <T>
> >> > associated
> >> > > >>> with the records batch? In some cases, it may be useful to allow
> >> the
> >> > > >>> Source
> >> > > >>> implementation to carry some additional information of the batch
> >> to
> >> > the
> >> > > >>> RecordEmitter. For example, the split info of the batch, the
> >> sender
> >> > of
> >> > > >>> the
> >> > > >>> batch etc. Because the RecordEmitter only takes one record at a
> >> time,
> >> > > >>> currently such information needs to be put into each record,
> which
> >> > may
> >> > > >>> involve a lot of wrapper object creation.
> >> > > >>>
> >> > > >> I like the idea of having more general metadata and I follow the
> >> > > example.
> >> > > >> I'm wondering if we could avoid a generic type (since that adds a
> >> bit
> >> > of
> >> > > >> complexity to the mental model and usage) by simply encouraging
> to
> >> > use a
> >> > > >> more specific MetaData subclass as a return type of the method.
> >> > > >>
> >> > > >> public interface RecordsWithSplitIds<E> {
> >> > > >>     @Nullable
> >> > > >>     default RecordMetadata getMetadata() {
> >> > > >>         return null;
> >> > > >>     }
> >> > > >>     ...
> >> > > >> }
> >> > > >>
> >> > > >> public interface RecordMetadata {
> >> > > >>     long getLastFetchTime(); // mandatory?
> >> > > >> }
> >> > > >>
> >> > > >> And using it as
> >> > > >>
> >> > > >> public class KafkaRecordMetadata implements RecordMetadata {}
> >> > > >>
> >> > > >> private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
> >> > > >>     @Override
> >> > > >>     public KafkaRecordMetadata getMetadata() {
> >> > > >>         return metadata;
> >> > > >>     }
> >> > > >> }
> >> > > >>
> >> > > >> Or do we want to have the generic to explicitly pass it to the
> >> > > >> RecordEmitter? Would that metadata be a fourth parameter of
> >> > > >> RecordEmitter#emitRecord?
> >> > > >>
> >> > > >> It might be slightly better if we let the method accept a
> Supplier
> >> in
> >> > > this
> >> > > >>> case. However, it seems to introduce a parallel channel or a
> >> sidepath
> >> > > >>> between the user implementation and SourceOutput. I am not sure
> if
> >> > this
> >> > > >>> is
> >> > > >>> the right way to go. Would it be more intuitive if we just add a
> >> new
> >> > > >>> method
> >> > > >>> to the SourceOutput, to allow the FetchTime to be passed in
> >> > explicitly?
> >> > > >>> This would work well with the change I suggested above, which
> >> adds a
> >> > > >>> generic metadata type <T> to the RecordsWithSplits and passes
> >> that to
> >> > > the
> >> > > >>> RecordEmitter.emitRecord() as an argument.
> >> > > >>>
> >> > > >>
> >> > > >> We could do that. That would remove the gauge from the
> MetricGroup,
> >> > > >> right? The main downside is that sources that do not use
> >> > > SourceReaderBase
> >> > > >> cannot set the metric anymore. So I'd rather keep the current way
> >> and
> >> > > >> extend it with the metadata extension.
> >> > > >>
> >> > > >> Best,
> >> > > >>
> >> > > >> Arvid
> >> > > >>
> >> > > >>
> >> > > >> On Wed, Jul 21, 2021 at 1:38 PM Becket Qin <becket....@gmail.com
> >
> >> > > wrote:
> >> > > >>
> >> > > >>> Hey Chesnay,
> >> > > >>>
> >> > > >>> I think I got what that method was designed for now. Basically
> the
> >> > > >>> motivation is to let the SourceOutput to report the
> >> eventTimeFetchLag
> >> > > for
> >> > > >>> users. At this point, the SourceOutput only has the EventTime,
> so
> >> > this
> >> > > >>> method provides a way for the users to pass the FetchTime to the
> >> > > >>> SourceOutput. This is essentially a context associated with each
> >> > record
> >> > > >>> emitted to the SourceOutput.
> >> > > >>>
> >> > > >>> It might be slightly better if we let the method accept a
> >> Supplier in
> >> > > >>> this
> >> > > >>> case. However, it seems to introduce a parallel channel or a
> >> sidepath
> >> > > >>> between the user implementation and SourceOutput. I am not sure
> if
> >> > this
> >> > > >>> is
> >> > > >>> the right way to go. Would it be more intuitive if we just add a
> >> new
> >> > > >>> method
> >> > > >>> to the SourceOutput, to allow the FetchTime to be passed in
> >> > explicitly?
> >> > > >>> This would work well with the change I suggested above, which
> >> adds a
> >> > > >>> generic metadata type <T> to the RecordsWithSplits and passes
> >> that to
> >> > > the
> >> > > >>> RecordEmitter.emitRecord() as an argument.
> >> > > >>>
> >> > > >>> What do you think?
> >> > > >>>
> >> > > >>> Thanks,
> >> > > >>>
> >> > > >>> Jiangjie (Becket) Qin
> >> > > >>>
> >> > > >>> On Tue, Jul 20, 2021 at 2:50 PM Chesnay Schepler <
> >> ches...@apache.org
> >> > >
> >> > > >>> wrote:
> >> > > >>>
> >> > > >>> > Would it be easier to understand if the method would accept a
> >> > > Supplier
> >> > > >>> > instead?
> >> > > >>> >
> >> > > >>> > On 20/07/2021 05:36, Becket Qin wrote:
> >> > > >>> > > In that case, do we still need the metric here? It seems we
> >> are
> >> > > >>> creating
> >> > > >>> > a
> >> > > >>> > > "global variable" which users may potentially use. I am
> >> wondering
> >> > > how
> >> > > >>> > much
> >> > > >>> > > additional convenience it provides because it seems easy for
> >> > people
> >> > > >>> to
> >> > > >>> > > simply pass the fetch time by themselves if they have
> decided
> >> to
> >> > > not
> >> > > >>> use
> >> > > >>> > > SourceReaderBase. Also, it looks like we do not have an API
> >> > pattern
> >> > > >>> that
> >> > > >>> > > lets users get the value of a metric and derive another
> >> metric.
> >> > So
> >> > > I
> >> > > >>> > think
> >> > > >>> > > it is easier for people to understand if
> LastFetchTimeGauge()
> >> is
> >> > > >>> just an
> >> > > >>> > > independent metric by itself, instead of being a part of the
> >> > > >>> > > eventTimeFetchLag computation.
> >> > > >>> >
> >> > > >>> >
> >> > > >>> >
> >> > > >>>
> >> > > >>
> >> > > >>
> >> > >
> >> >
> >>
> >
>
