Efrat Levitan created FLINK-39892:
-------------------------------------

             Summary: Exactly-once kafka sink reports a negative numBytesOut 
delta after every checkpoint
                 Key: FLINK-39892
                 URL: https://issues.apache.org/jira/browse/FLINK-39892
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 2.2.1
            Reporter: Efrat Levitan


The following counters

{{taskmanager.job.task.numBytesOutPerSecond}}
{{taskmanager.job.task.numBytesOut}}
{{taskmanager.job.task.operator.numBytesOutPerSecond}}
{{taskmanager.job.task.operator.numBytesSend}}

frequently report a decrease in value for the exactly-once Kafka sink.
This happens because {{KafkaWriter#initKafkaMetrics}} is called for each new producer and 
resets {{byteOutMetric}} but not {{latestOutgoingByteTotal}}.
{{KafkaWriter#registerMetricSync}} calculates the delta as 
[outgoingBytesUntilNow - 
latestOutgoingByteTotal|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L276-L278],
so every time a new producer is spawned, {{numBytesOutCounter.inc}} is called 
with a negative delta: 0 minus the {{latestOutgoingByteTotal}} of the previous producer.
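
The bookkeeping described above can be reproduced with a small stand-alone model. This is only a sketch under stated assumptions, not the actual {{KafkaWriter}} code: {{NegativeDeltaSketch}}, {{FakeProducer}}, {{rotateProducer}}, {{simulate}} and the {{resetBaselineOnNewProducer}} flag are hypothetical names introduced for illustration. Resetting the baseline when the producer changes is one possible remedy, not a confirmed fix.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the delta bookkeeping; NOT the real KafkaWriter. */
final class NegativeDeltaSketch {

    /** Hypothetical stand-in for a producer's cumulative outgoing-byte-total metric. */
    static final class FakeProducer {
        long outgoingByteTotal = 0;

        void send(long bytes) {
            outgoingByteTotal += bytes;
        }
    }

    private FakeProducer currentProducer = new FakeProducer();
    private long latestOutgoingByteTotal = 0; // baseline kept across syncs
    private long numBytesOut = 0;             // stand-in for the Flink counter
    private final boolean resetBaselineOnNewProducer; // hypothetical remedy switch

    NegativeDeltaSketch(boolean resetBaselineOnNewProducer) {
        this.resetBaselineOnNewProducer = resetBaselineOnNewProducer;
    }

    void send(long bytes) {
        currentProducer.send(bytes);
    }

    /** Models a checkpoint in exactly-once mode: a fresh producer replaces the old one. */
    void rotateProducer() {
        currentProducer = new FakeProducer(); // its byte total starts again at 0
        if (resetBaselineOnNewProducer) {
            latestOutgoingByteTotal = 0;      // assumed remedy: reset the baseline too
        }
    }

    /** Models the periodic metric sync and returns the delta applied to the counter. */
    long syncMetrics() {
        long outgoingBytesUntilNow = currentProducer.outgoingByteTotal;
        long delta = outgoingBytesUntilNow - latestOutgoingByteTotal;
        numBytesOut += delta;
        latestOutgoingByteTotal = outgoingBytesUntilNow;
        return delta;
    }

    /** Runs: send 100 bytes, sync, rotate producer, sync, send 40 bytes, sync. */
    static List<Long> simulate(boolean resetBaselineOnNewProducer) {
        NegativeDeltaSketch writer = new NegativeDeltaSketch(resetBaselineOnNewProducer);
        List<Long> deltas = new ArrayList<>();
        writer.send(100);
        deltas.add(writer.syncMetrics());
        writer.rotateProducer();
        deltas.add(writer.syncMetrics()); // stale baseline makes this 0 - 100
        writer.send(40);
        deltas.add(writer.syncMetrics());
        return deltas;
    }

    public static void main(String[] args) {
        System.out.println("stale baseline: " + simulate(false)); // [100, -100, 40]
        System.out.println("reset baseline: " + simulate(true));  // [100, 0, 40]
    }
}
```

With the stale baseline, the sync right after the producer swap applies -100 to the counter, which matches the decrement seen after each checkpoint. With the baseline reset, the same sync applies 0.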

Exporters handle this differently. The OTel exporter 
[drops|https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricAdapter.java#L66-L73]
 the data point with a warning.

The Prometheus exporter, by contrast, treats Flink counters as 
[gauges|https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java#L210-L212],
 so the incorrect data point is still reported.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
