gharris1727 commented on code in PR #17804:
URL: https://github.com/apache/kafka/pull/17804#discussion_r1949869360


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -585,19 +590,37 @@ public void raiseError(Exception e) {
     }
 
     private class WorkerSinkConnectorContext extends WorkerConnectorContext implements SinkConnectorContext {
+
+        private final PluginMetrics pluginMetrics;
+
+        WorkerSinkConnectorContext(PluginMetrics pluginMetrics) {
+            this.pluginMetrics = pluginMetrics;
+        }
+
+        @Override
+        public PluginMetrics pluginMetrics() {
+            return pluginMetrics;
+        }
     }
 
     private class WorkerSourceConnectorContext extends WorkerConnectorContext implements SourceConnectorContext {
 
         private final OffsetStorageReader offsetStorageReader;
+        private final PluginMetrics pluginMetrics;

Review Comment:
    nit: Could these be deduplicated in WorkerConnectorContext?
    
    Related to this, the other methods in WorkerConnectorContext delegate to 
`WorkerConnector.this.ctx`, which is a HerderConnectorContext. If that class had 
a reasonable `pluginMetrics` implementation, we could delegate to it as well.
    
    AFAIU this would also save the WorkerConnector from having to handle and 
close the pluginMetrics itself.
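
A minimal sketch of the deduplication suggested above, assuming the herder-provided context can supply the metrics. Every type here is a simplified stand-in nested in a hypothetical `ContextDedupSketch` class, not the real Connect code; the factory methods `sinkContext()` and `sourceContext()` exist only to make the sketch easy to exercise.

```java
import java.util.Objects;

/** Sketch only: all nested types are simplified stand-ins, not the real Connect classes. */
final class ContextDedupSketch {

    /** Stand-in for org.apache.kafka.common.metrics.PluginMetrics. */
    interface PluginMetrics { }

    /** Stand-in for the connector context; only the method under discussion is modelled. */
    interface ConnectorContext {
        PluginMetrics pluginMetrics();
    }

    static final class WorkerConnector {
        // Plays the role of the herder-provided context referred to as WorkerConnector.this.ctx.
        private final ConnectorContext ctx;

        WorkerConnector(ConnectorContext ctx) {
            this.ctx = Objects.requireNonNull(ctx, "ctx");
        }

        /** The single pluginMetrics() implementation lives in the shared base class. */
        abstract class WorkerConnectorContext implements ConnectorContext {
            @Override
            public PluginMetrics pluginMetrics() {
                // Delegate to the outer context, like the other context methods do.
                return WorkerConnector.this.ctx.pluginMetrics();
            }
        }

        // The subclasses no longer need their own field, constructor, or override.
        final class WorkerSinkConnectorContext extends WorkerConnectorContext { }

        final class WorkerSourceConnectorContext extends WorkerConnectorContext { }

        // Hypothetical helpers, present only so the sketch can be exercised.
        ConnectorContext sinkContext() {
            return new WorkerSinkConnectorContext();
        }

        ConnectorContext sourceContext() {
            return new WorkerSourceConnectorContext();
        }
    }
}
```

With this shape, both context flavours return whatever the outer context provides, and the WorkerConnector never owns a PluginMetrics instance.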



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java:
##########
@@ -38,33 +39,36 @@ public class TransformationStageTest {
     private final SourceRecord transformed = new SourceRecord(singletonMap("transformed", 2), null, null, null, null);
 
     @Test
-    public void apply() {
+    public void apply() throws Exception {
         applyAndAssert(true, false, transformed);
         applyAndAssert(true, true, initial);
         applyAndAssert(false, false, initial);
         applyAndAssert(false, true, transformed);
     }
 
-    private void applyAndAssert(boolean predicateResult, boolean negate,
-                                SourceRecord expectedResult) {
-
-        @SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
+    private void applyAndAssert(boolean predicateResult, boolean negate, SourceRecord expectedResult) throws Exception {
+        Plugin<Predicate<SourceRecord>> predicatePlugin = mock(Plugin.class);
         Predicate<SourceRecord> predicate = mock(Predicate.class);
         when(predicate.test(any())).thenReturn(predicateResult);
-        @SuppressWarnings("unchecked")
+        when(predicatePlugin.get()).thenReturn(predicate);
+        Plugin<Transformation<SourceRecord>> transformationPlugin = mock(Plugin.class);
         Transformation<SourceRecord> transformation = mock(Transformation.class);
+        if ((predicateResult && !negate) || (!predicateResult && negate)) {
+            when(transformationPlugin.get()).thenReturn(transformation);
+        }
         if (expectedResult == transformed) {
             when(transformation.apply(any())).thenReturn(transformed);
         }

Review Comment:
   nit: Use the same condition as for `apply`, because they both happen 
together.
   ```suggestion
           if (expectedResult == transformed) {
               when(transformationPlugin.get()).thenReturn(transformation);
               when(transformation.apply(any())).thenReturn(transformed);
           }
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java:
##########
@@ -63,6 +64,11 @@ public void raiseError(Exception e) {
         herder.onFailure(connectorName, e);
     }
 
+    @Override
+    public PluginMetrics pluginMetrics() {
+        return null;

Review Comment:
   Is `null` a legal return value for this method? Do we expect callers to 
handle nulls? If so, we should document that in the javadoc for pluginMetrics.
   
   IMHO the return value should be non-nullable.
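
One way a non-nullable contract could look, as a sketch under stated assumptions: the context is handed its metrics at construction time and rejects `null` up front. The `NonNullMetricsSketch` wrapper and its constructor signature are hypothetical, and the nested types are stand-ins rather than the real Connect classes.

```java
import java.util.Objects;

/** Sketch only: the nested types are simplified stand-ins, not the real Connect classes. */
final class NonNullMetricsSketch {

    /** Stand-in for org.apache.kafka.common.metrics.PluginMetrics. */
    interface PluginMetrics { }

    static final class HerderConnectorContext {
        private final PluginMetrics pluginMetrics;

        // Hypothetical constructor: fail fast here instead of handing out null later.
        HerderConnectorContext(PluginMetrics pluginMetrics) {
            this.pluginMetrics = Objects.requireNonNull(pluginMetrics, "pluginMetrics");
        }

        /** Never returns null, so callers need no null check. */
        PluginMetrics pluginMetrics() {
            return pluginMetrics;
        }
    }
}
```

The null check moves to the one place where the context is built, which keeps the accessor's javadoc simple.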



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java:
##########
@@ -16,152 +16,54 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.runtime.SampleSinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-/**
- * A sink connector that is used in Apache Kafka integration tests to verify the behavior of the
- * Connect framework, but that can be used in other integration tests as a simple connector that
- * consumes and counts records. This class provides methods to find task instances
- * which are initiated by the embedded connector, and wait for them to consume a desired number of
- * messages.
- */
-public class MonitorableSinkConnector extends SampleSinkConnector {
-
-    private static final Logger log = LoggerFactory.getLogger(MonitorableSinkConnector.class);
+public class MonitorableSinkConnector extends TestableSinkConnector {
 
-    // Boolean valued configuration that determines whether MonitorableSinkConnector::alterOffsets should return true or false
-    public static final String ALTER_OFFSETS_RESULT = "alter.offsets.result";
-
-    private String connectorName;
-    private Map<String, String> commonConfigs;
-    private ConnectorHandle connectorHandle;
+    public static MetricName metricsName = null;
+    public static final String VALUE = "started";
 
     @Override
     public void start(Map<String, String> props) {
-        connectorHandle = RuntimeHandles.get().connectorHandle(props.get("name"));
-        connectorName = props.get("name");
-        commonConfigs = props;
-        log.info("Starting connector {}", props.get("name"));
-        connectorHandle.recordConnectorStart();
+        super.start(props);
+        PluginMetrics pluginMetrics = context.pluginMetrics();
+        metricsName = pluginMetrics.metricName("start", "description", Map.of());
+        pluginMetrics.addMetric(metricsName, (Gauge<Object>) (config, now) -> VALUE);
     }
 
     @Override
     public Class<? extends Task> taskClass() {
         return MonitorableSinkTask.class;
     }
 
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        List<Map<String, String>> configs = new ArrayList<>();
-        for (int i = 0; i < maxTasks; i++) {
-            Map<String, String> config = new HashMap<>(commonConfigs);
-            config.put("connector.name", connectorName);
-            config.put("task.id", connectorName + "-" + i);
-            configs.add(config);
-        }
-        return configs;
-    }
-
-    @Override
-    public void stop() {
-        log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
-        connectorHandle.recordConnectorStop();
-    }
-
-    @Override
-    public ConfigDef config() {
-        return new ConfigDef();
-    }
-
-    @Override
-    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
-        return Boolean.parseBoolean(connectorConfig.get(ALTER_OFFSETS_RESULT));
-    }
-
-    public static class MonitorableSinkTask extends SinkTask {
+    public static class MonitorableSinkTask extends TestableSinkTask {
 
-        private String taskId;
-        TaskHandle taskHandle;
-        Map<TopicPartition, Integer> committedOffsets;
-        Map<String, Map<Integer, TopicPartition>> cachedTopicPartitions;
-
-        public MonitorableSinkTask() {
-            this.committedOffsets = new HashMap<>();
-            this.cachedTopicPartitions = new HashMap<>();
-        }
-
-        @Override
-        public String version() {
-            return "unknown";
-        }
+        public static MetricName metricsName = null;
+        private int count = 0;
 
         @Override
         public void start(Map<String, String> props) {
-            taskId = props.get("task.id");
-            String connectorName = props.get("connector.name");
-            taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
-            log.debug("Starting task {}", taskId);
-            taskHandle.recordTaskStart();
-        }
-
-        @Override
-        public void open(Collection<TopicPartition> partitions) {
-            log.debug("Opening partitions {}", partitions);
-            taskHandle.partitionsAssigned(partitions);
-        }
-
-        @Override
-        public void close(Collection<TopicPartition> partitions) {
-            log.debug("Closing partitions {}", partitions);
-            taskHandle.partitionsRevoked(partitions);
-            partitions.forEach(committedOffsets::remove);
+            super.start(props);
+            PluginMetrics pluginMetrics = context.pluginMetrics();
+            metricsName = pluginMetrics.metricName("put", "description", Map.of());
+            pluginMetrics.addMetric(metricsName, (Measurable) (config, now) -> count);
         }
 
         @Override
         public void put(Collection<SinkRecord> records) {

Review Comment:
   nit: I think this `put` implementation breaks the `committedOffsets` 
handling in TestableSinkConnector.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java:
##########
@@ -63,6 +64,11 @@ public void raiseError(Exception e) {
         herder.onFailure(connectorName, e);
     }
 
+    @Override
+    public PluginMetrics pluginMetrics() {
+        return null;
+    }
+

Review Comment:
   I think it makes sense as part of ConnectorContext.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java:
##########
@@ -375,6 +395,44 @@ public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
         checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
     }
 
+    static final class MonitorableConnectRestExtension extends PluginsTest.TestConnectRestExtension implements Monitorable {
+
+        private boolean called = false;
+        private static MetricName metricName;
+
+        @Override
+        public void register(ConnectRestExtensionContext restPluginContext) {
+            called = true;
+        }
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            metricName = metrics.metricName("name", "description", Map.of());
+            metrics.addMetric(metricName, (Gauge<Boolean>) (config, now) -> called);
+        }
+    }
+
+    @Test
+    public void testMonitorableConnectRestExtension() {
+        Map<String, String> configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.REST_EXTENSION_CLASSES_CONFIG, MonitorableConnectRestExtension.class.getName());
+
+        doReturn(plugins).when(herder).plugins();
+        doReturn(Collections.singletonList(new MonitorableConnectRestExtension())).when(plugins).newPlugins(any(), any(), any());
+        server = new ConnectRestServer(null, restClient, configMap);
+
+        // the call throws because of mocks but the ConnectRestExtension should have been initialized
+        assertThrows(ConnectException.class, () -> server.initializeResources(herder));

Review Comment:
   nit: Are some mocks missing? This looks like the test is asserting that it 
contains a bug.
   I'm surprised that strict stubs don't also throw after the test completes 
to prevent discarding errors like this.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -381,6 +381,12 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
      */
     void setClusterLoggerLevel(String namespace, String level);
 
+    /**
+     * Get the worker for this herder
+     * @return the worker
+     */
+    Worker worker();

Review Comment:
   Correct me if I missed something, but this method is only ever used to:
   
   * immediately call `.metrics().metrics()`
   * mock out a herder with only `.metrics()` returning MockConnectMetrics
   
   Could this signature be changed to return either `ConnectMetrics` or 
`Metrics` to avoid exposing the whole Worker surface via the Herder interface? 
This would be somewhat similar to `Plugins plugins()` earlier in the interface.
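
A rough sketch of the narrower signature, assuming the accessor returns the metrics object directly. The method name `connectMetrics()`, the `workerId()` and `stop()` members, and the `HerderSurfaceSketch` wrapper are all hypothetical, and every nested type is a simplified stand-in for the real Connect class.

```java
import java.util.Objects;

/** Sketch only: all nested types are simplified stand-ins, not the real Connect classes. */
final class HerderSurfaceSketch {

    /** Stand-in for ConnectMetrics; workerId() is just a placeholder member. */
    interface ConnectMetrics {
        String workerId();
    }

    /** Stand-in for Worker; stop() represents the wider surface callers should not reach. */
    static final class Worker {
        private final ConnectMetrics metrics;

        Worker(ConnectMetrics metrics) {
            this.metrics = Objects.requireNonNull(metrics, "metrics");
        }

        ConnectMetrics metrics() {
            return metrics;
        }

        void stop() { }
    }

    /** The herder exposes only the metrics, not the whole worker. */
    interface Herder {
        // Hypothetical name for the narrowed accessor.
        ConnectMetrics connectMetrics();
    }

    static final class StandaloneHerder implements Herder {
        private final Worker worker;

        StandaloneHerder(Worker worker) {
            this.worker = Objects.requireNonNull(worker, "worker");
        }

        @Override
        public ConnectMetrics connectMetrics() {
            // The worker stays an implementation detail of the herder.
            return worker.metrics();
        }
    }
}
```

A side effect of the narrower interface is that a test double needs to stub a single call rather than a herder plus a worker.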



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java:
##########
@@ -137,6 +138,10 @@ private String[] connectorConfigFiles() {
         return result;
     }
 
+    public StandaloneHerder herder() {

Review Comment:
   Similar to my feedback on the `Herder` change: I think this return value has 
too much surface area, and could permit users of EmbeddedConnectStandalone to 
circumvent the REST API and reach down into the herder/worker, breaking some of 
the abstractions of an integration test.
   
   I do think it's valuable to have integration testing for metrics, and it is 
now pretty obvious that it's completely missing. I think in this PR we should 
reduce this signature to `ConnectMetrics` or `Metrics`, and follow up to make 
the test fully end-to-end with a MetricsReporter implementation.


