fapaul commented on a change in pull request #16875:
URL: https://github.com/apache/flink/pull/16875#discussion_r692878739



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -389,4 +409,28 @@ private void initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
             return Collections.emptyList();
         }
     }
+
+    private static long computeSendTime(Producer<?, ?> producer) {
+        final Metric sendTime =
+                MetricUtil.getKafkaMetric(
+                        producer.metrics(), "producer-metrics", "request-latency-avg");
+        final Metric queueTime =
+                MetricUtil.getKafkaMetric(
+                        producer.metrics(), "producer-metrics", "record-queue-time-avg");
+        return ((Number) sendTime.metricValue()).longValue()
+                + ((Number) queueTime.metricValue()).longValue();
+    }
+
+    private void registerMetricSync() {
+        if (closed) {
+            return;
+        }
+        timeService.registerProcessingTimer(
+                lastSync + METRIC_UPDATE_INTERVAL_MILLIS,
+                (time) -> {
+                    MetricUtil.sync(byteOutMetric, numBytesOutCounter);
+                    lastSync = time;
+                    registerMetricSync();
+                });

Review comment:
       > It might be easier to just use `System.currentTimeMillis()` instead of 
`lastSync` to not trigger stuff too often in case of overload? But not sure.
   
   I am not sure what the best approach is here. With your suggestion, we would 
need to call `System.currentTimeMillis()` for every registration. Currently, it 
is only needed once.
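
To make the trade-off concrete, here is a minimal simulation sketch of the two scheduling strategies. It does not use Flink's timer service; `TimerDriftSketch`, its methods, and the 1000 ms interval are hypothetical names and values chosen for illustration. It also assumes that the timer callback receives the scheduled timestamp rather than the actual firing time, which is what would make the `lastSync`-based variant fire back-to-back after a stall.

```java
import java.util.Arrays;

/** Hypothetical simulation of the two timer registration strategies discussed above. */
final class TimerDriftSketch {

    /**
     * Variant from the diff: next target = lastSync + interval.
     * Assumption: lastSync is the scheduled timestamp passed to the callback.
     * delays[i] is how late the i-th timer fires (simulated overload).
     */
    static long[] fireTimesFromLastSync(long start, long interval, long[] delays) {
        long[] fires = new long[delays.length];
        long lastSync = start;
        long now = start;
        for (int i = 0; i < delays.length; i++) {
            long target = lastSync + interval;
            // A target in the past fires immediately, i.e. at the current time.
            long fire = Math.max(target, now) + delays[i];
            fires[i] = fire;
            now = fire;
            lastSync = target;
        }
        return fires;
    }

    /**
     * Suggested variant: next target = current time + interval.
     * The simulated clock "now" stands in for System.currentTimeMillis().
     */
    static long[] fireTimesFromNow(long start, long interval, long[] delays) {
        long[] fires = new long[delays.length];
        long now = start;
        for (int i = 0; i < delays.length; i++) {
            long target = now + interval;
            long fire = target + delays[i];
            fires[i] = fire;
            now = fire;
        }
        return fires;
    }

    public static void main(String[] args) {
        // The first timer fires 2500 ms late; the rest fire on time.
        long[] delays = {2500, 0, 0, 0};
        System.out.println(Arrays.toString(fireTimesFromLastSync(0, 1000, delays)));
        System.out.println(Arrays.toString(fireTimesFromNow(0, 1000, delays)));
    }
}
```

Under these assumptions, the `lastSync`-based variant fires at 3500, 3500, 3500, 4000: it catches up with several immediate firings after the stall. The clock-based variant fires at 3500, 4500, 5500, 6500, always leaving a full interval between firings, at the cost of one clock read per registration.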
   
   




