Hi Sagar and Yash,

> the way it's defined in
> https://kafka.apache.org/documentation/#connect_monitoring for the metrics

4.1. Got it. Adding it to the KIP.

> The only thing I would argue is do we need sink-record-latency-min? Maybe we
> could remove this min metric as well and make all of the 3 e2e metrics
> consistent

4.2. I see. Will remove it from the KIP.

> Probably users can track the metrics at their end to
> figure that out. Do you think that makes sense?

4.3. Yes, agree. With these new metrics it should be easier for users to
track this.

> I think it makes sense to not have a min metric for either to remain
> consistent with the existing put-batch and poll-batch metrics

5.1. Got it. Same as 4.2.

> Another naming related suggestion I had was with the
> "convert-time" metrics - we should probably include transformations in the
> name since SMTs could definitely be attributable to a sizable chunk of the
> latency depending on the specific transformation chain.

5.2. Makes sense. I'm proposing to add `sink-record-convert-transform...` and
`source-record-transform-convert...` to correctly represent the order of
operations.

> it seems like both source and sink tasks only record metrics at a "batch"
> level, not on an individual record level. I think it might be additional
> overhead if we want to record these new metrics all at the record level?

5.3. At the beginning I considered implementing all metrics at the batch
level, but given how the framework processes records, I fell back to the
proposed approach:

- Sink Task:
  - `WorkerSinkTask#convertMessages(msgs)` already iterates over records, so
    there is no additional overhead to capture record latency per record.
    - https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L490-L514
  - `WorkerSinkTask#convertAndTransformRecord(record)` actually happens
    individually. Measuring this operation per batch would include processing
    that is not strictly part of "convert and transform".
    - https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L518
- Source Task:
  - `AbstractWorkerSourceTask#sendRecords` iterates over a batch and applies
    transforms and converts each record individually as well:
    - https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L390

> This might require some additional changes -
> for instance, with the "sink-record-latency" metric, we might only want to
> have a "max" metric since "avg" would require recording a value on the
> sensor for each record (whereas we can get a "max" by only recording a
> metric value for the oldest record in each batch).

5.4. Recording record-latency per batch may not be as useful, as there is no
guarantee that the oldest record will be representative of the batch.

On Sat, 3 Sept 2022 at 16:02, Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Jorge and Sagar,
>
> I think it makes sense to not have a min metric for either to remain
> consistent with the existing put-batch and poll-batch metrics (it doesn't
> seem particularly useful either anyway). Also, the new
> "sink-record-latency" metric name looks fine to me, thanks for making the
> changes! Another naming related suggestion I had was with the
> "convert-time" metrics - we should probably include transformations in the
> name since SMTs could definitely be attributable to a sizable chunk of the
> latency depending on the specific transformation chain.
>
> I have one high level question with respect to implementation - currently,
> it seems like both source and sink tasks only record metrics at a "batch"
> level, not on an individual record level.
> I think it might be additional
> overhead if we want to record these new metrics all at the record level?
> Could we instead make all of these new metrics for batches of records
> rather than individual records in order to remain consistent with the
> existing task level metrics? This might require some additional changes -
> for instance, with the "sink-record-latency" metric, we might only want to
> have a "max" metric since "avg" would require recording a value on the
> sensor for each record (whereas we can get a "max" by only recording a
> metric value for the oldest record in each batch).
>
> Thanks,
> Yash
>
> On Fri, Sep 2, 2022 at 3:16 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Hi Jorge,
> >
> > Thanks for the changes.
> >
> > Regarding the metrics, I meant something like this:
> > kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
> >
> > the way it's defined in
> > https://kafka.apache.org/documentation/#connect_monitoring for the
> > metrics.
> >
> > I see what you mean by the 3 metrics and how it can be interpreted. The
> > only thing I would argue is do we need sink-record-latency-min? Maybe we
> > could remove this min metric as well and make all of the 3 e2e metrics
> > consistent (since put-batch also doesn't expose a min which makes sense to
> > me). I think this is in contrast to what Yash pointed out above so I would
> > like to hear his thoughts as well.
> >
> > The other point Yash mentioned about the slightly flawed definition of e2e
> > is also true in a sense. But I have a feeling that once the records are
> > polled by the connector tasks, it would be difficult to track the final leg
> > via the framework. Probably users can track the metrics at their end to
> > figure that out. Do you think that makes sense?
> >
> > Thanks!
> > Sagar.
> >
> > On Thu, Sep 1, 2022 at 11:40 PM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Hi Sagar and Yash,
> > >
> > > Thanks for your feedback!
> > >
> > > > 1) I am assuming the new metrics would be task level metric.
> > >
> > > 1.1. Yes, it will be a task level metric, implemented on the
> > > Worker[Source/Sink]Task.
> > >
> > > > Could you specify the way it's done for other sink/source connector?
> > >
> > > 1.2. Not sure what you mean by this. Could you elaborate a bit more?
> > >
> > > > 2. I am slightly confused about the e2e latency metric...
> > >
> > > 2.1. Yes, I see. I was trying to bring a similar concept as in Streams with
> > > KIP-613, though the e2e concept may not be translatable.
> > > We could keep it as `sink-record-latency` to avoid conflating concepts. A
> > > similar metric naming was proposed in KIP-489 but at the consumer level -
> > > though it seems dormant for a couple of years.
> > >
> > > > However, the put-batch time measures the
> > > > time to put a batch of records to external sink. So, I would assume the 2
> > > > can't be added as is to compute the e2e latency. Maybe I am missing
> > > > something here. Could you plz clarify this.
> > >
> > > 2.2. Yes, agree. Not necessarily added, but with the 3 latencies (poll,
> > > convert, putBatch) it will be clearer where the bottleneck may be, and
> > > they represent the internal processing.
> > >
> > > > however, as per the KIP it looks like it will be
> > > > the latency between when the record was written to Kafka and when the
> > > > record is returned by a sink task's consumer's poll?
> > >
> > > 3.1. Agree. 2.1. could help to clarify this.
> > >
> > > > One more thing - I was wondering
> > > > if there's a particular reason for having a min metric for e2e latency but
> > > > not for convert time?
> > >
> > > 3.2. Was following KIP-613 for e2e which seems useful to compare with Max and
> > > get an idea of the window of results, though current latencies in Connector
> > > do not include Min, and that's why I haven't added it for convert latency.
> > > Do you think it makes sense to extend latency metrics with Min?
> > >
> > > KIP is updated to clarify some of these changes.
> > >
> > > Many thanks,
> > > Jorge.
> > >
> > > On Thu, 1 Sept 2022 at 18:11, Yash Mayya <yash.ma...@gmail.com> wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > Thanks for the KIP! I have the same confusion with the e2e-latency metrics
> > > > as Sagar above. "e2e" would seem to indicate the latency between when the
> > > > record was written to Kafka and when the record was written to the sink
> > > > system by the connector - however, as per the KIP it looks like it will be
> > > > the latency between when the record was written to Kafka and when the
> > > > record is returned by a sink task's consumer's poll? I think that metric
> > > > will be a little confusing to interpret. One more thing - I was wondering
> > > > if there's a particular reason for having a min metric for e2e latency but
> > > > not for convert time?
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Thu, Sep 1, 2022 at 8:59 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > > >
> > > > > Hi Jorge,
> > > > >
> > > > > Thanks for the KIP. It looks like a very good addition. I skimmed through
> > > > > once and had a couple of questions =>
> > > > >
> > > > > 1) I am assuming the new metrics would be task level metric. Could you
> > > > > specify the way it's done for other sink/source connector?
> > > > > 2) I am slightly confused about the e2e latency metric. Let's consider the
> > > > > sink connector metric. If I look at the way it's supposed to be calculated,
> > > > > i.e. the difference between the record timestamp and the wall clock time, it
> > > > > looks like a per record metric. However, the put-batch time measures the
> > > > > time to put a batch of records to external sink. So, I would assume the 2
> > > > > can't be added as is to compute the e2e latency. Maybe I am missing
> > > > > something here. Could you plz clarify this.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Tue, Aug 30, 2022 at 8:43 PM Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to start a discussion thread on KIP-864: Add End-To-End Latency
> > > > > > Metrics to Connectors.
> > > > > > This KIP aims to improve the metrics available on Source and Sink
> > > > > > Connectors to measure end-to-end latency, including source and sink record
> > > > > > conversion time, and sink record e2e latency (similar to KIP-613 for
> > > > > > Streams).
> > > > > >
> > > > > > The KIP is here:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Add+End-To-End+Latency+Metrics+to+Connectors
> > > > > >
> > > > > > Please take a look and let me know what you think.
> > > > > >
> > > > > > Cheers,
> > > > > > Jorge.
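
Appendix to points 5.3 and 5.4: a minimal sketch, in plain Java and not taken from the Connect runtime, of the difference between recording sink record latency for every record and recording it only for the oldest record in a batch. The class name, method names and sample timestamps are all hypothetical and chosen for illustration. Latency is computed as wall-clock time minus record timestamp, as described in the thread.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of the Connect runtime.
final class SinkRecordLatencySketch {

    // Per-record recording: the largest latency over all records in the batch.
    static long maxLatencyMs(long nowMs, List<Long> recordTimestampsMs) {
        return recordTimestampsMs.stream()
                .mapToLong(ts -> nowMs - ts)
                .max()
                .orElse(0L);
    }

    // Per-record recording: the mean latency, which needs one value per record.
    static double avgLatencyMs(long nowMs, List<Long> recordTimestampsMs) {
        return recordTimestampsMs.stream()
                .mapToLong(ts -> nowMs - ts)
                .average()
                .orElse(0.0);
    }

    // Per-batch recording: only the oldest record (smallest timestamp) is measured.
    static long oldestRecordLatencyMs(long nowMs, List<Long> recordTimestampsMs) {
        OptionalLong oldest = recordTimestampsMs.stream()
                .mapToLong(Long::longValue)
                .min();
        return oldest.isPresent() ? nowMs - oldest.getAsLong() : 0L;
    }

    public static void main(String[] args) {
        long nowMs = 10_000L;
        // One stale record followed by three fresh ones (made-up values).
        List<Long> batch = List.of(1_000L, 9_900L, 9_950L, 9_990L);

        System.out.println(maxLatencyMs(nowMs, batch));          // 9000
        System.out.println(oldestRecordLatencyMs(nowMs, batch)); // 9000
        System.out.println(avgLatencyMs(nowMs, batch));          // 2290.0
    }
}
```

With this sample batch, the oldest record gives the same value as the per-record max, so a batch-level "max" is cheap to obtain. The average is much lower, which is the sense in which the oldest record may not be representative of the batch.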