Hi Sagar and Yash,

> the way it's defined in
https://kafka.apache.org/documentation/#connect_monitoring for the metrics

4.1. Got it. Add it to the KIP.

> The only thing I would argue is do we need sink-record-latency-min? Maybe
we
> could remove this min metric as well and make all of the 3 e2e metrics
> consistent

4.2 I see. Will remove it from the KIP.

> Probably users can track the metrics at their end to
> figure that out. Do you think that makes sense?

4.3. Yes, agree. With these new metrics it should be easier for users to
track this.

> I think it makes sense to not have a min metric for either to remain
> consistent with the existing put-batch and poll-batch metrics

5.1. Got it. Same as 4.2

> Another naming related suggestion I had was with the
> "convert-time" metrics - we should probably include transformations in the
> name since SMTs could definitely be attributable to a sizable chunk of the
> latency depending on the specific transformation chain.

5.2. Make sense. I'm proposing to add `sink-record-convert-transform...`
and `source-record-transform-convert...` to represent correctly the order
of operations.

> it seems like both source and sink tasks only record metrics at a "batch"
> level, not on an individual record level. I think it might be additional
> overhead if we want to record these new metrics all at the record level?

5.3. I considered at the beginning to implement all metrics at the batch
level, but given how the framework process records, I fallback to the
proposed approach:
- Sink Task:
  - `WorkerSinkTask#convertMessages(msgs)` already iterates over records,
so there is no additional overhead to capture record latency per record.
    -
https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L490-L514
  - `WorkerSinkTask#convertAndTransformRecord(record)` actually happens
individually. Measuring this operation per batch would include processing
that is not strictly part of "convert and transform"
    -
https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L518
- Source Task:
  - `AbstractWorkerSourceTask#sendRecords` iterates over a batch and
applies transforms and convert record individually as well:
    -
https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L390

> This might require some additional changes -
> for instance, with the "sink-record-latency" metric, we might only want to
> have a "max" metric since "avg" would require recording a value on the
> sensor for each record (whereas we can get a "max" by only recording a
> metric value for the oldest record in each batch).

5.4. Recording record-latency per batch may not be as useful as there is no
guarantee that the oldest record will be representative of the batch.

On Sat, 3 Sept 2022 at 16:02, Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Jorge and Sagar,
>
> I think it makes sense to not have a min metric for either to remain
> consistent with the existing put-batch and poll-batch metrics (it doesn't
> seem particularly useful either anyway). Also, the new
> "sink-record-latency" metric name looks fine to me, thanks for making the
> changes! Another naming related suggestion I had was with the
> "convert-time" metrics - we should probably include transformations in the
> name since SMTs could definitely be attributable to a sizable chunk of the
> latency depending on the specific transformation chain.
>
> I have one high level question with respect to implementation - currently,
> it seems like both source and sink tasks only record metrics at a "batch"
> level, not on an individual record level. I think it might be additional
> overhead if we want to record these new metrics all at the record level?
> Could we instead make all of these new metrics for batches of records
> rather than individual records in order to remain consistent with the
> existing task level metrics? This might require some additional changes -
> for instance, with the "sink-record-latency" metric, we might only want to
> have a "max" metric since "avg" would require recording a value on the
> sensor for each record (whereas we can get a "max" by only recording a
> metric value for the oldest record in each batch).
>
> Thanks,
> Yash
>
> On Fri, Sep 2, 2022 at 3:16 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Hi Jorge,
> >
> > Thanks for the changes.
> >
> > Regarding the metrics, I meant something like this:
> >
> kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
> >
> > the way it's defined in
> > https://kafka.apache.org/documentation/#connect_monitoring for the
> > metrics.
> >
> > I see what you mean by the 3 metrics and how it can be interpreted. The
> > only thing I would argue is do we need sink-record-latency-min? Maybe we
> > could remove this min metric as well and make all of the 3 e2e metrics
> > consistent(since put-batch also doesn't expose a min which makes sense to
> > me). I think this is in contrast to what Yash pointed out above so I
> would
> > like to hear his thoughts as well.
> >
> > The other point Yash mentioned about the slightly flawed definition of
> e2e
> > is also true in a sense. But I have a feeling that's one the records are
> > polled by the connector tasks, it would be difficult to track the final
> leg
> > via the framework. Probably users can track the metrics at their end to
> > figure that out. Do you think that makes sense?
> >
> > Thanks!
> > Sagar.
> >
> >
> >
> >
> > On Thu, Sep 1, 2022 at 11:40 PM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Hi Sagar and Yash,
> > >
> > > Thanks for your feedback!
> > >
> > > > 1) I am assuming the new metrics would be task level metric.
> > >
> > > 1.1 Yes, it will be a task level metric, implemented on the
> > > Worker[Source/Sink]Task.
> > >
> > > > Could you specify the way it's done for other sink/source connector?
> > >
> > > 1.2. Not sure what do you mean by this. Could you elaborate a bit more?
> > >
> > > > 2. I am slightly confused about the e2e latency metric...
> > >
> > > 2.1. Yes, I see. I was trying to bring a similar concept as in Streams
> > with
> > > KIP-613, though the e2e concept may not be translatable.
> > > We could keep it as `sink-record-latency` to avoid conflating
> concepts. A
> > > similar metric naming was proposed in KIP-489 but at the consumer
> level —
> > > though it seems dormant for a couple of years.
> > >
> > > > However, the put-batch time measures the
> > > > time to put a batch of records to external sink. So, I would assume
> > the 2
> > > > can't be added as is to compute the e2e latency. Maybe I am missing
> > > > something here. Could you plz clarify this.
> > >
> > > 2.2. Yes, agree. Not necessarily added, but with the 3 latencies (poll,
> > > convert, putBatch) will be clearer where the bottleneck may be, and
> > > represent the internal processing.
> > >
> > > > however, as per the KIP it looks like it will be
> > > > the latency between when the record was written to Kafka and when the
> > > > record is returned by a sink task's consumer's poll?
> > >
> > > 3.1. Agree. 2.1. could help to clarify this.
> > >
> > > > One more thing - I was wondering
> > > > if there's a particular reason for having a min metric for e2e
> latency
> > > but
> > > > not for convert time?
> > >
> > > 3.2. Was following KIP-613 for e2e which seems useful to compare with
> > Max a
> > > get an idea of the window of results, though current latencies in
> > Connector
> > > do not include Min, and that's why I haven't added it for convert
> > latency.
> > > Do you think it make sense to extend latency metrics with Min?
> > >
> > > KIP is updated to clarify some of these changes.
> > >
> > > Many thanks,
> > > Jorge.
> > >
> > > On Thu, 1 Sept 2022 at 18:11, Yash Mayya <yash.ma...@gmail.com> wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > Thanks for the KIP! I have the same confusion with the e2e-latency
> > > metrics
> > > > as Sagar above. "e2e" would seem to indicate the latency between when
> > the
> > > > record was written to Kafka and when the record was written to the
> sink
> > > > system by the connector - however, as per the KIP it looks like it
> will
> > > be
> > > > the latency between when the record was written to Kafka and when the
> > > > record is returned by a sink task's consumer's poll? I think that
> > metric
> > > > will be a little confusing to interpret. One more thing - I was
> > wondering
> > > > if there's a particular reason for having a min metric for e2e
> latency
> > > but
> > > > not for convert time?
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Thu, Sep 1, 2022 at 8:59 PM Sagar <sagarmeansoc...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Jorge,
> > > > >
> > > > > Thanks for the KIP. It looks like a very good addition. I skimmed
> > > through
> > > > > once and had a couple of questions =>
> > > > >
> > > > > 1) I am assuming the new metrics would be task level metric. Could
> > you
> > > > > specify the way it's done for other sink/source connector?
> > > > > 2) I am slightly confused about the e2e latency metric. Let's
> > consider
> > > > the
> > > > > sink connector metric. If I look at the way it's supposed to be
> > > > calculated,
> > > > > i.e the difference between the record timestamp and the wall clock
> > > time,
> > > > it
> > > > > looks like a per record metric. However, the put-batch time
> measures
> > > the
> > > > > time to put a batch of records to external sink. So, I would assume
> > > the 2
> > > > > can't be added as is to compute the e2e latency. Maybe I am missing
> > > > > something here. Could you plz clarify this.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Tue, Aug 30, 2022 at 8:43 PM Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to start a discussion thread on KIP-864: Add End-To-End
> > > > Latency
> > > > > > Metrics to Connectors.
> > > > > > This KIP aims to improve the metrics available on Source and Sink
> > > > > > Connectors to measure end-to-end latency, including source and
> sink
> > > > > record
> > > > > > conversion time, and sink record e2e latency (similar to KIP-613
> > for
> > > > > > Streams).
> > > > > >
> > > > > > The KIP is here:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Add+End-To-End+Latency+Metrics+to+Connectors
> > > > > >
> > > > > > Please take a look and let me know what you think.
> > > > > >
> > > > > > Cheers,
> > > > > > Jorge.
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to