cadonna commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r698327147
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ########## @@ -178,12 +184,48 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } + oldProducerTotalBlockedTime += totalBlockedTime(producer); + final long start = time.nanoseconds(); producer.close(); + final long closeTime = time.nanoseconds() - start; + oldProducerTotalBlockedTime += closeTime; producer = clientSupplier.getProducer(eosV2ProducerConfigs); transactionInitialized = false; } + private double getMetricValue(final Map<MetricName, ? extends Metric> metrics, + final String name) { + final List<MetricName> found = metrics.keySet().stream() + .filter(n -> n.name().equals(name)) + .collect(Collectors.toList()); + if (found.isEmpty()) { + return 0.0; + } + if (found.size() > 1) { Review comment: @rodesai I see your point here. However, the downside of not throwing is that we will also not notice the bad behavior in our tests like the soak tests. I personally prefer to improve tests instead of downgrading the reaction to bad behavior. Assume in future somebody makes a change that breaks the assumption of the non-shared metrics registry, we would find this bug immediately during development instead of during production. Another option that comes to my mind is to classify exceptions that originate from the metrics framework differently in the uncaught exception handler, but that would probably need some more work. ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ########## @@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() { return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); } + private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() { + return consumerWithPendingAuthenticationError(new MockTime()); + } + + private KafkaConsumer<String, String> consumerWithPendingError(final Time time) { + return consumerWithPendingAuthenticationError(time); + } Review comment: Fair enough! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org