C0urante commented on code in PR #13424: URL: https://github.com/apache/kafka/pull/13424#discussion_r1146250479
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -325,6 +325,181 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES)); } + /** + * Verify that the target state (started, paused, stopped) of a connector can be updated, with + * an emphasis on ensuring that the transitions between each state are correct. + * <p> + * The transitions we need to cover are: + * <ol> + * <li>RUNNING -> PAUSED</li> + * <li>RUNNING -> STOPPED</li> + * <li>PAUSED -> RUNNING</li> + * <li>PAUSED -> STOPPED</li> + * <li>STOPPED -> RUNNING</li> + * <li>STOPPED -> PAUSED</li> + * </ol> + * With some reordering, we can perform each transition just once: + * <ul> + * <li>Start with RUNNING</li> + * <li>Transition to STOPPED (2)</li> + * <li>Transition to RUNNING (5)</li> + * <li>Transition to PAUSED (1)</li> + * <li>Transition to STOPPED (4)</li> + * <li>Transition to PAUSED (6)</li> + * <li>Transition to RUNNING (3)</li> + * </ul> + */ + @Test + public void testPauseStopResume() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + // Want to make sure to use multiple tasks + final int numTasks = 4; + Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); + + // Start with RUNNING + connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not start in time" + ); + + // Transition to STOPPED + connect.stopConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Transition to RUNNING + connect.resumeConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not resume in time" + ); + + // Transition to PAUSED + connect.pauseConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + connect.pauseConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksArePaused( + CONNECTOR_NAME, + numTasks, + "Connector did not pause in time" + ); + + // Transition to STOPPED + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Transition to PAUSED + connect.pauseConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksArePaused( + CONNECTOR_NAME, + 0, + "Connector did not pause in time" + ); + + // Transition to RUNNING + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not resume in time" + ); + + // Delete the connector + connect.deleteConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndTasksAreNotRunning( + CONNECTOR_NAME, + "Connector tasks were not destroyed in time" + ); + } + + /** + * Test out the {@code STOPPED} state introduced in + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED">KIP-875</a>, + * with an emphasis on correctly handling errors thrown from the connector. + */ + @Test + public void testStoppedState() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME); + // Fail the connector on startup + props.put("connector.start.inject.error", "true"); + + // Start the connector (should fail immediately and generate no tasks) + connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorIsFailedAndTasksHaveFailed( + CONNECTOR_NAME, + 0, + "Connector should have failed and not generated any tasks" + ); + + // Stopping a failed connector updates its state to STOPPED in the REST API + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Can resume a connector after its Connector has failed before shutdown after receiving a stop request + props.remove("connector.start.inject.error"); + connect.configureConnector(CONNECTOR_NAME, props); + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector or tasks did not start running healthily in time" + ); + + // Fail the connector on shutdown + props.put("connector.stop.inject.error", "true"); + // Stopping a connector that fails during shutdown after receiving a stop request updates its state to STOPPED in the REST API + connect.configureConnector(CONNECTOR_NAME, props); + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Can resume a connector after is Connector has failed during shutdown after receiving a stop request Review Comment: Yep, exactly 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org