C0urante commented on a change in pull request #10629:
URL: https://github.com/apache/kafka/pull/10629#discussion_r659903390



##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
##########
@@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String 
connectorName) {
                 && info.tasks().stream().noneMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
     }
 
+    /**
+     * Assert that a connector and its tasks are deleted.
+     *
+     * @param connectorName the connector name
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndTasksAreDeleted(String connectorName, String 
detailMessage)

Review comment:
       Nit: do we really need the `AndTasks` section? Might be fine to use 
`assertConnectorIsDeleted` instead.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
##########
@@ -392,6 +392,48 @@ protected boolean checkConnectorAndTasksAreStopped(String 
connectorName) {
                 && info.tasks().stream().noneMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
     }
 
+    /**
+     * Assert that a connector and its tasks are deleted.
+     *
+     * @param connectorName the connector name
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndTasksAreDeleted(String connectorName, String 
detailMessage)
+            throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkConnectorAndTasksAreDeleted(connectorName),
+                CONNECTOR_SETUP_DURATION_MS,
+                "At least the connector or one of its tasks still exists.");
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Check whether the connector or any of its tasks still exist.
+     *
+     * @param connectorName the connector
+     * @return true if the connector and all the tasks are not in RUNNING 
state; false otherwise
+     */
+    protected boolean checkConnectorAndTasksAreDeleted(String connectorName) {
+        ConnectorStateInfo info;
+        try {
+            info = connect.connectorStatus(connectorName);
+        } catch (ConnectRestException e) {
+            return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return false;
+        }
+        if (info == null) {
+            return true;
+        }

Review comment:
       Do we know when this might happen? My understanding is that part of the 
contract we're trying to test here is that Connect gives back a 404 response 
when a stalled connector is deleted; do we want to relax our expectations to 
also permit this case?

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
##########
@@ -596,6 +662,8 @@ public void start(Map<String, String> props) {
             @Override
             public List<SourceRecord> poll() {
                 block.maybeBlockOn(SOURCE_TASK_POLL);
+                // even when not blocking, pause to prevent a tight loop
+                Utils.sleep(1000);

Review comment:
       Good call 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to