xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r430590488



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
      @xvrl  Do you mean adding this logic back to KafkaServer.scala:

    metadata.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
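For reference, a minimal sketch (not the broker code) of what originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false) returns: with strip set to false, matching keys keep their metrics.context. prefix. The property names and values below are hypothetical.

    import java.util.HashMap;
    import java.util.Map;

    public class MetricsContextPrefixSketch {
        // Assumed value of CommonClientConfigs.METRICS_CONTEXT_PREFIX
        static final String METRICS_CONTEXT_PREFIX = "metrics.context.";

        // Mirrors AbstractConfig#originalsWithPrefix(prefix, strip = false):
        // keep every entry whose key starts with the prefix, key unchanged.
        static Map<String, Object> originalsWithPrefix(Map<String, Object> originals, String prefix) {
            Map<String, Object> result = new HashMap<>();
            for (Map.Entry<String, Object> e : originals.entrySet())
                if (e.getKey().startsWith(prefix))
                    result.put(e.getKey(), e.getValue());
            return result;
        }

        public static void main(String[] args) {
            Map<String, Object> originals = new HashMap<>();
            originals.put("broker.id", "0");
            originals.put("metrics.context.environment", "staging"); // hypothetical label
            // Prints {metrics.context.environment=staging}; the broker.id entry is dropped.
            System.out.println(originalsWithPrefix(originals, METRICS_CONTEXT_PREFIX));
        }
    }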
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       Changed name to kafkaClusterId

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
       Updated code to reflect KIP changes
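As a hedged illustration of the freeform client labels described in the Javadoc above (the label key and value below are made up, and the exact key form the reporter sees depends on how the client handles the prefix):

    import java.util.Properties;

    public class ClientContextLabelSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Freeform context label in the metrics.context.<key>=<value> form; hypothetical key/value.
            props.put("metrics.context.environment", "staging");
            // A client configured with these properties would surface the label to its
            // MetricsReporter instances through the MetricsContext metadata map.
            System.out.println(props);
        }
    }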

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
      Renamed jmxPrefix to metricsPrefix and added broker MetricsContext properties.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
##########
@@ -66,20 +67,22 @@ public void configure(final WorkerConfig config) {
         if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be specified");
 
+        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
         data = new HashMap<>();
 
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);

Review comment:
       Nice catch, fixed.
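For what it's worth, here is a rough guess (assumption only; the real helper in this PR may differ) at the shape of ConnectUtils.addMetricsContextProperties: it presumably forwards the Connect-level labels to the embedded producer/consumer as metrics.context.* properties.

    import java.util.HashMap;
    import java.util.Map;

    public class AddMetricsContextPropertiesSketch {
        // Assumption: the real ConnectUtils.addMetricsContextProperties(props, config, clusterId)
        // probably does something along these lines; key names follow the MetricsContext Javadoc
        // in this PR (connect.kafka.cluster.id, connect.group.id).
        static void addMetricsContextProperties(Map<String, Object> clientProps, String kafkaClusterId, String groupId) {
            clientProps.put("metrics.context.connect.kafka.cluster.id", kafkaClusterId);
            if (groupId != null)  // standalone workers have no group.id
                clientProps.put("metrics.context.connect.group.id", groupId);
        }

        public static void main(String[] args) {
            Map<String, Object> producerProps = new HashMap<>();
            addMetricsContextProperties(producerProps, "cluster-1", "group-1");
            System.out.println(producerProps); // both connect.* labels present
        }
    }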

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+
+        for (MetricsReporter reporter : member.metrics().reporters()) {
+            if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+                MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
+                assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+                assertEquals("group-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID));
+            }
+        }

Review comment:
       Added verification.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");

Review comment:
       Removed deprecated configs.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
##########
@@ -63,7 +67,7 @@
      * @param config   the worker configuration; may not be null
      * @param time     the time; may not be null
      */
-    public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+    public ConnectMetrics(String workerId, WorkerConfig config, Time time, String clusterId) {

Review comment:
       Updated javadoc.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
   }
 
+  private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext()
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+
+  private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false))

Review comment:
       Fixed
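For readers less familiar with the Scala, a rough Java equivalent of the notifyMetricsReporters helper above (a sketch only; class and variable names are hypothetical):

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.common.metrics.KafkaMetricsContext;
    import org.apache.kafka.common.metrics.MetricsContext;
    import org.apache.kafka.common.metrics.MetricsReporter;

    public class NotifyReportersSketch {
        // Push the broker's MetricsContext to every reporter that implements MetricsReporter;
        // other objects in the list are ignored, matching the Scala pattern match above.
        static void notifyMetricsReporters(List<Object> reporters, MetricsContext context) {
            for (Object reporter : reporters) {
                if (reporter instanceof MetricsReporter)
                    ((MetricsReporter) reporter).contextChange(context);
            }
        }

        public static void main(String[] args) {
            MetricsContext context = new KafkaMetricsContext("kafka.server"); // namespace only
            notifyMetricsReporters(Collections.emptyList(), context);         // no-op demo
        }
    }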

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's contextLabels map.
+     */
+    private final Map<String, String> contextLabels = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param contextLabels  contextLabels additional entries to add to the context.
+     *                  values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) {
+        this.contextLabels.put(MetricsContext.NAMESPACE, namespace);
+        contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString()));
+    }
+
+    public Map<String, String> contextLabels() {

Review comment:
       Added annotation
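A quick usage sketch of this class as shown in the diff (label values are hypothetical): the two-argument constructor injects the _namespace entry and stringifies label values via Object.toString().

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.metrics.KafkaMetricsContext;
    import org.apache.kafka.common.metrics.MetricsContext;

    public class KafkaMetricsContextUsageSketch {
        public static void main(String[] args) {
            Map<String, Object> labels = new HashMap<>();
            labels.put("kafka.broker.id", 0);            // Integer, converted via toString()
            labels.put("kafka.cluster.id", "cluster-1"); // hypothetical id

            MetricsContext context = new KafkaMetricsContext("kafka.server", labels);
            System.out.println(context.contextLabels().get(MetricsContext.NAMESPACE)); // kafka.server
            System.out.println(context.contextLabels().get("kafka.broker.id"));        // "0"
        }
    }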

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional contextLabels about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The contextLabels map provides following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns contextLabels fields
+     */
+    Map<String, String> contextLabels();

Review comment:
       Updated javadoc.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
      It can be called multiple times. I'm not sure we should mention that in the Javadoc; the other methods in this class don't mention that they can be called multiple times either, even though they can.
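For illustration, a minimal sketch of a reporter that overrides this hook (class and field names are hypothetical): since the hook may be invoked more than once, it simply replaces the stored context each time.

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.MetricsContext;
    import org.apache.kafka.common.metrics.MetricsReporter;

    public class ContextAwareReporterSketch implements MetricsReporter {
        private volatile MetricsContext context;

        @Override
        public void contextChange(MetricsContext metricsContext) {
            this.context = metricsContext; // latest context wins if called repeatedly
        }

        @Override public void configure(Map<String, ?> configs) { }
        @Override public void init(List<KafkaMetric> metrics) { }
        @Override public void metricChange(KafkaMetric metric) { }
        @Override public void metricRemoval(KafkaMetric metric) { }
        @Override public void close() { }
    }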




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

