LiamClarkeNZ commented on a change in pull request #11817:
URL: https://github.com/apache/kafka/pull/11817#discussion_r820325335
##########
File path:
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -213,69 +234,73 @@ public void setup() {
// Some common defaults. They might change on individual tests
connectorProps = anyConnectorConfigMap();
- PowerMock.mockStatic(Plugins.class);
+
+ pluginsMockedStatic = mockStatic(Plugins.class);
+
+ // pass through things that aren't explicitly mocked out
+ connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new
CallsRealMethods());
+ connectUtilsMockedStatic.when(() ->
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
+
+ // Make calls to new WorkerSourceTask() return a mock to avoid the source task trying to connect to a broker.
+ sourceTaskMockedConstruction =
mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> {
+
+ // provide implementations of three methods used during testing
+ switch (invocation.getMethod().getName()) {
+ case "id":
+ return TASK_ID;
+ case "loader":
+ return pluginLoader;
+ case "awaitStop":
+ return true;
+ default:
+ return null;
+ }
+ });
+ }
+
+ @After
+ public void teardown() {
+ // Critical to always close MockedStatics
+ // Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of
+ // indentation of most test bodies, hence sticking with setup() / teardown()
+ pluginsMockedStatic.close();
+ connectUtilsMockedStatic.close();
+ sourceTaskMockedConstruction.close();
+
+ mockitoSession.finishMocking();
}
@Test
public void testStartAndStopConnector() throws Throwable {
- expectConverters();
- expectStartStorage();
final String connectorClass = WorkerTestConnector.class.getName();
-
- // Create
-
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
-
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
-
EasyMock.expect(delegatingLoader.connectorLoader(connectorClass)).andReturn(pluginLoader);
- EasyMock.expect(plugins.newConnector(connectorClass))
- .andReturn(sourceConnector);
- EasyMock.expect(sourceConnector.version()).andReturn("1.0");
-
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
connectorClass);
- EasyMock.expect(sourceConnector.version()).andReturn("1.0");
-
- expectFileConfigProvider();
- EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader))
- .andReturn(delegatingLoader)
- .times(3);
- sourceConnector.initialize(anyObject(ConnectorContext.class));
- EasyMock.expectLastCall();
- sourceConnector.start(connectorProps);
- EasyMock.expectLastCall();
-
- EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
- .andReturn(pluginLoader).times(3);
-
- connectorStatusListener.onStartup(CONNECTOR_ID);
- EasyMock.expectLastCall();
-
- // Remove
- sourceConnector.stop();
- EasyMock.expectLastCall();
-
- connectorStatusListener.onShutdown(CONNECTOR_ID);
- EasyMock.expectLastCall();
-
- ctx.close();
- expectLastCall();
-
- expectStopStorage();
- expectClusterId();
+ // Create
+ when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
+ when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
+
when(delegatingLoader.connectorLoader(connectorClass)).thenReturn(pluginLoader);
+ when(plugins.newConnector(connectorClass)).thenReturn(sourceConnector);
+ when(sourceConnector.version()).thenReturn("1.0");
- PowerMock.replayAll();
+ pluginsMockedStatic.when(() ->
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
+ pluginsMockedStatic.when(() ->
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
+ connectUtilsMockedStatic.when(() ->
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+ .thenReturn(CLUSTER_ID);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
- worker.herder = herder;
worker.start();
assertEquals(Collections.emptySet(), worker.connectorNames());
FutureCallback<TargetState> onFirstStart = new FutureCallback<>();
+
worker.startConnector(CONNECTOR_ID, connectorProps, ctx,
connectorStatusListener, TargetState.STARTED, onFirstStart);
+
// Wait for the connector to actually start
assertEquals(TargetState.STARTED, onFirstStart.get(1000,
TimeUnit.MILLISECONDS));
- assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)),
worker.connectorNames());
+ assertEquals(new HashSet<>(Collections.singletonList(CONNECTOR_ID)),
worker.connectorNames());
Review comment:
Cheers :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org