fapaul commented on a change in pull request #16875:
URL: https://github.com/apache/flink/pull/16875#discussion_r692878739
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -389,4 +409,28 @@ private void initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
             return Collections.emptyList();
         }
     }
+
+    private static long computeSendTime(Producer<?, ?> producer) {
+        final Metric sendTime =
+                MetricUtil.getKafkaMetric(
+                        producer.metrics(), "producer-metrics", "request-latency-avg");
+        final Metric queueTime =
+                MetricUtil.getKafkaMetric(
+                        producer.metrics(), "producer-metrics", "record-queue-time-avg");
+        return ((Number) sendTime.metricValue()).longValue()
+                + ((Number) queueTime.metricValue()).longValue();
+    }
+
+    private void registerMetricSync() {
+        if (closed) {
+            return;
+        }
+        timeService.registerProcessingTimer(
+                lastSync + METRIC_UPDATE_INTERVAL_MILLIS,
+                (time) -> {
+                    MetricUtil.sync(byteOutMetric, numBytesOutCounter);
+                    lastSync = time;
+                    registerMetricSync();
+                });

Review comment:
> It might be easier to just use `System.currentTimeMillis()` instead of `lastSync` to not trigger stuff too often in case of overload? But not sure.

I am not sure what the best approach is here. With your suggestion we would need to call `System.currentTimeMillis()` for every registration; currently it is only needed once.
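To make the trade-off concrete, here is a minimal, hypothetical sketch of the two ways to pick the next timer timestamp. It is not code from the PR: `TimerRescheduleSketch`, its methods, and the 500 ms interval are made up for illustration (the actual value of `METRIC_UPDATE_INTERVAL_MILLIS` is not shown in the diff). It also assumes that the `time` handed to the callback is the timestamp the timer was registered for, not the moment it actually fired, and that callbacks take no time to run.

```java
/** Hypothetical illustration only; not part of KafkaWriter. */
final class TimerRescheduleSketch {

    // Assumed interval for the example; the real constant is not visible in the diff.
    static final long INTERVAL_MILLIS = 500L;

    /** Current approach: anchor the next timer on the timestamp passed to the callback. */
    static long nextFromLastSync(long lastSync) {
        return lastSync + INTERVAL_MILLIS;
    }

    /** Suggested alternative: anchor the next timer on the wall clock at registration. */
    static long nextFromNow(long now) {
        return now + INTERVAL_MILLIS;
    }

    /**
     * Counts how many timers are already overdue when the callback for
     * scheduledTime only runs at now and each follow-up timer is anchored
     * on lastSync. These would fire back-to-back to catch up.
     */
    static long overdueTimers(long scheduledTime, long now) {
        long overdue = 0;
        long next = nextFromLastSync(scheduledTime);
        while (next <= now) {
            overdue++;
            next = nextFromLastSync(next);
        }
        return overdue;
    }

    public static void main(String[] args) {
        long scheduled = 1_000L; // timestamp the timer was registered for
        long now = 2_600L;       // callback actually runs 1.6 s late
        System.out.println("next (lastSync anchor): " + nextFromLastSync(scheduled));
        System.out.println("next (now anchor):      " + nextFromNow(now));
        System.out.println("overdue catch-up timers: " + overdueTimers(scheduled, now));
    }
}
```

Under these assumptions, anchoring on `lastSync` keeps a fixed cadence with no extra clock read, but a delayed callback is followed by a burst of catch-up timers. Anchoring on `System.currentTimeMillis()` avoids the burst at the cost of one clock read per registration and some drift in the schedule.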