Hi Jorge,

Thanks for the changes. With regard to having per-batch vs. per-record metrics, the additional overhead I was referring to wasn't about whether we would need to iterate over all the records in a batch. I was referring to the potential additional overhead from the higher volume of calls to Sensor::record on the sensors for the new metrics (as compared to the existing batch-only metrics), especially for high-throughput connectors where batch sizes could be large. We may want to do some performance testing and get concrete numbers to verify whether or not this is a valid concern.
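To make the concern above concrete, a rough, self-contained microbenchmark could compare one recorder call per record against one per batch. `SimpleSensor` below is a hypothetical stand-in for Kafka's `org.apache.kafka.common.metrics.Sensor` (whose `record()` is likewise synchronized); real numbers would need to come from benchmarking the actual metrics code, e.g. with JMH.

```java
// Sketch only: compares the cost of per-record vs. per-batch recording.
// SimpleSensor is a hypothetical stand-in, not Kafka's Sensor class.
public class SensorOverheadBench {
    static class SimpleSensor {
        private double max = Double.NEGATIVE_INFINITY;
        // Kafka's Sensor::record is also synchronized, which is part of the concern
        public synchronized void record(double v) { if (v > max) max = v; }
    }

    public static long timePerRecordNs(int batches, int batchSize) {
        SimpleSensor sensor = new SimpleSensor();
        long start = System.nanoTime();
        for (int b = 0; b < batches; b++)
            for (int r = 0; r < batchSize; r++)
                sensor.record(r);               // one call per record
        return System.nanoTime() - start;
    }

    public static long timePerBatchNs(int batches, int batchSize) {
        SimpleSensor sensor = new SimpleSensor();
        long start = System.nanoTime();
        for (int b = 0; b < batches; b++)
            sensor.record(batchSize - 1);       // a single call per batch
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        // batchSize 500 mirrors the consumer's max.poll.records default
        int batches = 10_000, batchSize = 500;
        System.out.println("per-record ns: " + timePerRecordNs(batches, batchSize));
        System.out.println("per-batch  ns: " + timePerBatchNs(batches, batchSize));
    }
}
```

A throwaway loop like this only bounds the order of magnitude; a JMH benchmark against the real `Sensor` (with its registered stats) would be needed for a credible answer.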
Thanks,
Yash

On Tue, Sep 6, 2022 at 4:42 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Sagar and Yash,
>
> > the way it's defined in
> > https://kafka.apache.org/documentation/#connect_monitoring for the metrics
>
> 4.1. Got it. Will add it to the KIP.
>
> > The only thing I would argue is do we need sink-record-latency-min? Maybe
> > we could remove this min metric as well and make all of the 3 e2e metrics
> > consistent
>
> 4.2. I see. Will remove it from the KIP.
>
> > Probably users can track the metrics at their end to figure that out. Do
> > you think that makes sense?
>
> 4.3. Yes, agreed. With these new metrics it should be easier for users to
> track this.
>
> > I think it makes sense to not have a min metric for either to remain
> > consistent with the existing put-batch and poll-batch metrics
>
> 5.1. Got it. Same as 4.2.
>
> > Another naming-related suggestion I had was with the "convert-time"
> > metrics - we should probably include transformations in the name since
> > SMTs could definitely be attributable to a sizable chunk of the latency
> > depending on the specific transformation chain.
>
> 5.2. Makes sense. I'm proposing to add `sink-record-convert-transform...`
> and `source-record-transform-convert...` to represent the order of
> operations correctly.
>
> > it seems like both source and sink tasks only record metrics at a "batch"
> > level, not on an individual record level. I think it might be additional
> > overhead if we want to record these new metrics all at the record level?
>
> 5.3. I initially considered implementing all metrics at the batch level,
> but given how the framework processes records, I fell back to the proposed
> approach:
> - Sink task:
>   - `WorkerSinkTask#convertMessages(msgs)` already iterates over records,
>     so there is no additional overhead to capture latency per record.
>   - https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L490-L514
>   - `WorkerSinkTask#convertAndTransformRecord(record)` actually happens
>     per record. Measuring this operation per batch would include
>     processing that is not strictly part of "convert and transform":
>   - https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L518
> - Source task:
>   - `AbstractWorkerSourceTask#sendRecords` iterates over a batch and
>     applies transforms and conversion to each record individually as well:
>   - https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L390
>
> > This might require some additional changes - for instance, with the
> > "sink-record-latency" metric, we might only want to have a "max" metric
> > since "avg" would require recording a value on the sensor for each record
> > (whereas we can get a "max" by only recording a metric value for the
> > oldest record in each batch).
>
> 5.4. Recording record latency per batch may not be as useful, as there is
> no guarantee that the oldest record will be representative of the batch.
>
> On Sat, 3 Sept 2022 at 16:02, Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Jorge and Sagar,
> >
> > I think it makes sense to not have a min metric for either, to remain
> > consistent with the existing put-batch and poll-batch metrics (it doesn't
> > seem particularly useful either anyway). Also, the new
> > "sink-record-latency" metric name looks fine to me - thanks for making
> > the changes!
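As an illustration of point 5.3 above: since the sink path already iterates record by record, per-record latency capture fits inside the existing loop at no extra iteration cost. This is a sketch with hypothetical names (`Msg`, `LatencyRecorder`), not the actual `WorkerSinkTask` code:

```java
import java.util.List;

// Hypothetical sketch of per-record latency capture inside an existing
// per-record loop, as in WorkerSinkTask#convertMessages. Msg stands in
// for ConsumerRecord; LatencyRecorder for a Sensor-backed recorder.
public class ConvertMessagesSketch {
    public record Msg(long timestampMs) {}

    public interface LatencyRecorder { void record(double ms); }

    public static void convertMessages(List<Msg> batch, LatencyRecorder recorder, long nowMs) {
        for (Msg msg : batch) {
            // e2e latency: wall-clock time minus the record's Kafka timestamp
            recorder.record(nowMs - msg.timestampMs());
            // ... convertAndTransformRecord(msg) would also run here, per record
        }
    }

    public static void main(String[] args) {
        List<Msg> batch = List.of(new Msg(1_000L), new Msg(1_200L));
        double[] sum = {0};
        convertMessages(batch, ms -> sum[0] += ms, 1_500L);
        System.out.println(sum[0]); // 800.0 = (1500-1000) + (1500-1200)
    }
}
```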
> > Another naming-related suggestion I had was with the "convert-time"
> > metrics - we should probably include transformations in the name, since
> > SMTs could definitely be attributable to a sizable chunk of the latency
> > depending on the specific transformation chain.
> >
> > I have one high-level question with respect to the implementation -
> > currently, it seems like both source and sink tasks only record metrics
> > at a "batch" level, not at an individual record level. I think it might
> > be additional overhead if we want to record all these new metrics at the
> > record level? Could we instead make all of these new metrics for batches
> > of records rather than individual records, in order to remain consistent
> > with the existing task-level metrics? This might require some additional
> > changes - for instance, with the "sink-record-latency" metric, we might
> > only want to have a "max" metric, since "avg" would require recording a
> > value on the sensor for each record (whereas we can get a "max" by only
> > recording a metric value for the oldest record in each batch).
> >
> > Thanks,
> > Yash
> >
> > On Fri, Sep 2, 2022 at 3:16 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the changes.
> > >
> > > Regarding the metrics, I meant something like this:
> > > kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
> > > the way it's defined in
> > > https://kafka.apache.org/documentation/#connect_monitoring for the
> > > metrics.
> > >
> > > I see what you mean by the 3 metrics and how they can be interpreted.
> > > The only thing I would argue is: do we need sink-record-latency-min?
> > > Maybe we could remove this min metric as well and make all of the 3 e2e
> > > metrics consistent (since put-batch also doesn't expose a min, which
> > > makes sense to me). I think this is in contrast to what Yash pointed
> > > out above, so I would like to hear his thoughts as well.
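Yash's batch-level alternative can be sketched as follows: the oldest record in a batch yields the largest latency, so a "max" metric needs only one recorded value per batch. Names below are illustrative, not from the Kafka codebase:

```java
import java.util.List;

// Hypothetical sketch: derive a batch-level "max" e2e latency from the
// oldest (minimum-timestamp) record, requiring one Sensor::record call
// per batch instead of one per record.
public class BatchMaxLatencySketch {
    public static long batchMaxLatencyMs(List<Long> recordTimestampsMs, long nowMs) {
        long oldest = Long.MAX_VALUE;
        for (long ts : recordTimestampsMs)
            oldest = Math.min(oldest, ts);   // oldest record in the batch
        return nowMs - oldest;               // the batch's maximum latency
    }

    public static void main(String[] args) {
        // oldest timestamp is 900, so the max latency at t=1500 is 600 ms
        System.out.println(batchMaxLatencyMs(List.of(1_000L, 900L, 1_400L), 1_500L));
    }
}
```

Note that, as Jorge's reply (5.4) points out, this yields an exact "max" but says nothing about the distribution within the batch, which is why an "avg" cannot be derived the same way.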
> > >
> > > The other point Yash mentioned, about the slightly flawed definition
> > > of e2e, is also true in a sense. But I have a feeling that once the
> > > records are polled by the connector tasks, it would be difficult to
> > > track the final leg via the framework. Probably users can track the
> > > metrics at their end to figure that out. Do you think that makes sense?
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Thu, Sep 1, 2022 at 11:40 PM Jorge Esteban Quilcate Otoya <
> > > quilcate.jo...@gmail.com> wrote:
> > >
> > > > Hi Sagar and Yash,
> > > >
> > > > Thanks for your feedback!
> > > >
> > > > > 1) I am assuming the new metrics would be task level metric.
> > > >
> > > > 1.1. Yes, it will be a task-level metric, implemented on the
> > > > Worker[Source/Sink]Task.
> > > >
> > > > > Could you specify the way it's done for other sink/source
> > > > > connector?
> > > >
> > > > 1.2. Not sure what you mean by this. Could you elaborate a bit more?
> > > >
> > > > > 2. I am slightly confused about the e2e latency metric...
> > > >
> > > > 2.1. Yes, I see. I was trying to bring in a similar concept as in
> > > > Streams with KIP-613, though the e2e concept may not be translatable.
> > > > We could keep it as `sink-record-latency` to avoid conflating
> > > > concepts. A similar metric naming was proposed in KIP-489, but at the
> > > > consumer level - though it seems dormant for a couple of years.
> > > >
> > > > > However, the put-batch time measures the time to put a batch of
> > > > > records to the external sink. So, I would assume the 2 can't be
> > > > > added as is to compute the e2e latency. Maybe I am missing
> > > > > something here. Could you please clarify this?
> > > >
> > > > 2.2. Yes, agreed. Not necessarily added, but with the 3 latencies
> > > > (poll, convert, putBatch) it will be clearer where the bottleneck may
> > > > be, and they represent the internal processing.
> > > >
> > > > > however, as per the KIP it looks like it will be the latency
> > > > > between when the record was written to Kafka and when the record is
> > > > > returned by a sink task's consumer's poll?
> > > >
> > > > 3.1. Agreed. 2.1. could help to clarify this.
> > > >
> > > > > One more thing - I was wondering if there's a particular reason for
> > > > > having a min metric for e2e latency but not for convert time?
> > > >
> > > > 3.2. I was following KIP-613 for e2e, where min seems useful to
> > > > compare with max and get an idea of the window of results. Current
> > > > latencies in Connect do not include min, though, and that's why I
> > > > haven't added it for convert latency. Do you think it makes sense to
> > > > extend the latency metrics with min?
> > > >
> > > > The KIP is updated to clarify some of these changes.
> > > >
> > > > Many thanks,
> > > > Jorge.
> > > >
> > > > On Thu, 1 Sept 2022 at 18:11, Yash Mayya <yash.ma...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jorge,
> > > > >
> > > > > Thanks for the KIP! I have the same confusion with the e2e-latency
> > > > > metrics as Sagar above. "e2e" would seem to indicate the latency
> > > > > between when the record was written to Kafka and when the record
> > > > > was written to the sink system by the connector - however, as per
> > > > > the KIP it looks like it will be the latency between when the
> > > > > record was written to Kafka and when the record is returned by a
> > > > > sink task's consumer's poll? I think that metric will be a little
> > > > > confusing to interpret. One more thing - I was wondering if there's
> > > > > a particular reason for having a min metric for e2e latency but not
> > > > > for convert time?
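The definitional question raised in the thread can be made explicit with a sketch: both readings subtract the record's Kafka timestamp from a wall-clock time and differ only in where that time is taken. All names here are illustrative:

```java
// Hypothetical sketch of the two competing "e2e" definitions discussed
// in the thread: latency measured at consumer poll vs. at sink write.
public class E2eDefinitionSketch {
    // Variant the KIP describes: measured when the consumer returns the record
    public static long latencyAtPollMs(long recordTimestampMs, long pollTimeMs) {
        return pollTimeMs - recordTimestampMs;
    }

    // Variant Yash and Sagar call "true" e2e: measured after the sink write,
    // which the framework cannot easily observe per record
    public static long latencyAtSinkWriteMs(long recordTimestampMs, long putCompleteMs) {
        return putCompleteMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        // record produced at t=1000, polled at t=1300, written to the sink at t=1700
        System.out.println(latencyAtPollMs(1_000L, 1_300L));      // 300
        System.out.println(latencyAtSinkWriteMs(1_000L, 1_700L)); // 700
    }
}
```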
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Thu, Sep 1, 2022 at 8:59 PM Sagar <sagarmeansoc...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jorge,
> > > > > >
> > > > > > Thanks for the KIP. It looks like a very good addition. I skimmed
> > > > > > through once and had a couple of questions:
> > > > > >
> > > > > > 1) I am assuming the new metrics would be task-level metrics.
> > > > > > Could you specify the way it's done for other sink/source
> > > > > > connectors?
> > > > > > 2) I am slightly confused about the e2e latency metric. Let's
> > > > > > consider the sink connector metric. If I look at the way it's
> > > > > > supposed to be calculated, i.e. the difference between the record
> > > > > > timestamp and the wall clock time, it looks like a per-record
> > > > > > metric. However, the put-batch time measures the time to put a
> > > > > > batch of records to the external sink. So, I would assume the 2
> > > > > > can't be added as is to compute the e2e latency. Maybe I am
> > > > > > missing something here. Could you please clarify this?
> > > > > >
> > > > > > Thanks!
> > > > > > Sagar.
> > > > > >
> > > > > > On Tue, Aug 30, 2022 at 8:43 PM Jorge Esteban Quilcate Otoya <
> > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I'd like to start a discussion thread on KIP-864: Add
> > > > > > > End-To-End Latency Metrics to Connectors.
> > > > > > > This KIP aims to improve the metrics available on Source and
> > > > > > > Sink Connectors to measure end-to-end latency, including source
> > > > > > > and sink record conversion time, and sink record e2e latency
> > > > > > > (similar to KIP-613 for Streams).
> > > > > > >
> > > > > > > The KIP is here:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Add+End-To-End+Latency+Metrics+to+Connectors
> > > > > > >
> > > > > > > Please take a look and let me know what you think.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Jorge.