cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r698327147



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -178,12 +184,48 @@ public void resetProducer() {
             throw new IllegalStateException("Expected eos-v2 to be enabled, 
but the processing mode was " + processingMode);
         }
 
+        oldProducerTotalBlockedTime += totalBlockedTime(producer);
+        final long start = time.nanoseconds();
         producer.close();
+        final long closeTime = time.nanoseconds() - start;
+        oldProducerTotalBlockedTime += closeTime;
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
         transactionInitialized = false;
     }
 
+    private double getMetricValue(final Map<MetricName, ? extends Metric> 
metrics,
+                                  final String name) {
+        final List<MetricName> found = metrics.keySet().stream()
+            .filter(n -> n.name().equals(name))
+            .collect(Collectors.toList());
+        if (found.isEmpty()) {
+            return 0.0;
+        }
+        if (found.size() > 1) {

Review comment:
       @rodesai I see your point here. However, the downside of not throwing is 
that we will also not notice the bad behavior in our tests like the soak tests. 
I personally prefer to improve tests instead of downgrading the reaction to bad 
behavior. Assume in future somebody makes a change that breaks the assumption 
of the non-shared metrics registry, we would find this bug immediately during 
development instead of during production.
   Another option that comes to my mind is to classify exceptions that 
originate from the metrics framework differently in the uncaught exception 
handler, but that would probably need some more work. 

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() {
         return newConsumer(time, client, subscription, metadata, assignor, 
false, groupInstanceId);
     }
 
+    private KafkaConsumer<String, String> 
consumerWithPendingAuthenticationError() {
+        return consumerWithPendingAuthenticationError(new MockTime());
+    }
+
+    private KafkaConsumer<String, String> consumerWithPendingError(final Time 
time) {
+        return consumerWithPendingAuthenticationError(time);
+    }

Review comment:
       Fair enough!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to