Chris Beard created KAFKA-14659:
-----------------------------------

             Summary: source-record-write-[rate|total] metrics include filtered 
records
                 Key: KAFKA-14659
                 URL: https://issues.apache.org/jira/browse/KAFKA-14659
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
            Reporter: Chris Beard


Source tasks in Kafka Connect offer two sets of metrics (documented in 
[ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
||Metric||Description||
|source-record-poll-rate|The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.|
|source-record-write-rate|The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.|

There are also corresponding "-total" metrics that capture the total number of 
records polled and written for the metrics above, respectively.

In short, the "poll" metrics capture the number of messages sourced 
pre-transformation/filtering, and the "write" metrics should capture the number 
of messages ultimately written to Kafka post-transformation/filtering. However, 
the implementation of the {{source-record-write-*}} metrics _includes_ records 
filtered out by transformations (and also records that result in produce 
failures when {{errors.tolerance=all}} is configured).
h3. Details

In 
[AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
 each source record is passed through the transformation chain, where it may be 
filtered out. The task then checks whether the record was in fact filtered out 
(or the operation failed), and if so accounts for it in the internal metrics 
via {{counter.skipRecord()}}:
{code:java}
for (final SourceRecord preTransformRecord : toSend) {
    retryWithToleranceOperator.sourceRecord(preTransformRecord);
    final SourceRecord record = transformationChain.apply(preTransformRecord);
    final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
    if (producerRecord == null || retryWithToleranceOperator.failed()) {
        counter.skipRecord();
        recordDropped(preTransformRecord);
        continue;
    }
    ...
{code}
{{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
{code:java}
    ....
    public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
        assert batchSize > 0;
        assert metricsGroup != null;
        this.batchSize = batchSize;
        counter = batchSize;
        this.metricsGroup = metricsGroup;
    }
    public void skipRecord() {
        if (counter > 0 && --counter == 0) {
            finishedAllWrites();
        }
    }
    ....
    private void finishedAllWrites() {
        if (!completed) {
            metricsGroup.recordWrite(batchSize - counter);
            completed = true;
        }
    }
{code}
For example, if a batch starts with 100 records, {{batchSize}} and {{counter}} 
are both initialized to 100. If all 100 records are filtered out, {{counter}} 
is decremented 100 times, and {{finishedAllWrites()}} records the value 100 to 
the underlying {{source-record-write-*}} metrics rather than 0, the correct 
value according to the documentation for these metrics.
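
The scenario above can be reproduced outside of Connect with a small standalone sketch. The counter logic is copied from the excerpt; {{StubMetricsGroup}}, {{WriteCounterDemo}} and {{recordedAfterSkippingAll}} are hypothetical names introduced only for this illustration and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;

class WriteCounterDemo {

    /** Hypothetical stand-in for SourceTaskMetricsGroup: remembers each recorded value. */
    static class StubMetricsGroup {
        final List<Integer> recorded = new ArrayList<>();

        void recordWrite(int recordCount) {
            recorded.add(recordCount);
        }
    }

    /** Counter logic as quoted in the excerpt above. */
    static class SourceRecordWriteCounter {
        private final StubMetricsGroup metricsGroup;
        private final int batchSize;
        private boolean completed = false;
        private int counter;

        SourceRecordWriteCounter(int batchSize, StubMetricsGroup metricsGroup) {
            this.batchSize = batchSize;
            this.counter = batchSize;
            this.metricsGroup = metricsGroup;
        }

        void skipRecord() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        private void finishedAllWrites() {
            if (!completed) {
                // counter is 0 here, so this records the full batch size
                metricsGroup.recordWrite(batchSize - counter);
                completed = true;
            }
        }
    }

    /** Skips every record in a batch and returns the single value that was recorded. */
    static int recordedAfterSkippingAll(int batchSize) {
        StubMetricsGroup metrics = new StubMetricsGroup();
        SourceRecordWriteCounter counter = new SourceRecordWriteCounter(batchSize, metrics);
        for (int i = 0; i < batchSize; i++) {
            counter.skipRecord();
        }
        return metrics.recorded.get(0);
    }

    public static void main(String[] args) {
        // All 100 records are skipped, yet 100 is reported as written.
        System.out.println(recordedAfterSkippingAll(100));
    }
}
```

Running {{main}} prints 100, even though no record in the batch was written.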
h3. Solutions

Assuming the documentation correctly captures the intent of the 
{{source-record-write-*}} metrics, it seems reasonable to fix these metrics 
such that filtered records do not get counted.
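
One possible shape for such a fix, offered only as a rough sketch and not as the actual patch: track successfully written records separately from skipped ones, and report only the former once the batch is fully accounted for. The names {{FilterAwareWriteCounter}}, {{StubMetricsGroup}}, {{recordedWrites}} and the {{completeRecord()}} method as used here are assumptions made for illustration.

```java
class FilterAwareWriteCounterDemo {

    /** Hypothetical stand-in for SourceTaskMetricsGroup. */
    static class StubMetricsGroup {
        int written = -1; // -1 means nothing has been recorded yet

        void recordWrite(int recordCount) {
            written = recordCount;
        }
    }

    /** Sketch of a counter that excludes skipped records from the reported write count. */
    static class FilterAwareWriteCounter {
        private final StubMetricsGroup metricsGroup;
        private int remaining;   // records not yet written or skipped
        private int written = 0; // records actually written
        private boolean completed = false;

        FilterAwareWriteCounter(int batchSize, StubMetricsGroup metricsGroup) {
            this.remaining = batchSize;
            this.metricsGroup = metricsGroup;
        }

        /** Assumed to be called once a record has been written successfully. */
        void completeRecord() {
            if (remaining > 0) {
                written++;
                settleOne();
            }
        }

        /** Called for records that were filtered out or failed; they are not counted as written. */
        void skipRecord() {
            if (remaining > 0) {
                settleOne();
            }
        }

        private void settleOne() {
            if (--remaining == 0 && !completed) {
                metricsGroup.recordWrite(written);
                completed = true;
            }
        }
    }

    /** Skips the first {@code skipped} records, completes the rest, and returns the recorded value. */
    static int recordedWrites(int batchSize, int skipped) {
        StubMetricsGroup metrics = new StubMetricsGroup();
        FilterAwareWriteCounter counter = new FilterAwareWriteCounter(batchSize, metrics);
        for (int i = 0; i < batchSize; i++) {
            if (i < skipped) {
                counter.skipRecord();
            } else {
                counter.completeRecord();
            }
        }
        return metrics.written;
    }

    public static void main(String[] args) {
        System.out.println(recordedWrites(100, 100)); // fully filtered batch
        System.out.println(recordedWrites(100, 30));  // partially filtered batch
    }
}
```

With this approach a fully filtered batch of 100 reports 0 writes, and a batch with 30 filtered records reports 70. Whether produce failures under {{errors.tolerance=all}} should go through the same skip path is left open here.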

It may also be useful to add additional metrics to capture the rate and total 
number of records filtered out by transformations, which would require a KIP.

I'm not yet sure of the best way to account for produce failures when 
{{errors.tolerance=all}} is set. Maybe these failures deserve their own new 
metrics?



