cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r697393359



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() {
         return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
     }
 
+    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
+        return consumerWithPendingAuthenticationError(new MockTime());
+    }
+
+    private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
+        return consumerWithPendingAuthenticationError(time);
+    }

Review comment:
       Is this method needed?  Couldn't you directly call 
`consumerWithPendingAuthenticationError(final Time time)`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class KafkaConsumerMetricsTest {
+  private static final long METRIC_VALUE = 123L;
+  private static final String CONSUMER_GROUP_PREFIX = "consumer";
+  private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
+
+  private final Metrics metrics = new Metrics();
+  private final KafkaConsumerMetrics consumerMetrics
+      = new KafkaConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX);
+
+  @Test
+  public void shouldRecordCommitSyncTime() {
+    // When:
+    consumerMetrics.recordCommitSync(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("commit-sync-time-total");
+  }
+
+  @Test
+  public void shouldRecordCommittedTime() {
+    // When:
+    consumerMetrics.recordCommitted(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("committed-time-total");
+  }
+

Review comment:
       Thank you for adding the tests!
   Could you also add a test for the `close()` method to verify that all metrics 
are removed from `metrics` on close?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class KafkaProducerMetricsTest {
+  private static final long METRIC_VALUE = 123L;
+
+  private final Metrics metrics = new Metrics();
+  private final KafkaProducerMetrics producerMetrics = new KafkaProducerMetrics(metrics);
+
+  @Test
+  public void shouldRecordFlushTime() {
+    // When:
+    producerMetrics.recordFlush(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("flush-time-total");
+  }
+
+  @Test
+  public void shouldRecordInitTime() {
+    // When:
+    producerMetrics.recordInit(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-init-time-total");
+  }
+
+  @Test
+  public void shouldRecordTxBeginTime() {
+    // When:
+    producerMetrics.recordBeginTxn(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-begin-time-total");
+  }
+
+  @Test
+  public void shouldRecordTxCommitTime() {
+    // When:
+    producerMetrics.recordCommitTxn(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-commit-time-total");
+  }
+
+  @Test
+  public void shouldRecordTxAbortTime() {
+    // When:
+    producerMetrics.recordAbortTxn(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-abort-time-total");
+  }
+
+  @Test
+  public void shouldRecordSendOffsetsTime() {
+    // When:
+    producerMetrics.recordSendOffsets(METRIC_VALUE);
+
+    // Then:
+    assertMetricValue("txn-send-offsets-time-total");
+  }
+
+  private void assertMetricValue(final String name) {
+    assertEquals(
+        metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue(),
+        (double) METRIC_VALUE
+    );
+  }
+}

Review comment:
       Could you please also add a test for `close()` to ensure that metrics 
are removed from `metrics` on close?
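
A minimal sketch of the shape such a test could take. It is not the PR's code: `FakeMetricsRegistry` and `FakeProducerMetrics` are hypothetical stand-ins for `Metrics` and `KafkaProducerMetrics`, so the example runs without Kafka on the classpath. Only the metric names are taken from the diff above. The idea is to check that every metric is present before `close()` and gone afterwards.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.metrics.Metrics.
// It only tracks metric names and values; it is not the real registry.
final class FakeMetricsRegistry {
    private final Map<String, Double> values = new LinkedHashMap<>();

    void add(final String name) {
        values.put(name, 0.0);
    }

    void remove(final String name) {
        values.remove(name);
    }

    // Returns null when no metric with that name is registered.
    Double metric(final String name) {
        return values.get(name);
    }
}

// Hypothetical stand-in for KafkaProducerMetrics: it registers its metrics
// in the constructor and is assumed to remove them again in close().
final class FakeProducerMetrics implements AutoCloseable {
    // Metric names as they appear in the tests in the diff.
    static final List<String> NAMES = List.of(
        "flush-time-total",
        "txn-init-time-total",
        "txn-begin-time-total",
        "txn-commit-time-total",
        "txn-abort-time-total",
        "txn-send-offsets-time-total"
    );

    private final FakeMetricsRegistry registry;

    FakeProducerMetrics(final FakeMetricsRegistry registry) {
        this.registry = registry;
        NAMES.forEach(registry::add);
    }

    @Override
    public void close() {
        NAMES.forEach(registry::remove);
    }
}

final class CloseRemovesMetricsSketch {
    // Returns true if every metric is registered before close()
    // and none of them is registered after close().
    static boolean shouldRemoveMetricsOnClose() {
        final FakeMetricsRegistry registry = new FakeMetricsRegistry();
        final FakeProducerMetrics producerMetrics = new FakeProducerMetrics(registry);

        final boolean allPresent = FakeProducerMetrics.NAMES.stream()
            .allMatch(name -> registry.metric(name) != null);

        producerMetrics.close();

        final boolean allAbsent = FakeProducerMetrics.NAMES.stream()
            .allMatch(name -> registry.metric(name) == null);

        return allPresent && allAbsent;
    }

    public static void main(final String[] args) {
        System.out.println(shouldRemoveMetricsOnClose());
    }
}
```

In the real test class, the same check would presumably go through `metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP))`, as in `assertMetricValue` above, and assert on the result before and after `producerMetrics.close()`.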

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -99,6 +99,9 @@ void maybeCreateTasksFromNewTopologies() {
             activeTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies),
             standbyTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies)
         );
+

Review comment:
       A `}` is missing here, which causes the compilation errors in the 
builds.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


