rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697740825
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +184,48 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
         }
+        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        final long start = time.nanoseconds();
         producer.close();
+        final long closeTime = time.nanoseconds() - start;
+        oldProducerTotalBlockedTime += closeTime;
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
+    private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+                                  final String name) {
+        final List<MetricName> found = metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .collect(Collectors.toList());
+        if (found.isEmpty()) {
+            return 0.0;
+        }
+        if (found.size() > 1) {

Review comment:
   I agree it should never happen. I'm a little hesitant here, though: if this scenario occurs and we just log, the only downside is a bad metric value, but if we throw, we may cause a query to go down. I was thinking we can watch for this log message after deploying and convert it to an exception as long as we never see it.
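For reference, the log-instead-of-throw behaviour described in the comment could look roughly like the sketch below. It is not the code in this PR: `MetricKey` is a hypothetical stand-in for Kafka's `MetricName`, the metrics map holds plain `Double` values rather than `Metric` objects, and both returning the first match and the `duplicateNameCount` counter are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class MetricLookupSketch {
    private static final Logger LOG = Logger.getLogger(MetricLookupSketch.class.getName());

    // Hypothetical stand-in for a metric identifier; NOT Kafka's MetricName class.
    public static final class MetricKey {
        final String name;
        final String group;

        public MetricKey(final String name, final String group) {
            this.name = name;
            this.group = group;
        }
    }

    // Assumption: a counter makes the "should never happen" case easy to observe after deploying.
    public static final AtomicLong duplicateNameCount = new AtomicLong();

    public static double getMetricValue(final Map<MetricKey, Double> metrics, final String name) {
        final List<MetricKey> found = metrics.keySet().stream()
            .filter(k -> k.name.equals(name))
            .collect(Collectors.toList());
        if (found.isEmpty()) {
            return 0.0;
        }
        if (found.size() > 1) {
            // Log rather than throw: a wrong metric value is cheaper than a failed query.
            duplicateNameCount.incrementAndGet();
            LOG.warning("Found " + found.size() + " metrics named '" + name + "'; using the first match");
        }
        // Assumption: fall back to the first match in the map's iteration order.
        return metrics.get(found.get(0));
    }
}
```

If the warning never shows up in production logs, the `found.size() > 1` branch can later be changed to throw an `IllegalStateException` without touching any caller.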