Hi everyone,

I've made a slight addition to the KIP based on Yash feedback:

- A new metric is added at INFO level to record the max latency from the
batch timestamp, by keeping the oldest record timestamp per batch.
- A draft implementation is linked.

Looking forward to your feedback.
Also, a kindly reminder that the vote thread is open.

Thanks!
Jorge.

On Thu, 8 Sept 2022 at 14:25, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Great. I have updated the KIP to reflect this.
>
> Cheers,
> Jorge.
>
> On Thu, 8 Sept 2022 at 12:26, Yash Mayya <yash.ma...@gmail.com> wrote:
>
>> Thanks, I think it makes sense to define these metrics at a DEBUG
>> recording
>> level.
>>
>> On Thu, Sep 8, 2022 at 2:51 PM Jorge Esteban Quilcate Otoya <
>> quilcate.jo...@gmail.com> wrote:
>>
>> > On Thu, 8 Sept 2022 at 05:55, Yash Mayya <yash.ma...@gmail.com> wrote:
>> >
>> > > Hi Jorge,
>> > >
>> > > Thanks for the changes. With regard to having per batch vs per record
>> > > metrics, the additional overhead I was referring to wasn't about
>> whether
>> > or
>> > > not we would need to iterate over all the records in a batch. I was
>> > > referring to the potential additional overhead caused by the higher
>> > volume
>> > > of calls to Sensor::record on the sensors for the new metrics (as
>> > compared
>> > > to the existing batch only metrics), especially for high throughput
>> > > connectors where batch sizes could be large. I guess we may want to do
>> > some
>> > > sort of performance testing and get concrete numbers to verify whether
>> > this
>> > > is a valid concern or not?
>> > >
>> >
>> > 6.1. Got it, thanks for clarifying. I guess there could be a benchmark
>> test
>> > of the `Sensor::record` to get an idea of the performance impact.
>> > Regardless, the fact that these are single-record metrics compared to
>> > existing batch-only could be explicitly defined by setting these
>> metrics at
>> > a DEBUG or TRACE metric recording level, leaving the existing at INFO
>> > level.
>> > wdyt?
>> >
>> >
>> > >
>> > > Thanks,
>> > > Yash
>> > >
>> > > On Tue, Sep 6, 2022 at 4:42 PM Jorge Esteban Quilcate Otoya <
>> > > quilcate.jo...@gmail.com> wrote:
>> > >
>> > > > Hi Sagar and Yash,
>> > > >
>> > > > > the way it's defined in
>> > > > https://kafka.apache.org/documentation/#connect_monitoring for the
>> > > metrics
>> > > >
>> > > > 4.1. Got it. Add it to the KIP.
>> > > >
>> > > > > The only thing I would argue is do we need
>> sink-record-latency-min?
>> > > Maybe
>> > > > we
>> > > > > could remove this min metric as well and make all of the 3 e2e
>> > metrics
>> > > > > consistent
>> > > >
>> > > > 4.2 I see. Will remove it from the KIP.
>> > > >
>> > > > > Probably users can track the metrics at their end to
>> > > > > figure that out. Do you think that makes sense?
>> > > >
>> > > > 4.3. Yes, agree. With these new metrics it should be easier for
>> users
>> > to
>> > > > track this.
>> > > >
>> > > > > I think it makes sense to not have a min metric for either to
>> remain
>> > > > > consistent with the existing put-batch and poll-batch metrics
>> > > >
>> > > > 5.1. Got it. Same as 4.2
>> > > >
>> > > > > Another naming related suggestion I had was with the
>> > > > > "convert-time" metrics - we should probably include
>> transformations
>> > in
>> > > > the
>> > > > > name since SMTs could definitely be attributable to a sizable
>> chunk
>> > of
>> > > > the
>> > > > > latency depending on the specific transformation chain.
>> > > >
>> > > > 5.2. Make sense. I'm proposing to add
>> > `sink-record-convert-transform...`
>> > > > and `source-record-transform-convert...` to represent correctly the
>> > order
>> > > > of operations.
>> > > >
>> > > > > it seems like both source and sink tasks only record metrics at a
>> > > "batch"
>> > > > > level, not on an individual record level. I think it might be
>> > > additional
>> > > > > overhead if we want to record these new metrics all at the record
>> > > level?
>> > > >
>> > > > 5.3. I considered at the beginning to implement all metrics at the
>> > batch
>> > > > level, but given how the framework process records, I fallback to
>> the
>> > > > proposed approach:
>> > > > - Sink Task:
>> > > >   - `WorkerSinkTask#convertMessages(msgs)` already iterates over
>> > records,
>> > > > so there is no additional overhead to capture record latency per
>> > record.
>> > > >     -
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L490-L514
>> > > >   - `WorkerSinkTask#convertAndTransformRecord(record)` actually
>> happens
>> > > > individually. Measuring this operation per batch would include
>> > processing
>> > > > that is not strictly part of "convert and transform"
>> > > >     -
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L518
>> > > > - Source Task:
>> > > >   - `AbstractWorkerSourceTask#sendRecords` iterates over a batch and
>> > > > applies transforms and convert record individually as well:
>> > > >     -
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/9841647c4fe422532f448423c92d26e4fdcb8932/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L390
>> > > >
>> > > > > This might require some additional changes -
>> > > > > for instance, with the "sink-record-latency" metric, we might only
>> > want
>> > > > to
>> > > > > have a "max" metric since "avg" would require recording a value on
>> > the
>> > > > > sensor for each record (whereas we can get a "max" by only
>> recording
>> > a
>> > > > > metric value for the oldest record in each batch).
>> > > >
>> > > > 5.4. Recording record-latency per batch may not be as useful as
>> there
>> > is
>> > > no
>> > > > guarantee that the oldest record will be representative of the
>> batch.
>> > > >
>> > > > On Sat, 3 Sept 2022 at 16:02, Yash Mayya <yash.ma...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi Jorge and Sagar,
>> > > > >
>> > > > > I think it makes sense to not have a min metric for either to
>> remain
>> > > > > consistent with the existing put-batch and poll-batch metrics (it
>> > > doesn't
>> > > > > seem particularly useful either anyway). Also, the new
>> > > > > "sink-record-latency" metric name looks fine to me, thanks for
>> making
>> > > the
>> > > > > changes! Another naming related suggestion I had was with the
>> > > > > "convert-time" metrics - we should probably include
>> transformations
>> > in
>> > > > the
>> > > > > name since SMTs could definitely be attributable to a sizable
>> chunk
>> > of
>> > > > the
>> > > > > latency depending on the specific transformation chain.
>> > > > >
>> > > > > I have one high level question with respect to implementation -
>> > > > currently,
>> > > > > it seems like both source and sink tasks only record metrics at a
>> > > "batch"
>> > > > > level, not on an individual record level. I think it might be
>> > > additional
>> > > > > overhead if we want to record these new metrics all at the record
>> > > level?
>> > > > > Could we instead make all of these new metrics for batches of
>> records
>> > > > > rather than individual records in order to remain consistent with
>> the
>> > > > > existing task level metrics? This might require some additional
>> > > changes -
>> > > > > for instance, with the "sink-record-latency" metric, we might only
>> > want
>> > > > to
>> > > > > have a "max" metric since "avg" would require recording a value on
>> > the
>> > > > > sensor for each record (whereas we can get a "max" by only
>> recording
>> > a
>> > > > > metric value for the oldest record in each batch).
>> > > > >
>> > > > > Thanks,
>> > > > > Yash
>> > > > >
>> > > > > On Fri, Sep 2, 2022 at 3:16 PM Sagar <sagarmeansoc...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > Hi Jorge,
>> > > > > >
>> > > > > > Thanks for the changes.
>> > > > > >
>> > > > > > Regarding the metrics, I meant something like this:
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
>> > > > > >
>> > > > > > the way it's defined in
>> > > > > > https://kafka.apache.org/documentation/#connect_monitoring for
>> the
>> > > > > > metrics.
>> > > > > >
>> > > > > > I see what you mean by the 3 metrics and how it can be
>> interpreted.
>> > > The
>> > > > > > only thing I would argue is do we need sink-record-latency-min?
>> > Maybe
>> > > > we
>> > > > > > could remove this min metric as well and make all of the 3 e2e
>> > > metrics
>> > > > > > consistent(since put-batch also doesn't expose a min which makes
>> > > sense
>> > > > to
>> > > > > > me). I think this is in contrast to what Yash pointed out above
>> so
>> > I
>> > > > > would
>> > > > > > like to hear his thoughts as well.
>> > > > > >
>> > > > > > The other point Yash mentioned about the slightly flawed
>> definition
>> > > of
>> > > > > e2e
>> > > > > > is also true in a sense. But I have a feeling that's one the
>> > records
>> > > > are
>> > > > > > polled by the connector tasks, it would be difficult to track
>> the
>> > > final
>> > > > > leg
>> > > > > > via the framework. Probably users can track the metrics at their
>> > end
>> > > to
>> > > > > > figure that out. Do you think that makes sense?
>> > > > > >
>> > > > > > Thanks!
>> > > > > > Sagar.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Sep 1, 2022 at 11:40 PM Jorge Esteban Quilcate Otoya <
>> > > > > > quilcate.jo...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi Sagar and Yash,
>> > > > > > >
>> > > > > > > Thanks for your feedback!
>> > > > > > >
>> > > > > > > > 1) I am assuming the new metrics would be task level metric.
>> > > > > > >
>> > > > > > > 1.1 Yes, it will be a task level metric, implemented on the
>> > > > > > > Worker[Source/Sink]Task.
>> > > > > > >
>> > > > > > > > Could you specify the way it's done for other sink/source
>> > > > connector?
>> > > > > > >
>> > > > > > > 1.2. Not sure what do you mean by this. Could you elaborate a
>> bit
>> > > > more?
>> > > > > > >
>> > > > > > > > 2. I am slightly confused about the e2e latency metric...
>> > > > > > >
>> > > > > > > 2.1. Yes, I see. I was trying to bring a similar concept as in
>> > > > Streams
>> > > > > > with
>> > > > > > > KIP-613, though the e2e concept may not be translatable.
>> > > > > > > We could keep it as `sink-record-latency` to avoid conflating
>> > > > > concepts. A
>> > > > > > > similar metric naming was proposed in KIP-489 but at the
>> consumer
>> > > > > level —
>> > > > > > > though it seems dormant for a couple of years.
>> > > > > > >
>> > > > > > > > However, the put-batch time measures the
>> > > > > > > > time to put a batch of records to external sink. So, I would
>> > > assume
>> > > > > > the 2
>> > > > > > > > can't be added as is to compute the e2e latency. Maybe I am
>> > > missing
>> > > > > > > > something here. Could you plz clarify this.
>> > > > > > >
>> > > > > > > 2.2. Yes, agree. Not necessarily added, but with the 3
>> latencies
>> > > > (poll,
>> > > > > > > convert, putBatch) will be clearer where the bottleneck may
>> be,
>> > and
>> > > > > > > represent the internal processing.
>> > > > > > >
>> > > > > > > > however, as per the KIP it looks like it will be
>> > > > > > > > the latency between when the record was written to Kafka and
>> > when
>> > > > the
>> > > > > > > > record is returned by a sink task's consumer's poll?
>> > > > > > >
>> > > > > > > 3.1. Agree. 2.1. could help to clarify this.
>> > > > > > >
>> > > > > > > > One more thing - I was wondering
>> > > > > > > > if there's a particular reason for having a min metric for
>> e2e
>> > > > > latency
>> > > > > > > but
>> > > > > > > > not for convert time?
>> > > > > > >
>> > > > > > > 3.2. Was following KIP-613 for e2e which seems useful to
>> compare
>> > > with
>> > > > > > Max a
>> > > > > > > get an idea of the window of results, though current
>> latencies in
>> > > > > > Connector
>> > > > > > > do not include Min, and that's why I haven't added it for
>> convert
>> > > > > > latency.
>> > > > > > > Do you think it make sense to extend latency metrics with Min?
>> > > > > > >
>> > > > > > > KIP is updated to clarify some of these changes.
>> > > > > > >
>> > > > > > > Many thanks,
>> > > > > > > Jorge.
>> > > > > > >
>> > > > > > > On Thu, 1 Sept 2022 at 18:11, Yash Mayya <
>> yash.ma...@gmail.com>
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Jorge,
>> > > > > > > >
>> > > > > > > > Thanks for the KIP! I have the same confusion with the
>> > > e2e-latency
>> > > > > > > metrics
>> > > > > > > > as Sagar above. "e2e" would seem to indicate the latency
>> > between
>> > > > when
>> > > > > > the
>> > > > > > > > record was written to Kafka and when the record was written
>> to
>> > > the
>> > > > > sink
>> > > > > > > > system by the connector - however, as per the KIP it looks
>> like
>> > > it
>> > > > > will
>> > > > > > > be
>> > > > > > > > the latency between when the record was written to Kafka and
>> > when
>> > > > the
>> > > > > > > > record is returned by a sink task's consumer's poll? I think
>> > that
>> > > > > > metric
>> > > > > > > > will be a little confusing to interpret. One more thing - I
>> was
>> > > > > > wondering
>> > > > > > > > if there's a particular reason for having a min metric for
>> e2e
>> > > > > latency
>> > > > > > > but
>> > > > > > > > not for convert time?
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Yash
>> > > > > > > >
>> > > > > > > > On Thu, Sep 1, 2022 at 8:59 PM Sagar <
>> > sagarmeansoc...@gmail.com>
>> > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi Jorge,
>> > > > > > > > >
>> > > > > > > > > Thanks for the KIP. It looks like a very good addition. I
>> > > skimmed
>> > > > > > > through
>> > > > > > > > > once and had a couple of questions =>
>> > > > > > > > >
>> > > > > > > > > 1) I am assuming the new metrics would be task level
>> metric.
>> > > > Could
>> > > > > > you
>> > > > > > > > > specify the way it's done for other sink/source connector?
>> > > > > > > > > 2) I am slightly confused about the e2e latency metric.
>> Let's
>> > > > > > consider
>> > > > > > > > the
>> > > > > > > > > sink connector metric. If I look at the way it's supposed
>> to
>> > be
>> > > > > > > > calculated,
>> > > > > > > > > i.e the difference between the record timestamp and the
>> wall
>> > > > clock
>> > > > > > > time,
>> > > > > > > > it
>> > > > > > > > > looks like a per record metric. However, the put-batch
>> time
>> > > > > measures
>> > > > > > > the
>> > > > > > > > > time to put a batch of records to external sink. So, I
>> would
>> > > > assume
>> > > > > > > the 2
>> > > > > > > > > can't be added as is to compute the e2e latency. Maybe I
>> am
>> > > > missing
>> > > > > > > > > something here. Could you plz clarify this.
>> > > > > > > > >
>> > > > > > > > > Thanks!
>> > > > > > > > > Sagar.
>> > > > > > > > >
>> > > > > > > > > On Tue, Aug 30, 2022 at 8:43 PM Jorge Esteban Quilcate
>> Otoya
>> > <
>> > > > > > > > > quilcate.jo...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi all,
>> > > > > > > > > >
>> > > > > > > > > > I'd like to start a discussion thread on KIP-864: Add
>> > > > End-To-End
>> > > > > > > > Latency
>> > > > > > > > > > Metrics to Connectors.
>> > > > > > > > > > This KIP aims to improve the metrics available on Source
>> > and
>> > > > Sink
>> > > > > > > > > > Connectors to measure end-to-end latency, including
>> source
>> > > and
>> > > > > sink
>> > > > > > > > > record
>> > > > > > > > > > conversion time, and sink record e2e latency (similar to
>> > > > KIP-613
>> > > > > > for
>> > > > > > > > > > Streams).
>> > > > > > > > > >
>> > > > > > > > > > The KIP is here:
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Add+End-To-End+Latency+Metrics+to+Connectors
>> > > > > > > > > >
>> > > > > > > > > > Please take a look and let me know what you think.
>> > > > > > > > > >
>> > > > > > > > > > Cheers,
>> > > > > > > > > > Jorge.
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to