C0urante commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1419175286


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
         return props;
     }
 
+    @Test
+    public void testRequestTimeouts() throws Exception {
+        final String configTopic = "test-request-timeout-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
+        // be spuriously triggered after the group coordinator for a Connect cluster is bounced
+        // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause
+        // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay
+        // is set to 0
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+        connect = connectBuilder
+                .numBrokers(1)
+                .numWorkers(1)
+                .build();
+        connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1,
+                "Worker did not start in time");
+
+        Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
+        Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
+        connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
+
+        // Create a connector to ensure that the worker has completed startup
+        log.info("Creating initial connector");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time"
+        );
+
+        // Bring down Kafka, which should cause some REST requests to fail
+        log.info("Stopping Kafka cluster");
+        connect.kafka().stopOnlyKafka();
+        // Allow for the workers to discover that the coordinator is unavailable, wait is
+        // heartbeat timeout * 2 + 4sec
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.requestTimeout(5_000);
+        // Try to reconfigure the connector, which should fail with a timeout error
+        log.info("Trying to reconfigure connector while Kafka cluster is down");
+        ConnectRestException e = assertThrows(
+                ConnectRestException.class,
+                () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2)
+        );
+        assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertNotNull(e.getMessage());
+        assertTrue(
+                "Message '" + e.getMessage() + "' does not match expected format",
+                e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic")

Review Comment:
   Ah, good idea! Added assert-with-retry logic to the `assertTimeoutException` 
utility method.
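
For readers unfamiliar with the pattern, here is a minimal sketch of what assert-with-retry can look like. It is not the code from this PR: `RetrySketch`, `assertWithRetry`, and its parameters are hypothetical names chosen for illustration, and the real `assertTimeoutException` helper may be structured quite differently.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

    /**
     * Hypothetical helper: re-runs the assertion until it passes or the
     * deadline elapses, in which case the most recent failure is rethrown.
     */
    static void assertWithRetry(Runnable assertion, long timeoutMs, long intervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // out of time: surface the last failure
                }
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and fail fast
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while retrying assertion", ie);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated flaky check: fails on the first two attempts, passes on the third
        assertWithRetry(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new AssertionError("expected message not observed yet");
            }
        }, 5_000, 10);
        System.out.println("Passed after " + attempts.get() + " attempts");
    }
}
```

Retrying helps in a test like this one because the worker may be in a different stage (or between stages) when an individual request times out, so a single attempt can see a message other than the expected one.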
   
   I've also tweaked how we track stages when the herder is polling the group 
coordinator. Previously, when the `WorkerGroupMember` and `WorkerCoordinator` 
methods only accepted a `Runnable`, those stages would never be closed. Now, 
those methods accept a `Supplier<UncheckedCloseable>` that, at the moment, 
always returns a `Distributed.TickThreadStage`, which can be used with a 
try-with-resources block to automatically register and complete tick thread 
stages.
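
To make the difference from the `Runnable` approach concrete, here is a rough, self-contained sketch of the supplier-plus-try-with-resources pattern. Everything in it is a stand-in: `TickStageSketch`, `Stage`, `poll`, and the `events` list are hypothetical, and the local `UncheckedCloseable` interface only mirrors the idea of an `AutoCloseable` whose `close()` throws no checked exception. The actual herder and coordinator code differs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TickStageSketch {

    /** Stand-in: an AutoCloseable whose close() declares no checked exception. */
    interface UncheckedCloseable extends AutoCloseable {
        @Override
        void close();
    }

    /** Records stage lifecycle events so the ordering can be inspected. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical stage: registered on construction, completed on close(). */
    static final class Stage implements UncheckedCloseable {
        private final String description;

        Stage(String description) {
            this.description = description;
            events.add("started: " + description);
        }

        @Override
        public void close() {
            events.add("completed: " + description);
        }
    }

    /**
     * Hypothetical stand-in for a coordinator method. With a plain Runnable
     * callback the callee could only signal that a stage began; with a
     * supplier of closeables it can also guarantee the stage is completed.
     */
    static void poll(Supplier<UncheckedCloseable> onPoll, Runnable work) {
        try (UncheckedCloseable stage = onPoll.get()) {
            work.run();
        } // stage.close() runs here, even if work.run() throws
    }

    public static void main(String[] args) {
        poll(() -> new Stage("polling the group coordinator"), () -> events.add("polled"));
        // prints [started: polling the group coordinator, polled, completed: polling the group coordinator]
        System.out.println(events);
    }
}
```

Because the callee owns the try-with-resources block, the stage is closed on every exit path, which is the property the `Runnable`-based version could not provide.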
   
   I know that I've used `UncheckedCloseable` incorrectly in the past; I 
believe this usage doesn't suffer from any of the issues my previous ones did. 
If the result is still undesirable, we can explore alternatives, such as 
introducing a new interface.



