LiamClarkeNZ commented on a change in pull request #11817:
URL: https://github.com/apache/kafka/pull/11817#discussion_r820325335



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -213,69 +234,73 @@ public void setup() {
 
         // Some common defaults. They might change on individual tests
         connectorProps = anyConnectorConfigMap();
-        PowerMock.mockStatic(Plugins.class);
+
+        pluginsMockedStatic = mockStatic(Plugins.class);
+
+        // pass through things that aren't explicitly mocked out
+        connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new 
CallsRealMethods());
+        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
+
+        // Make calls to new WorkerSourceTask() return a mock to avoid the 
source task trying to connect to a broker.
+        sourceTaskMockedConstruction = 
mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> {
+
+            // provide implementations of three methods used during testing
+            switch (invocation.getMethod().getName()) {
+                case "id":
+                    return TASK_ID;
+                case "loader":
+                    return pluginLoader;
+                case "awaitStop":
+                    return true;
+                default:
+                    return null;
+            }
+        });
+    }
+
+    @After
+    public void teardown() {
+        // Critical to always close MockedStatics
+        // Ideal would be to use try-with-resources in an individual test, but 
it introduced a rather large level of
+        // indentation of most test bodies, hence sticking with setup() / 
teardown()
+        pluginsMockedStatic.close();
+        connectUtilsMockedStatic.close();
+        sourceTaskMockedConstruction.close();
+
+        mockitoSession.finishMocking();
     }
 
     @Test
     public void testStartAndStopConnector() throws Throwable {
-        expectConverters();
-        expectStartStorage();
 
         final String connectorClass = WorkerTestConnector.class.getName();
-
-        // Create
-        
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
-        
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
-        
EasyMock.expect(delegatingLoader.connectorLoader(connectorClass)).andReturn(pluginLoader);
-        EasyMock.expect(plugins.newConnector(connectorClass))
-                .andReturn(sourceConnector);
-        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
-
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
connectorClass);
 
-        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
-
-        expectFileConfigProvider();
-        EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader))
-                .andReturn(delegatingLoader)
-                .times(3);
-        sourceConnector.initialize(anyObject(ConnectorContext.class));
-        EasyMock.expectLastCall();
-        sourceConnector.start(connectorProps);
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
-                .andReturn(pluginLoader).times(3);
-
-        connectorStatusListener.onStartup(CONNECTOR_ID);
-        EasyMock.expectLastCall();
-
-        // Remove
-        sourceConnector.stop();
-        EasyMock.expectLastCall();
-
-        connectorStatusListener.onShutdown(CONNECTOR_ID);
-        EasyMock.expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        expectStopStorage();
-        expectClusterId();
+        // Create
+        when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
+        when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
+        
when(delegatingLoader.connectorLoader(connectorClass)).thenReturn(pluginLoader);
+        when(plugins.newConnector(connectorClass)).thenReturn(sourceConnector);
+        when(sourceConnector.version()).thenReturn("1.0");
 
-        PowerMock.replayAll();
+        pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
+        pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
+        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+                                .thenReturn(CLUSTER_ID);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
-        worker.herder = herder;
         worker.start();
 
         assertEquals(Collections.emptySet(), worker.connectorNames());
 
         FutureCallback<TargetState> onFirstStart = new FutureCallback<>();
+
         worker.startConnector(CONNECTOR_ID, connectorProps, ctx, 
connectorStatusListener, TargetState.STARTED, onFirstStart);
+
         // Wait for the connector to actually start
         assertEquals(TargetState.STARTED, onFirstStart.get(1000, 
TimeUnit.MILLISECONDS));
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), 
worker.connectorNames());
+        assertEquals(new HashSet<>(Collections.singletonList(CONNECTOR_ID)), 
worker.connectorNames());

Review comment:
       Cheers :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to