rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697740825



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +184,48 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
 
+        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        final long start = time.nanoseconds();
         producer.close();
+        final long closeTime = time.nanoseconds() - start;
+        oldProducerTotalBlockedTime += closeTime;
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                  final String name) {
+        final List<MetricName> found = metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .collect(Collectors.toList());
+        if (found.isEmpty()) {
+            return 0.0;
+        }
+        if (found.size() > 1) {

Review comment:
       I agree it should never happen. I'm a little hesitant to throw here,
though: if this scenario occurs and we only log, the sole downside is a bad
metric value, but if we throw, we may cause a query to go down. I was thinking
we could watch for this log message after deploying and convert it to an
exception once we're confident it never appears.
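
To make the trade-off concrete, here is a minimal, self-contained sketch of the log-instead-of-throw approach. It is not the code from this PR: `MetricLookupSketch`, `Key`, and `duplicateSightings` are hypothetical stand-ins (a plain map of doubles replaces Kafka's `MetricName`/`Metric` types), and falling back to the first match when duplicates exist is an assumption, since the hunk above is cut off before that branch.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.stream.Collectors;

// Hypothetical sketch; names and types are illustrative, not Kafka's.
final class MetricLookupSketch {
    private static final Logger LOG =
        Logger.getLogger(MetricLookupSketch.class.getName());

    // Counts how often duplicates were seen, so the warning can be monitored
    // after deploying before deciding to convert it into an exception.
    static final AtomicInteger duplicateSightings = new AtomicInteger();

    // Stand-in for a metric identifier: a name plus a group.
    static final class Key {
        final String name;
        final String group;

        Key(final String name, final String group) {
            this.name = name;
            this.group = group;
        }
    }

    static double getMetricValue(final Map<Key, Double> metrics,
                                 final String name) {
        final List<Key> found = metrics.keySet().stream()
            .filter(k -> k.name.equals(name))
            .collect(Collectors.toList());
        if (found.isEmpty()) {
            return 0.0;
        }
        if (found.size() > 1) {
            // Log rather than throw: the worst case is a wrong metric value,
            // not a failed caller. Assumption: use the first match.
            duplicateSightings.incrementAndGet();
            LOG.warning("Found " + found.size() + " metrics named " + name
                + "; using the first match");
        }
        return metrics.get(found.get(0));
    }
}
```

If the counter (or the log line) stays silent in production, the warning branch could later be replaced with an `IllegalStateException`.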





