Hi everyone, I've made a slight addition to the KIP based on Yash's feedback:

- A new metric is added at INFO level to record the max latency from the batch timestamp, by keeping the oldest record timestamp per batch.
- A draft implementation is linked.

Looking forward to your feedback. Also, a kind reminder that the vote thread is open.

Thanks!
Jorge.

On Thu, 8 Sept 2022 at 14:25, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

> Great. I have updated the KIP to reflect this.
>
> Cheers,
> Jorge.
>
> On Thu, 8 Sept 2022 at 12:26, Yash Mayya <yash.ma...@gmail.com> wrote:
>
>> Thanks, I think it makes sense to define these metrics at a DEBUG
>> recording level.
>>
>> On Thu, Sep 8, 2022 at 2:51 PM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>
>>> On Thu, 8 Sept 2022 at 05:55, Yash Mayya <yash.ma...@gmail.com> wrote:
>>>
>>>> Hi Jorge,
>>>>
>>>> Thanks for the changes. With regard to having per-batch vs. per-record
>>>> metrics, the additional overhead I was referring to wasn't about whether
>>>> or not we would need to iterate over all the records in a batch. I was
>>>> referring to the potential additional overhead caused by the higher
>>>> volume of calls to Sensor::record on the sensors for the new metrics (as
>>>> compared to the existing batch-only metrics), especially for
>>>> high-throughput connectors where batch sizes could be large. I guess we
>>>> may want to do some sort of performance testing and get concrete numbers
>>>> to verify whether this is a valid concern or not?
>>>
>>> 6.1. Got it, thanks for clarifying. I guess there could be a benchmark
>>> test of `Sensor::record` to get an idea of the performance impact.
>>> Regardless, the fact that these are single-record metrics compared to the
>>> existing batch-only ones could be made explicit by setting these metrics
>>> at a DEBUG or TRACE metric recording level, leaving the existing ones at
>>> INFO level. wdyt?
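To make the "oldest record timestamp per batch" idea above concrete, here is a minimal sketch in plain Java (not the draft implementation; class and method names are hypothetical): the task tracks only the minimum timestamp while iterating a batch, then records a single max-latency value per batch instead of one value per record.

```java
import java.util.List;

// Hypothetical sketch (not the actual Connect runtime code): capture a max
// latency per batch by keeping only the oldest record timestamp, so the
// underlying sensor would be hit once per batch instead of once per record.
public class BatchMaxLatency {

    // Max latency in ms for a batch = latency of the record with the
    // oldest (smallest) timestamp.
    public static long maxLatencyMs(List<Long> recordTimestampsMs, long nowMs) {
        long oldest = Long.MAX_VALUE;
        for (long ts : recordTimestampsMs) {
            oldest = Math.min(oldest, ts);
        }
        return nowMs - oldest;
    }

    public static void main(String[] args) {
        // Batch with record timestamps 100 ms and 250 ms, observed at t=400 ms:
        // the oldest record (100 ms) yields a max latency of 300 ms.
        long max = maxLatencyMs(List.of(100L, 250L), 400L);
        System.out.println("max latency = " + max + " ms");
    }
}
```

Note this only supports a "max" stat; an "avg" would still need one recorded value per record, which is the trade-off discussed later in the thread.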
>>>>
>>>> Thanks,
>>>> Yash
>>>>
>>>> On Tue, Sep 6, 2022 at 4:42 PM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>
>>>>> Hi Sagar and Yash,
>>>>>
>>>>>> the way it's defined in
>>>>>> https://kafka.apache.org/documentation/#connect_monitoring for the
>>>>>> metrics
>>>>>
>>>>> 4.1. Got it. Will add it to the KIP.
>>>>>
>>>>>> The only thing I would argue is do we need sink-record-latency-min?
>>>>>> Maybe we could remove this min metric as well and make all of the 3
>>>>>> e2e metrics consistent
>>>>>
>>>>> 4.2. I see. Will remove it from the KIP.
>>>>>
>>>>>> Probably users can track the metrics at their end to figure that out.
>>>>>> Do you think that makes sense?
>>>>>
>>>>> 4.3. Yes, agree. With these new metrics it should be easier for users
>>>>> to track this.
>>>>>
>>>>>> I think it makes sense to not have a min metric for either to remain
>>>>>> consistent with the existing put-batch and poll-batch metrics
>>>>>
>>>>> 5.1. Got it. Same as 4.2.
>>>>>
>>>>>> Another naming related suggestion I had was with the "convert-time"
>>>>>> metrics - we should probably include transformations in the name since
>>>>>> SMTs could definitely be attributable to a sizable chunk of the
>>>>>> latency depending on the specific transformation chain.
>>>>>
>>>>> 5.2. Makes sense. I'm proposing to add `sink-record-convert-transform...`
>>>>> and `source-record-transform-convert...` to correctly represent the
>>>>> order of operations.
>>>>>
>>>>>> it seems like both source and sink tasks only record metrics at a
>>>>>> "batch" level, not on an individual record level. I think it might be
>>>>>> additional overhead if we want to record these new metrics all at the
>>>>>> record level?
>>>>>
>>>>> 5.3. I considered implementing all metrics at the batch level at first,
>>>>> but given how the framework processes records, I fell back to the
>>>>> proposed approach:
>>>>> - Sink task:
>>>>>   - `WorkerSinkTask#convertMessages(msgs)` already iterates over
>>>>>     records, so there is no additional overhead to capture record
>>>>>     latency per record:
>>>>>     https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L490-L514
>>>>>   - `WorkerSinkTask#convertAndTransformRecord(record)` actually happens
>>>>>     per record. Measuring this operation per batch would include
>>>>>     processing that is not strictly part of "convert and transform":
>>>>>     https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L518
>>>>> - Source task:
>>>>>   - `AbstractWorkerSourceTask#sendRecords` iterates over a batch and
>>>>>     applies transforms and converts records individually as well:
>>>>>     https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L390
>>>>>
>>>>>> This might require some additional changes - for instance, with the
>>>>>> "sink-record-latency" metric, we might only want to have a "max"
>>>>>> metric since "avg" would require recording a value on the sensor for
>>>>>> each record (whereas we can get a "max" by only recording a metric
>>>>>> value for the oldest record in each batch).
>>>>>
>>>>> 5.4. Recording record latency per batch may not be as useful, as there
>>>>> is no guarantee that the oldest record will be representative of the
>>>>> batch.
>>>>>
>>>>> On Sat, 3 Sept 2022 at 16:02, Yash Mayya <yash.ma...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jorge and Sagar,
>>>>>>
>>>>>> I think it makes sense to not have a min metric for either to remain
>>>>>> consistent with the existing put-batch and poll-batch metrics (it
>>>>>> doesn't seem particularly useful either anyway). Also, the new
>>>>>> "sink-record-latency" metric name looks fine to me, thanks for making
>>>>>> the changes! Another naming related suggestion I had was with the
>>>>>> "convert-time" metrics - we should probably include transformations in
>>>>>> the name since SMTs could definitely be attributable to a sizable
>>>>>> chunk of the latency depending on the specific transformation chain.
>>>>>>
>>>>>> I have one high level question with respect to implementation -
>>>>>> currently, it seems like both source and sink tasks only record
>>>>>> metrics at a "batch" level, not on an individual record level. I think
>>>>>> it might be additional overhead if we want to record these new metrics
>>>>>> all at the record level? Could we instead make all of these new
>>>>>> metrics for batches of records rather than individual records in order
>>>>>> to remain consistent with the existing task level metrics? This might
>>>>>> require some additional changes - for instance, with the
>>>>>> "sink-record-latency" metric, we might only want to have a "max"
>>>>>> metric since "avg" would require recording a value on the sensor for
>>>>>> each record (whereas we can get a "max" by only recording a metric
>>>>>> value for the oldest record in each batch).
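As a rough illustration of the per-record alternative being weighed here, a plain-Java sketch (the `record` method below is a stand-in for Kafka's `Sensor::record`, not the real Sensor class; everything else is hypothetical): because the sink path already iterates over each record, both max and avg can be maintained, at the cost of one record() call per record rather than per batch.

```java
import java.util.List;

// Hypothetical sketch of per-record latency recording: the per-record loop
// (as in WorkerSinkTask#convertMessages) records once per record into a
// stand-in sensor. Max and avg emulate the Max/Avg stats a real sensor keeps.
public class PerRecordLatency {
    private long count = 0;
    private long sum = 0;
    private long max = Long.MIN_VALUE;

    // Stand-in for Sensor::record(value): one call per record.
    public void record(long latencyMs) {
        count++;
        sum += latencyMs;
        max = Math.max(max, latencyMs);
    }

    public long max() { return max; }

    public double avg() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }

    public static void main(String[] args) {
        PerRecordLatency sensor = new PerRecordLatency();
        long nowMs = 500L;
        // Record latency for each record while iterating the batch, as the
        // existing per-record loop already does.
        for (long ts : List.of(100L, 200L, 450L)) {
            sensor.record(nowMs - ts);
        }
        System.out.println("max=" + sensor.max() + " avg=" + sensor.avg());
    }
}
```

The overhead question in the thread is exactly the cost of those extra record() calls on large batches; the batch-level sketch earlier trades the avg away to make only one call per batch.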
>>>>>>
>>>>>> Thanks,
>>>>>> Yash
>>>>>>
>>>>>> On Fri, Sep 2, 2022 at 3:16 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jorge,
>>>>>>>
>>>>>>> Thanks for the changes.
>>>>>>>
>>>>>>> Regarding the metrics, I meant something like this:
>>>>>>> kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
>>>>>>> the way it's defined in
>>>>>>> https://kafka.apache.org/documentation/#connect_monitoring for the
>>>>>>> metrics.
>>>>>>>
>>>>>>> I see what you mean by the 3 metrics and how they can be interpreted.
>>>>>>> The only thing I would argue is do we need sink-record-latency-min?
>>>>>>> Maybe we could remove this min metric as well and make all of the 3
>>>>>>> e2e metrics consistent (since put-batch also doesn't expose a min,
>>>>>>> which makes sense to me). I think this is in contrast to what Yash
>>>>>>> pointed out above, so I would like to hear his thoughts as well.
>>>>>>>
>>>>>>> The other point Yash mentioned about the slightly flawed definition
>>>>>>> of e2e is also true in a sense. But I have a feeling that once the
>>>>>>> records are polled by the connector tasks, it would be difficult to
>>>>>>> track the final leg via the framework. Probably users can track the
>>>>>>> metrics at their end to figure that out. Do you think that makes
>>>>>>> sense?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sagar.
>>>>>>>
>>>>>>> On Thu, Sep 1, 2022 at 11:40 PM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Sagar and Yash,
>>>>>>>>
>>>>>>>> Thanks for your feedback!
>>>>>>>>
>>>>>>>>> 1) I am assuming the new metrics would be task level metric.
>>>>>>>>
>>>>>>>> 1.1. Yes, it will be a task-level metric, implemented on the
>>>>>>>> Worker[Source/Sink]Task.
>>>>>>>>
>>>>>>>>> Could you specify the way it's done for other sink/source connector?
>>>>>>>>
>>>>>>>> 1.2. Not sure what you mean by this. Could you elaborate a bit more?
>>>>>>>>
>>>>>>>>> 2. I am slightly confused about the e2e latency metric...
>>>>>>>>
>>>>>>>> 2.1. Yes, I see. I was trying to bring in a similar concept as in
>>>>>>>> Streams with KIP-613, though the e2e concept may not be translatable.
>>>>>>>> We could keep it as `sink-record-latency` to avoid conflating
>>>>>>>> concepts. A similar metric naming was proposed in KIP-489 but at the
>>>>>>>> consumer level, though it seems dormant for a couple of years.
>>>>>>>>
>>>>>>>>> However, the put-batch time measures the time to put a batch of
>>>>>>>>> records to the external sink. So, I would assume the 2 can't be
>>>>>>>>> added as is to compute the e2e latency. Maybe I am missing
>>>>>>>>> something here. Could you plz clarify this.
>>>>>>>>
>>>>>>>> 2.2. Yes, agree. Not necessarily added, but with the 3 latencies
>>>>>>>> (poll, convert, putBatch) it will be clearer where the bottleneck
>>>>>>>> may be, and they represent the internal processing.
>>>>>>>>
>>>>>>>>> however, as per the KIP it looks like it will be the latency between
>>>>>>>>> when the record was written to Kafka and when the record is returned
>>>>>>>>> by a sink task's consumer's poll?
>>>>>>>>
>>>>>>>> 3.1. Agree. 2.1 could help to clarify this.
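The breakdown in 2.2 can be sketched with a small hypothetical timing helper (not the Connect runtime's actual instrumentation; the stage bodies below are placeholders): each stage (poll, convert/transform, put-batch) is timed separately, so the three latencies can be compared to locate the bottleneck rather than summed into a single e2e figure.

```java
// Hypothetical sketch: time each pipeline stage independently so per-stage
// latencies can be compared. Stage bodies are placeholders, not real calls.
public class StageLatencies {

    interface Stage { void run(); }

    // Times a single stage and returns the elapsed wall-clock milliseconds.
    public static long timeMs(Stage stage) {
        long start = System.nanoTime();
        stage.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long pollMs = timeMs(() -> { /* consumer.poll(...) would go here */ });
        long convertMs = timeMs(() -> { /* convertAndTransformRecord(...) */ });
        long putMs = timeMs(() -> { /* task.put(records) would go here */ });
        System.out.println("poll=" + pollMs + " convert=" + convertMs
                + " put=" + putMs);
    }
}
```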
>>>>>>>>
>>>>>>>>> One more thing - I was wondering if there's a particular reason for
>>>>>>>>> having a min metric for e2e latency but not for convert time?
>>>>>>>>
>>>>>>>> 3.2. I was following KIP-613 for e2e, where Min seems useful to
>>>>>>>> compare with Max to get an idea of the window of results; current
>>>>>>>> latencies in Connect do not include Min, and that's why I haven't
>>>>>>>> added it for convert latency. Do you think it makes sense to extend
>>>>>>>> latency metrics with Min?
>>>>>>>>
>>>>>>>> KIP is updated to clarify some of these changes.
>>>>>>>>
>>>>>>>> Many thanks,
>>>>>>>> Jorge.
>>>>>>>>
>>>>>>>> On Thu, 1 Sept 2022 at 18:11, Yash Mayya <yash.ma...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jorge,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP! I have the same confusion with the e2e-latency
>>>>>>>>> metrics as Sagar above. "e2e" would seem to indicate the latency
>>>>>>>>> between when the record was written to Kafka and when the record was
>>>>>>>>> written to the sink system by the connector - however, as per the
>>>>>>>>> KIP it looks like it will be the latency between when the record was
>>>>>>>>> written to Kafka and when the record is returned by a sink task's
>>>>>>>>> consumer's poll? I think that metric will be a little confusing to
>>>>>>>>> interpret. One more thing - I was wondering if there's a particular
>>>>>>>>> reason for having a min metric for e2e latency but not for convert
>>>>>>>>> time?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Yash
>>>>>>>>>
>>>>>>>>> On Thu, Sep 1, 2022 at 8:59 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jorge,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP. It looks like a very good addition. I skimmed
>>>>>>>>>> through once and had a couple of questions:
>>>>>>>>>>
>>>>>>>>>> 1) I am assuming the new metrics would be task-level metrics. Could
>>>>>>>>>> you specify the way it's done for other sink/source connectors?
>>>>>>>>>> 2) I am slightly confused about the e2e latency metric. Let's
>>>>>>>>>> consider the sink connector metric. If I look at the way it's
>>>>>>>>>> supposed to be calculated, i.e. the difference between the record
>>>>>>>>>> timestamp and the wall clock time, it looks like a per-record
>>>>>>>>>> metric. However, the put-batch time measures the time to put a
>>>>>>>>>> batch of records to the external sink. So, I would assume the 2
>>>>>>>>>> can't be added as is to compute the e2e latency. Maybe I am missing
>>>>>>>>>> something here. Could you plz clarify this.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Sagar.
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 30, 2022 at 8:43 PM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I'd like to start a discussion thread on KIP-864: Add End-To-End
>>>>>>>>>>> Latency Metrics to Connectors.
>>>>>>>>>>> This KIP aims to improve the metrics available on Source and Sink
>>>>>>>>>>> Connectors to measure end-to-end latency, including source and
>>>>>>>>>>> sink record conversion time, and sink record e2e latency (similar
>>>>>>>>>>> to KIP-613 for Streams).
>>>>>>>>>>>
>>>>>>>>>>> The KIP is here:
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Add+End-To-End+Latency+Metrics+to+Connectors
>>>>>>>>>>>
>>>>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jorge.
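For readers following the thread: the sink-record latency definition debated above reduces to a simple per-record computation. A sketch (hypothetical names, not the KIP's implementation): the value is the gap between the record's Kafka timestamp and the wall-clock time when the sink task's consumer returns it, which is why it measures Kafka-to-poll rather than Kafka-to-sink-system latency.

```java
// Sketch of the sink-record latency definition discussed in this thread:
// wall-clock time at the sink task minus the record's Kafka timestamp.
public class E2eLatency {

    public static long sinkRecordLatencyMs(long recordTimestampMs, long wallClockMs) {
        return wallClockMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        // Record written to Kafka at t=1000 ms, returned by the sink task's
        // consumer poll at t=1250 ms: latency is 250 ms.
        System.out.println(sinkRecordLatencyMs(1000L, 1250L) + " ms");
    }
}
```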