xiaodongdu commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r430590488
########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP private var shutdownLatch = new CountDownLatch(1) + //properties for MetricsContext Review comment: @xvrl Do you mean add this logic back to KafkaServer.scala: metadata.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false)); ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -92,6 +93,7 @@ private final ExecutorService executor; private final Time time; private final String workerId; + private final String clusterId; Review comment: Changed name to kafkaClusterId ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.metrics; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * MetricsContext encapsulates additional metadata about metrics exposed via a + * {@link org.apache.kafka.common.metrics.MetricsReporter} + * + * The metadata map provides following information: + * - a <code>_namespace</node> field indicating the component exposing metrics + * e.g. kafka.server, kafka.consumer + * {@link JmxReporter} uses this as prefix for mbean names + * + * - for clients and streams libraries: any freeform fields passed in via + * client properties in the form of `metrics.context.<key>=<value> + * + * - for kafka brokers: kafka.broker.id, kafka.cluster.id + * - for connect workers: connect.kafka.cluster.id, connect.group.id + */ +@InterfaceStability.Evolving +public interface MetricsContext { + /* predefined fields */ + String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix + + /** + * Returns metadata fields + */ + Map<String, String> metadata(); Review comment: Updated code to reflect KIP changes ########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP private var shutdownLatch = new CountDownLatch(1) + //properties for MetricsContext Review comment: renamed jmxPrefix to metricsPrefix and add broker MetricsContext properties ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java ########## @@ -66,20 +67,22 @@ public void configure(final WorkerConfig config) { if (topic == null || topic.trim().length() == 0) throw new ConfigException("Offset storage topic must be specified"); + String clusterId = ConnectUtils.lookupKafkaClusterId(config); data = new HashMap<>(); Map<String, Object> originals = config.originals(); Map<String, Object> producerProps = new HashMap<>(originals); 
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); + ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId); Map<String, Object> consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId); Review comment: Nice catch, fixed. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.MockConnectMetrics; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.util.ConnectUtils; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ConnectUtils.class}) +@PowerMockIgnore({"javax.management.*", "javax.crypto.*"}) +public class WorkerGroupMemberTest { + @Mock + private ConfigBackingStore configBackingStore; + @Mock + private StatusBackingStore statusBackingStore; + + @Test + public void testMetrics() throws Exception { + WorkerGroupMember member; + Map<String, String> workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + 
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); + workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); + workerProps.put("group.id", "group-1"); + workerProps.put("offset.storage.topic", "topic-1"); + workerProps.put("config.storage.topic", "topic-1"); + workerProps.put("status.storage.topic", "topic-1"); + workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName()); + DistributedConfig config = new DistributedConfig(workerProps); + + + LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]"); + + expectClusterId(); + + member = new WorkerGroupMember(config, "", configBackingStore, + null, Time.SYSTEM, "client-1", logContext); + + for (MetricsReporter reporter : member.metrics().reporters()) { + if (reporter instanceof MockConnectMetrics.MockMetricsReporter) { + MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter; + assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID)); + assertEquals("group-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID)); + } + } Review comment: Added verification. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.MockConnectMetrics; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.util.ConnectUtils; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ConnectUtils.class}) +@PowerMockIgnore({"javax.management.*", 
"javax.crypto.*"}) +public class WorkerGroupMemberTest { + @Mock + private ConfigBackingStore configBackingStore; + @Mock + private StatusBackingStore statusBackingStore; + + @Test + public void testMetrics() throws Exception { + WorkerGroupMember member; + Map<String, String> workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); Review comment: Removed deprecated configs. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java ########## @@ -63,7 +67,7 @@ * @param config the worker configuration; may not be null * @param time the time; may not be null */ - public ConnectMetrics(String workerId, WorkerConfig config, Time time) { + public ConnectMetrics(String workerId, WorkerConfig config, Time time, String clusterId) { Review comment: Updated javadoc. 
########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP clusterResourceListeners.onUpdate(new ClusterResource(clusterId)) } + private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = { + val metricsContext = createKafkaMetricsContext() + metricsReporters.foreach { + case x: MetricsReporter => x.contextChange(metricsContext) + case _ => //do nothing + } + } + + private[server] def createKafkaMetricsContext() : KafkaMetricsContext = { + val contextLabels = new util.HashMap[String, Object] + contextLabels.put(KAFKA_CLUSTER_ID, clusterId) + contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString) + contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false)) Review comment: Fixed ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.metrics; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients + */ +public class KafkaMetricsContext implements MetricsContext { + /** + * Client or Service's contextLabels map. + */ + private final Map<String, String> contextLabels = new HashMap<>(); + + /** + * Create a MetricsContext with namespace, no service or client properties + * @param namespace value for _namespace key + */ + public KafkaMetricsContext(String namespace) { + this(namespace, new HashMap<>()); + } + + /** + * Create a MetricsContext with namespace, service or client properties + * @param namespace value for _namespace key + * @param contextLabels contextLabels additional entries to add to the context. + * values will be converted to string using Object.toString() + */ + public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) { + this.contextLabels.put(MetricsContext.NAMESPACE, namespace); + contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString())); + } + + public Map<String, String> contextLabels() { Review comment: Added annotation ########## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * MetricsContext encapsulates additional contextLabels about metrics exposed via a + * {@link org.apache.kafka.common.metrics.MetricsReporter} + * + * The contextLabels map provides following information: + * - a <code>_namespace</node> field indicating the component exposing metrics + * e.g. kafka.server, kafka.consumer + * {@link JmxReporter} uses this as prefix for mbean names + * + * - for clients and streams libraries: any freeform fields passed in via + * client properties in the form of `metrics.context.<key>=<value> + * + * - for kafka brokers: kafka.broker.id, kafka.cluster.id + * - for connect workers: connect.kafka.cluster.id, connect.group.id + */ +@InterfaceStability.Evolving +public interface MetricsContext { + /* predefined fields */ + String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix + + /** + * Returns contextLabels fields + */ + Map<String, String> contextLabels(); Review comment: Updated javadoc. 
########## File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java ########## @@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept default void reconfigure(Map<String, ?> configs) { } + /** + * Provides context labels for the service or library exposing metrics + * + * @param metricsContext the metric context + */ + @InterfaceStability.Evolving + default void contextChange(MetricsContext metricsContext) { Review comment: It can be called multiple times. I'm not sure we should mention that in the Javadoc: the other methods in this class don't mention whether they can be called multiple times, even though they can be. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org