Efrat Levitan created FLINK-39892:
-------------------------------------
Summary: Exactly-once Kafka sink reports a negative numBytesOut delta after every checkpoint
Key: FLINK-39892
URL: https://issues.apache.org/jira/browse/FLINK-39892
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 2.2.1
Reporter: Efrat Levitan
The following counters frequently report decreasing values for the exactly-once Kafka sink:
* {{taskmanager.job.task.numBytesOutPerSecond}}
* {{taskmanager.job.task.numBytesOut}}
* {{taskmanager.job.task.operator.numBytesOutPerSecond}}
* {{taskmanager.job.task.operator.numBytesSend}}
This is because {{KafkaWriter#initKafkaMetrics}} is called for every new producer and resets {{byteOutMetric}}, but not {{latestOutgoingByteTotal}}.
{{KafkaWriter#registerMetricSync}} computes the delta as
[outgoingBytesUntilNow - latestOutgoingByteTotal|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L276-L278],
so every time a new producer is spawned, {{numBytesOutCounter.inc}} is called with a negative delta (0 - {{latestOutgoingByteTotal}} of the previous producer).
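To make the arithmetic concrete, here is a minimal, self-contained Java model of the bookkeeping described above. It is a sketch, not the connector's code: {{BytesOutDeltaDemo}}, {{FakeProducer}}, {{initMetrics}}, {{sync}} and {{run}} are hypothetical stand-ins for the producer's outgoing-byte-total metric, {{initKafkaMetrics}} and the periodic sync in {{registerMetricSync}}. The {{rebaseline}} flag shows one possible fix, under that assumption: when the metric source is switched to a new producer, also reset {{latestOutgoingByteTotal}} to that producer's current total.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the numBytesOut bookkeeping described in FLINK-39892.
 * Names loosely mirror the issue text; this is NOT the actual KafkaWriter implementation.
 */
public class BytesOutDeltaDemo {

    /** Stand-in for a producer's cumulative outgoing-byte-total metric. */
    static final class FakeProducer {
        long outgoingByteTotal = 0;

        void send(long bytes) {
            outgoingByteTotal += bytes;
        }
    }

    private final boolean rebaseline;      // true = apply the proposed fix
    private FakeProducer producer;         // producer whose metric is read (cf. byteOutMetric)
    private long latestOutgoingByteTotal;  // last total seen by sync()
    private long numBytesOut;              // Flink-style cumulative counter
    final List<Long> deltas = new ArrayList<>();

    BytesOutDeltaDemo(boolean rebaseline) {
        this.rebaseline = rebaseline;
        initMetrics(new FakeProducer());
    }

    /** Analogue of initKafkaMetrics: switch the metric source to a new producer. */
    void initMetrics(FakeProducer newProducer) {
        producer = newProducer;
        if (rebaseline) {
            // Fix: measure future deltas from the new producer's current total.
            latestOutgoingByteTotal = newProducer.outgoingByteTotal;
        }
        // Bug (rebaseline == false): latestOutgoingByteTotal still holds the old producer's total.
    }

    /** Analogue of the periodic sync in registerMetricSync. */
    void sync() {
        long outgoingBytesUntilNow = producer.outgoingByteTotal;
        long delta = outgoingBytesUntilNow - latestOutgoingByteTotal;
        deltas.add(delta);
        numBytesOut += delta; // like numBytesOutCounter.inc(delta)
        latestOutgoingByteTotal = outgoingBytesUntilNow;
    }

    long numBytesOut() {
        return numBytesOut;
    }

    /** Send 100 bytes, sync, switch to a new producer, send 30 bytes, sync. */
    static BytesOutDeltaDemo run(boolean rebaseline) {
        BytesOutDeltaDemo demo = new BytesOutDeltaDemo(rebaseline);
        demo.producer.send(100);
        demo.sync();                          // delta = 100 - 0 = 100
        demo.initMetrics(new FakeProducer()); // e.g. a new transactional producer
        demo.producer.send(30);
        demo.sync();                          // bug: 30 - 100 = -70; fixed: 30 - 0 = 30
        return demo;
    }

    public static void main(String[] args) {
        BytesOutDeltaDemo buggy = run(false);
        BytesOutDeltaDemo fixed = run(true);
        System.out.println("buggy: deltas=" + buggy.deltas + " total=" + buggy.numBytesOut());
        System.out.println("fixed: deltas=" + fixed.deltas + " total=" + fixed.numBytesOut());
    }
}
```

With the stale baseline, the second sync produces a delta of -70 and the counter ends at 30, even though 130 bytes were sent. Re-baselining on every producer switch yields deltas of 100 and 30 and a total of 130. This sketch re-baselines to the new producer's current total rather than to a literal 0, so it would also cover a producer whose total is not 0 when it is switched in.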
Metric reporters handle this differently: the OpenTelemetry reporter
[drops|https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricAdapter.java#L66-L73]
the data point and logs a warning, while the Prometheus reporter treats Flink counters as
[gauges|https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java#L210-L212],
so the incorrect data point is still reported.
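The two reporter behaviors can be illustrated with a hypothetical, simplified Java model. This is not the actual OpenTelemetry or Prometheus reporter code. One export path requires a cumulative counter to be non-decreasing and drops the point with a warning otherwise. The other exports the raw value as a gauge. The names {{ReporterDemo}}, {{cumulativeSumPoint}} and {{gaugePoint}}, and the readings 100 then 30, are made up for illustration.

```java
import java.util.OptionalLong;

/**
 * Hypothetical model of two ways a reporter might export a Flink counter reading.
 * Not the actual OpenTelemetry or Prometheus reporter code.
 */
public class ReporterDemo {

    /** Monotonic-sum style: emit the reading only if the counter did not go down. */
    static OptionalLong cumulativeSumPoint(long previous, long current) {
        if (current < previous) {
            // A decrease violates monotonicity, so the point is dropped with a warning.
            System.err.println("WARN: counter decreased from " + previous + " to " + current
                    + ", dropping data point");
            return OptionalLong.empty();
        }
        return OptionalLong.of(current);
    }

    /** Gauge style: emit the raw reading, whatever its direction. */
    static long gaugePoint(long current) {
        return current;
    }

    public static void main(String[] args) {
        long previous = 100; // made-up earlier reading
        long current = 30;   // made-up reading after the counter went backwards
        System.out.println("monotonic sum: " + cumulativeSumPoint(previous, current)); // OptionalLong.empty
        System.out.println("gauge: " + gaugePoint(current)); // 30
    }
}
```

In the first path the bad reading is lost, although the warning makes it visible. In the second path a downstream system sees the counter go backwards.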