C0urante commented on a change in pull request #10629:
URL: https://github.com/apache/kafka/pull/10629#discussion_r659903390

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
##########
@@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
                 && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
     }
+    /**
+     * Assert that a connector and its tasks are deleted.
+     *
+     * @param connectorName the connector name
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndTasksAreDeleted(String connectorName, String detailMessage)

Review comment:
Nit: do we really need the `AndTasks` section? Might be fine to use `assertConnectorIsDeleted` instead.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
##########
@@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
                 && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
     }
+    /**
+     * Assert that a connector and its tasks are deleted.
+     *
+     * @param connectorName the connector name
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndTasksAreDeleted(String connectorName, String detailMessage)
+        throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkConnectorAndTasksAreDeleted(connectorName),
+                CONNECTOR_SETUP_DURATION_MS,
+                "At least the connector or one of its tasks still exists.");
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Check whether the connector or any of its tasks still exist.
+     *
+     * @param connectorName the connector
+     * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
+     */
+    protected boolean checkConnectorAndTasksAreDeleted(String connectorName) {
+        ConnectorStateInfo info;
+        try {
+            info = connect.connectorStatus(connectorName);
+        } catch (ConnectRestException e) {
+            return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return false;
+        }
+        if (info == null) {
+            return true;
+        }

Review comment:
Do we know when this might happen? My understanding is that part of the contract we're trying to test here is that Connect gives back a 404 response when a stalled connector is deleted; do we want to relax our expectations to also permit this case?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
##########
@@ -596,6 +662,8 @@ public void start(Map<String, String> props) {
     @Override
     public List<SourceRecord> poll() {
         block.maybeBlockOn(SOURCE_TASK_POLL);
+        // even when not blocking, pause to prevent a tight loop
+        Utils.sleep(1000);

Review comment:
Good call 👍
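
On the `info == null` question above, here is a minimal sketch of what a stricter check could look like if only a 404 should count as proof of deletion. It is self-contained so it can run outside the Connect test harness: `StrictDeletionCheck`, `RestException`, and `isDeleted` are hypothetical stand-ins for illustration, not Kafka Connect APIs, and the `Supplier` plays the role of the `connect.connectorStatus(connectorName)` call.

```java
import java.util.function.Supplier;

public class StrictDeletionCheck {

    /** Hypothetical stand-in for ConnectRestException; it only carries an HTTP status code. */
    static class RestException extends RuntimeException {
        private final int statusCode;

        RestException(int statusCode) {
            super("HTTP " + statusCode);
            this.statusCode = statusCode;
        }

        int statusCode() {
            return statusCode;
        }
    }

    static final int NOT_FOUND = 404;

    /**
     * Returns true only when the status lookup fails with a 404.
     * A successful lookup (even one that yields null) or any other
     * failure is treated as "not deleted yet".
     */
    static boolean isDeleted(Supplier<Object> statusLookup) {
        try {
            statusLookup.get();
            // The lookup succeeded, so the 404 contract was not observed.
            return false;
        } catch (RestException e) {
            return e.statusCode() == NOT_FOUND;
        } catch (RuntimeException e) {
            // Unexpected failure: report "not deleted" so a polling caller retries.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isDeleted(() -> { throw new RestException(404); })); // true
        System.out.println(isDeleted(() -> null));                              // false
        System.out.println(isDeleted(() -> { throw new RestException(500); })); // false
    }
}
```

If there turns out to be a legitimate case where the status lookup returns `null` after deletion, the `return false` after the lookup would have to accept that case too, which is the relaxation being asked about.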