Hi everyone, I started the voting thread [1]. Please cast your vote there or ask additional questions here.
Best,

Arvid

[1] https://lists.apache.org/thread.html/r70d321b6aa62ab4e31c8b73552b2de7846c4d31ed6f08d6541a9b36e%40%3Cdev.flink.apache.org%3E

On Fri, Jul 30, 2021 at 10:46 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Arvid,
>
> I think it is OK to leave eventTimeFetchLag out of the scope of this FLIP
> given that it may involve additional API changes.
>
> > 5. RecordMetadata is currently not simplifying any code. By the current
> > design RecordMetadata is a read-only data structure that is constant for
> > all records in a batch. So in Kafka, we still need to pass Tuple3 because
> > offset and timestamp are per record.
>
> Does this depend on whether we will get the RecordMetadata per record or
> per batch? We can define the semantics of RecordsWithSplitIds.metadata() to be
> the metadata associated with the last record returned by
> RecordsWithSplitIds.nextRecordFromSplit(). In this case, individual
> implementations can decide whether to return different metadata for each
> record or not. In the case of Kafka, the Tuple3 can be replaced with three
> lists of records, timestamps and offsets respectively. It probably saves
> some object instantiation, assuming the RecordMetadata object itself can be
> reused.
>
> > 6. We might rename and change the semantics into
> >
> > public interface RecordsWithSplitIds<E> {
> >     /**
> >      * Returns the record metadata. The metadata is shared for all
> >      * records in the current split.
> >      */
> >     @Nullable
> >     default RecordMetadata metadataOfCurrentSplit() {
> >         return null;
> >     }
> >     ...
> > }
>
> Maybe we can move one step further to make it "metadataOfCurrentRecord()"
> as I mentioned above.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jul 30, 2021 at 3:00 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi folks,
> >
> > To move on with the FLIP, I will cut eventTimeFetchLag out of scope and
> > go ahead with the remainder.
> >
> > I will open a VOTE later today.
> >
> > Best,
> >
> > Arvid
> >
> > On Wed, Jul 28, 2021 at 8:44 AM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Becket,
> > >
> > > I have updated the PR according to your suggestion (note that this commit
> > > contains the removal of the previous approach) [1]. Here are my
> > > observations:
> > > 1. Adding the type of RecordMetadata to emitRecord would require adding
> > > another type parameter to RecordEmitter and SourceReaderBase. So I left
> > > that out for now as it would break things completely.
> > > 2. RecordEmitter implementations that want to pass it to SourceOutput need
> > > to be changed in a boilerplate fashion (passing the metadata to the
> > > SourceOutput).
> > > 3. RecordMetadata as an interface (as in the commit) probably requires
> > > boilerplate implementations in the sources that use it as well.
> > > 4. SourceOutput would also require an additional collect
> > >
> > > default void collect(T record, RecordMetadata metadata) {
> > >     collect(record, TimestampAssigner.NO_TIMESTAMP, metadata);
> > > }
> > >
> > > 5. RecordMetadata is currently not simplifying any code. By the current
> > > design RecordMetadata is a read-only data structure that is constant for
> > > all records in a batch. So in Kafka, we still need to pass Tuple3 because
> > > offset and timestamp are per record.
> > > 6. RecordMetadata is currently the same for all splits in
> > > RecordsWithSplitIds.
> > >
> > > Some ideas for the above points:
> > > 3. We should accompany it with a default implementation to avoid trivial
> > > POJO implementations such as the KafkaRecordMetadata of my commit. Can we skip
> > > the interface and just have RecordMetadata as a base class?
> > > 1.,2.,4. We could also set the metadata only once in an orthogonal method
> > > that needs to be called before collect, like SourceOutput#setRecordMetadata.
> > > Then we can implement it entirely in SourceReaderBase without changing any
> > > code.
> > > The clear downside is that it introduces some implicit state in
> > > SourceOutput (which we implement) and is harder to use in
> > > non-SourceReaderBase classes: Source devs need to remember to call
> > > setRecordMetadata before collect for a respective record.
> > > 6. We might rename and change the semantics into
> > >
> > > public interface RecordsWithSplitIds<E> {
> > >     /**
> > >      * Returns the record metadata. The metadata is shared for all
> > >      * records in the current split.
> > >      */
> > >     @Nullable
> > >     default RecordMetadata metadataOfCurrentSplit() {
> > >         return null;
> > >     }
> > >     ...
> > > }
> > >
> > >
> > > Re global variable
> > >
> > >> To explain a bit more on the metric being a global variable, I think in
> > >> general there are two ways to pass a value from one code block to another.
> > >> The first way is direct passing. That means the variable is explicitly
> > >> passed from one code block to another via arguments, be them in the
> > >> constructor or methods. Another way is indirect passing through context,
> > >> that means the information is stored in some kind of context or
> > >> environment, and everyone can have access to it. And there is no explicit
> > >> value passing from one code block to another because everyone just reads
> > >> from/writes to the context or environment. This is basically the "global
> > >> variable" pattern I am talking about.
> > >>
> > >> In general people would avoid having a mutable global value shared across
> > >> code blocks, because it is usually less deterministic and therefore more
> > >> difficult to understand or debug.
> > >>
> > > Since the first approach was using a Gauge, it's a callback and not a
> > > global value. The actual value is passed when invoking the callback. It's
> > > the same as a supplier. However, the gauge itself is stored in the context,
> > > so your argument holds on that level.
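
The two passing styles contrasted above can be sketched roughly as follows. This is only an illustration: PassingStyles, CONTEXT and the "lastFetchTime" key are hypothetical names and not Flink API, and the lag is assumed to be fetch time minus event time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical names throughout; none of this is Flink API.
final class PassingStyles {

    // Direct passing: the fetch time travels as an explicit method argument.
    static long lagDirect(long eventTime, long fetchTime) {
        return fetchTime - eventTime;
    }

    // Indirect passing: a producer registers a callback (gauge-like supplier)
    // in a shared context under a well-known name.
    static final Map<String, LongSupplier> CONTEXT = new HashMap<>();

    // The consumer looks the callback up by name and invokes it when needed.
    // The value itself is not global, but the registered callback is.
    static long lagViaContext(long eventTime) {
        return CONTEXT.get("lastFetchTime").getAsLong() - eventTime;
    }
}
```

In the second variant the value is only produced when the callback is invoked, which matches the "same as a supplier" remark, while the lookup through CONTEXT is the part that resembles the "global variable" pattern.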
> > > > > > > > >> Moreover, generally speaking, the Metrics in systems are usually > > perceived > > >> as a reporting mechanism. People usually think of it as a way to > expose > > >> some internal values to the external system, and don't expect the > > program > > >> itself to read the reported values again in the main logic, which is > > >> essentially using the MetricGroup as a context to pass values across > > code > > >> block, i.e. the "global variable" pattern. Instead, people would > usually > > >> use the "direct passing" to do this. > > >> > > > Here I still don't see a difference on how we calculate the meter > values > > > from the byteIn/Out counters. We also need to read the counters > > > periodically and calculate a secondary metric. So it can't be that > > > unexpected to users. > > > > > > [1] > > > > > > https://github.com/apache/flink/commit/71212e6baf2906444987253d0cf13b5a5978a43b > > > > > > On Tue, Jul 27, 2021 at 3:19 AM Becket Qin <becket....@gmail.com> > wrote: > > > > > >> Hi Arvid, > > >> > > >> Thanks for the patient discussion. > > >> > > >> To explain a bit more on the metric being a global variable, I think > in > > >> general there are two ways to pass a value from one code block to > > another. > > >> The first way is direct passing. That means the variable is explicitly > > >> passed from one code block to another via arguments, be them in the > > >> constructor or methods. Another way is indirect passing through > context, > > >> that means the information is stored in some kind of context or > > >> environment, and everyone can have access to it. And there is no > > explicit > > >> value passing from one code block to another because everyone just > reads > > >> from/writes to the context or environment. This is basically the > "global > > >> variable" pattern I am talking about. 
> > >> > > >> In general people would avoid having a mutable global value shared > > across > > >> code blocks, because it is usually less deterministic and therefore > more > > >> difficult to understand or debug. > > >> > > >> Moreover, generally speaking, the Metrics in systems are usually > > perceived > > >> as a reporting mechanism. People usually think of it as a way to > expose > > >> some internal values to the external system, and don't expect the > > program > > >> itself to read the reported values again in the main logic, which is > > >> essentially using the MetricGroup as a context to pass values across > > code > > >> block, i.e. the "global variable" pattern. Instead, people would > usually > > >> use the "direct passing" to do this. > > >> > > >> >Can we think of other use cases for the fetchTime parameter beyond > > >> metrics > > >> in the future? If so, it would make an even stronger case. > > >> At this point, I cannot think of other use cases for fetchTime, but I > > can > > >> see use cases where people want to get a per split fetch lag. So I am > > >> wondering if it makes sense to generalize the API a little bit by > > >> introducing collect(T Record, Long timestamp, Metadata metadata). This > > >> also > > >> makes a natural alignment because the RecordsWithSplitIds also > returns a > > >> Metadata associated with the record, which can be used by > RecordEmitter > > as > > >> well as the SourceOutput. > > >> > > >> What do you think? > > >> > > >> Thanks, > > >> > > >> Jiangjie (Becket) Qin > > >> > > >> > > >> On Fri, Jul 23, 2021 at 7:58 PM Arvid Heise <ar...@apache.org> wrote: > > >> > > >> > Hi Becket, > > >> > > > >> > I still can't follow your view on the metric being a global variable > > or > > >> > your concern that it is confusing to users. Nevertheless, I like > your > > >> > proposal with having an additional collect method. 
> > >> > > > >> > I was thinking that > > >> > > SourceOutput is going to have an additional method of collect(T > > >> Record, > > >> > > Long timestamp, Long fetchTime). So people can just pass in the > > fetch > > >> > time > > >> > > directly when they emit a record, regardless of using > > >> SourceReaderBase or > > >> > > not. > > >> > > > > >> > > > >> > Can we think of other use cases for the fetchTime parameter beyond > > >> metrics > > >> > in the future? If so, it would make an even stronger case. > > >> > > > >> > I'll update the PR with your proposals. > > >> > > > >> > Best, > > >> > > > >> > Arvid > > >> > > > >> > On Fri, Jul 23, 2021 at 12:08 PM Becket Qin <becket....@gmail.com> > > >> wrote: > > >> > > > >> > > Regarding the generic type v.s. class/subclasses of Metadata. > > >> > > > > >> > > I think generic types usually make sense if the framework/abstract > > >> class > > >> > > itself does not look into the instances, but just pass them from > one > > >> user > > >> > > logic to another. Otherwise, interfaces or class/subclasses would > be > > >> > > preferred. > > >> > > > > >> > > In our case, it depends on whether we expect the SourceReaderBase > to > > >> look > > >> > > into the MetaData. At this point, it does not. But it seems > possible > > >> that > > >> > > in the future it may look into MetaData. Therefore I think the > > class / > > >> > > subclass pattern would be better. > > >> > > > > >> > > Thanks, > > >> > > > > >> > > Jiangjie (Becket) Qin > > >> > > > > >> > > > > >> > > On Fri, Jul 23, 2021 at 5:54 PM Becket Qin <becket....@gmail.com> > > >> wrote: > > >> > > > > >> > > > Hi Arvid, > > >> > > > > > >> > > > > I'm not sure if I follow the global variable argument, could > you > > >> > > > elaborate? Are you referring specifically to the SettableGauge? > > How > > >> is > > >> > > that > > >> > > > different from a Counter or Meter? 
> > >> > > > What I meant is that the fetch lag computing logic can either get the
> > >> > > > information required from a method argument or from something like a static
> > >> > > > global variable. We are essentially trying to reuse the metric as a static
> > >> > > > global variable. That does not seem to be a common pattern in most systems.
> > >> > > > It is a little counterintuitive that a gauge reported to the metric system
> > >> > > > would be used by the program main logic later on as a variable.
> > >> > > >
> > >> > > > > We could do that. That would remove the gauge from the MetricGroup,
> > >> > > > > right? The main downside is that sources that do not use SourceReaderBase
> > >> > > > > cannot set the metric anymore. So I'd rather keep the current way and
> > >> > > > > extend it with the metadata extension.
> > >> > > > Yes, that would remove the gauge from the MetricGroup. I was thinking that
> > >> > > > SourceOutput is going to have an additional method of collect(T Record,
> > >> > > > Long timestamp, Long fetchTime). So people can just pass in the fetch time
> > >> > > > directly when they emit a record, regardless of using SourceReaderBase or
> > >> > > > not.
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Jiangjie (Becket) Qin
> > >> > > >
> > >> > > > On Thu, Jul 22, 2021 at 3:46 PM Chesnay Schepler <ches...@apache.org>
> > >> > > > wrote:
> > >> > > >
> > >> > > >> The only histogram implementations available to use are those by
> > >> > > >> dropwizard, and they do some lock-free synchronization stuff that so far we
> > >> > > >> wanted to keep out of hot paths (this applies to both reading and writing);
> > >> > > >> we have however never made benchmarks.
> > >> > > >> But it is reasonable to assume that they are way more expensive than the
> > >> > > >> alternatives (in the ideal case just being a getter).
> > >> > > >> You'd pay that cost irrespective of whether a reporter is enabled or not,
> > >> > > >> which is another thing we so far wanted to prevent.
> > >> > > >> Finally, histograms are problematic because they are 10x more expensive
> > >> > > >> on the metric backend (because they are effectively 10 metrics), and should
> > >> > > >> be used with extreme caution, in particular because we lack any switch to
> > >> > > >> disable/enable metrics (and I think we are getting to a point where the
> > >> > > >> metric system becomes unusable for heavy users because of that, where
> > >> > > >> another histogram isn't helping).
> > >> > > >>
> > >> > > >> Overall, at this time I'm against using Histograms.
> > >> > > >> Furthermore, I believe that we should be able to derive a Histogram from
> > >> > > >> that supplier if we later on decide differently. We'd just need to poll
> > >> > > >> the supplier more often.
> > >> > > >>
> > >> > > >> On 22/07/2021 09:36, Arvid Heise wrote:
> > >> > > >>
> > >> > > >> Hi all,
> > >> > > >>
> > >> > > >> @Steven Wu <stevenz...@gmail.com>
> > >> > > >>
> > >> > > >>> Regarding "lastFetchTime" latency metric, I found Gauge to be less
> > >> > > >>> informative as it only captures the last sampling value for each metric
> > >> > > >>> publish interval (e.g. 60s).
> > >> > > >>> * Can we make it a histogram? Histograms are more expensive though.
> > >> > > >>> * Timer [1, 2] is cheaper as it just tracks min, max, avg, count.
> > >> > > >>> but there is no such metric type in Flink
> > >> > > >>> * Summary metric type [3] (from Prometheus) would be nice too
> > >> > > >>>
> > >> > > >> I'd also think that a histogram is much more expressive but the original
> > >> > > >> FLIP-33 decided against it because of its cost. @Chesnay Schepler
> > >> > > >> <ches...@apache.org> could you shed some light on how much more
> > >> > > >> expensive it is in comparison to a simple gauge? Does it depend on whether
> > >> > > >> a reporter is actually using the metric?
> > >> > > >> The current interface of this FLIP-179 would actually allow switching the
> > >> > > >> type of the metric later. But since the metric type is user-facing, we need
> > >> > > >> to have an agreement now.
> > >> > > >>
> > >> > > >> @Becket Qin <becket....@gmail.com>
> > >> > > >>
> > >> > > >>> In that case, do we still need the metric here? It seems we are creating a
> > >> > > >>> "global variable" which users may potentially use. I am wondering how much
> > >> > > >>> additional convenience it provides because it seems easy for people to
> > >> > > >>> simply pass the fetch time by themselves if they have decided to not use
> > >> > > >>> SourceReaderBase. Also, it looks like we do not have an API pattern that
> > >> > > >>> lets users get the value of a metric and derive another metric. So I think
> > >> > > >>> it is easier for people to understand if LastFetchTimeGauge() is just an
> > >> > > >>> independent metric by itself, instead of being a part of the
> > >> > > >>> eventTimeFetchLag computation.
> > >> > > >>>
> > >> > > >> I'm not sure if I follow the global variable argument, could you
> > >> > > >> elaborate? Are you referring specifically to the SettableGauge?
> > >> > > >> How is that different from a Counter or Meter?
> > >> > > >>
> > >> > > >> With the current design, we could very well add a LastFetchTime metric.
> > >> > > >> The key point of the current abstraction is that a user gets the much
> > >> > > >> harder eventTimeFetchLag metric for free, since we already need to extract
> > >> > > >> the event time for other metrics. I think the JavaDoc makes it clear what
> > >> > > >> the intent of the LastFetchTimeGauge is and if not we can improve it.
> > >> > > >> Btw we have derived metrics already. For example, we have Meters for
> > >> > > >> byteIn/Out and recordIn/Out. That's already part of FLIP-33.
> > >> > > >>
> > >> > > >>> Would it make sense to have a more generic metadata type <T> associated
> > >> > > >>> with the records batch? In some cases, it may be useful to allow the Source
> > >> > > >>> implementation to carry some additional information of the batch to the
> > >> > > >>> RecordEmitter. For example, the split info of the batch, the sender of the
> > >> > > >>> batch etc. Because the RecordEmitter only takes one record at a time,
> > >> > > >>> currently such information needs to be put into each record, which may
> > >> > > >>> involve a lot of wrapper object creation.
> > >> > > >>>
> > >> > > >> I like the idea of having more general metadata and I follow the example.
> > >> > > >> I'm wondering if we could avoid a generic type (since that adds a bit of
> > >> > > >> complexity to the mental model and usage) by simply encouraging the use of a
> > >> > > >> more specific MetaData subclass as a return type of the method.
> > >> > > >>
> > >> > > >> public interface RecordsWithSplitIds<E> {
> > >> > > >>     @Nullable
> > >> > > >>     default RecordMetadata getMetadata() {
> > >> > > >>         return null;
> > >> > > >>     }
> > >> > > >>     ...
> > >> > > >> }
> > >> > > >>
> > >> > > >> public interface RecordMetadata {
> > >> > > >>     long getLastFetchTime(); // mandatory?
> > >> > > >> }
> > >> > > >>
> > >> > > >> And using it as
> > >> > > >>
> > >> > > >> public class KafkaRecordMetadata implements RecordMetadata {}
> > >> > > >>
> > >> > > >> private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
> > >> > > >>     @Override
> > >> > > >>     public KafkaRecordMetadata getMetadata() {
> > >> > > >>         return metadata;
> > >> > > >>     }
> > >> > > >> }
> > >> > > >>
> > >> > > >> Or do we want to have the generic to explicitly pass it to the
> > >> > > >> RecordEmitter? Would that metadata be a fourth parameter of
> > >> > > >> RecordEmitter#emitRecord?
> > >> > > >>
> > >> > > >>> It might be slightly better if we let the method accept a Supplier in this
> > >> > > >>> case. However, it seems to introduce a parallel channel or a sidepath
> > >> > > >>> between the user implementation and SourceOutput. I am not sure if this is
> > >> > > >>> the right way to go. Would it be more intuitive if we just add a new method
> > >> > > >>> to the SourceOutput, to allow the FetchTime to be passed in explicitly?
> > >> > > >>> This would work well with the change I suggested above, which adds a
> > >> > > >>> generic metadata type <T> to the RecordsWithSplits and passes that to the
> > >> > > >>> RecordEmitter.emitRecord() as an argument.
> > >> > > >>>
> > >> > > >>
> > >> > > >> We could do that. That would remove the gauge from the MetricGroup,
> > >> > > >> right? The main downside is that sources that do not use SourceReaderBase
> > >> > > >> cannot set the metric anymore.
So I'd rather keep the current > way > > >> and > > >> > > >> extend it with the metadata extension. > > >> > > >> > > >> > > >> Best, > > >> > > >> > > >> > > >> Arvid > > >> > > >> > > >> > > >> > > >> > > >> On Wed, Jul 21, 2021 at 1:38 PM Becket Qin < > becket....@gmail.com > > > > > >> > > wrote: > > >> > > >> > > >> > > >>> Hey Chesnay, > > >> > > >>> > > >> > > >>> I think I got what that method was designed for now. Basically > > the > > >> > > >>> motivation is to let the SourceOutput to report the > > >> eventTimeFetchLag > > >> > > for > > >> > > >>> users. At this point, the SourceOutput only has the EventTime, > > so > > >> > this > > >> > > >>> method provides a way for the users to pass the FetchTime to > the > > >> > > >>> SourceOutput. This is essentially a context associated with > each > > >> > record > > >> > > >>> emitted to the SourceOutput. > > >> > > >>> > > >> > > >>> It might be slightly better if we let the method accept a > > >> Supplier in > > >> > > >>> this > > >> > > >>> case. However, it seems to introduce a parallel channel or a > > >> sidepath > > >> > > >>> between the user implementation and SourceOutput. I am not > sure > > if > > >> > this > > >> > > >>> is > > >> > > >>> the right way to go. Would it be more intuitive if we just > add a > > >> new > > >> > > >>> method > > >> > > >>> to the SourceOutput, to allow the FetchTime to be passed in > > >> > explicitly? > > >> > > >>> This would work well with the change I suggested above, which > > >> adds a > > >> > > >>> generic metadata type <T> to the RecordsWithSplits and passes > > >> that to > > >> > > the > > >> > > >>> RecordEmitter.emitRecord() as an argument. > > >> > > >>> > > >> > > >>> What do you think? 
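
A minimal sketch of the explicit-passing option raised here, where the fetch time is handed to the output as an extra collect argument. SketchOutput and LagTrackingOutput are hypothetical, simplified stand-ins and not Flink's SourceOutput; the lag is assumed to be fetch time minus event time.

```java
// Hypothetical, simplified stand-in for a source output; not Flink's SourceOutput.
interface SketchOutput<T> {
    void collect(T record, long timestamp);

    // Overload in the spirit of the proposal: the caller passes the fetch time.
    void collect(T record, long timestamp, long fetchTime);
}

// Tracks the lag of the most recent record that carried a fetch time.
final class LagTrackingOutput<T> implements SketchOutput<T> {
    static final long UNKNOWN_LAG = Long.MIN_VALUE;

    private long lastEventTimeFetchLag = UNKNOWN_LAG;
    private int emittedCount = 0;

    @Override
    public void collect(T record, long timestamp) {
        // A real output would forward the record downstream; here we only count.
        emittedCount++;
    }

    @Override
    public void collect(T record, long timestamp, long fetchTime) {
        // Assumed definition: lag = fetch time - event time.
        lastEventTimeFetchLag = fetchTime - timestamp;
        collect(record, timestamp);
    }

    long lastEventTimeFetchLag() {
        return lastEventTimeFetchLag;
    }

    int emittedCount() {
        return emittedCount;
    }
}
```

With this shape no state has to be set before collect, so a reader that does not build on SourceReaderBase could report the lag simply by calling the three-argument overload.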
> > >> > > >>> > > >> > > >>> Thanks, > > >> > > >>> > > >> > > >>> Jiangjie (Becket) Qin > > >> > > >>> > > >> > > >>> On Tue, Jul 20, 2021 at 2:50 PM Chesnay Schepler < > > >> ches...@apache.org > > >> > > > > >> > > >>> wrote: > > >> > > >>> > > >> > > >>> > Would it be easier to understand if the method would accept > a > > >> > > Supplier > > >> > > >>> > instead? > > >> > > >>> > > > >> > > >>> > On 20/07/2021 05:36, Becket Qin wrote: > > >> > > >>> > > In that case, do we still need the metric here? It seems > we > > >> are > > >> > > >>> creating > > >> > > >>> > a > > >> > > >>> > > "global variable" which users may potentially use. I am > > >> wondering > > >> > > how > > >> > > >>> > much > > >> > > >>> > > additional convenience it provides because it seems easy > for > > >> > people > > >> > > >>> to > > >> > > >>> > > simply pass the fetch time by themselves if they have > > decided > > >> to > > >> > > not > > >> > > >>> use > > >> > > >>> > > SourceReaderBase. Also, it looks like we do not have an > API > > >> > pattern > > >> > > >>> that > > >> > > >>> > > lets users get the value of a metric and derive another > > >> metric. > > >> > So > > >> > > I > > >> > > >>> > think > > >> > > >>> > > it is easier for people to understand if > > LastFetchTimeGauge() > > >> is > > >> > > >>> just an > > >> > > >>> > > independent metric by itself, instead of being a part of > the > > >> > > >>> > > eventTimeFetchLag computation. > > >> > > >>> > > > >> > > >>> > > > >> > > >>> > > > >> > > >>> > > >> > > >> > > >> > > >> > > >> > > > > >> > > > >> > > > > > >
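
For reference, the per-record metadata idea from the most recent reply in this thread (three parallel lists plus one reusable metadata object instead of a Tuple3 per record) could look roughly like the sketch below. SketchRecordMetadata and SketchKafkaBatch are hypothetical names; the method names only mirror the ones discussed and this is not the Flink or Kafka connector implementation.

```java
import java.util.List;

// Hypothetical mutable metadata holder that is reused for every record.
final class SketchRecordMetadata {
    long offset;
    long timestamp;
}

// Hypothetical batch: parallel collections replace one Tuple3 per record.
final class SketchKafkaBatch {
    private final List<String> records;
    private final long[] timestamps;
    private final long[] offsets;
    private final SketchRecordMetadata reused = new SketchRecordMetadata();
    private int next = 0;

    SketchKafkaBatch(List<String> records, long[] timestamps, long[] offsets) {
        this.records = records;
        this.timestamps = timestamps;
        this.offsets = offsets;
    }

    // Returns the next record, or null once the batch is exhausted.
    String nextRecordFromSplit() {
        if (next >= records.size()) {
            return null;
        }
        // Refresh the shared metadata object for the record being returned.
        reused.offset = offsets[next];
        reused.timestamp = timestamps[next];
        return records.get(next++);
    }

    // Metadata of the record most recently returned. The same instance is
    // handed out every time, so callers must not keep it across records.
    SketchRecordMetadata metadataOfCurrentRecord() {
        return reused;
    }
}
```

The trade-off is the one noted in the thread: reuse avoids per-record wrapper objects, at the cost of the metadata only being valid until the next record is fetched.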