vamossagar12 commented on code in PR #16628:
URL: https://github.com/apache/kafka/pull/16628#discussion_r1683245294


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -926,6 +933,119 @@ public void testPollTimeoutExpiry() throws Exception {
         }
     }
 
+    @Test
+    public void testNoDuplicateTaskAssignmentOnWorkerPollTimeoutExpiry() throws Exception {
+        String statusTopic = "status-topic";
+        // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on
+        // task#stop method which is blocked. The timeouts have been set accordingly
+        workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(10)));
+        // This is set to a high value to ensure that all tasks can stop in time and also, we don't have the blocked
+        // task meddling with the rest of the test by being started midway.
+        workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(60)));
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(5)));
+        workerProps.put(STATUS_STORAGE_TOPIC_CONFIG, statusTopic);
+        workerProps.put(STATUS_STORAGE_PARTITIONS_CONFIG, Integer.toString(1));
+        connect = connectBuilder
+            .numBrokers(1)
+            .numWorkers(1)
+            .build();
+
+        connect.start();
+        WorkerHandle leader = connect.workers().iterator().next();
+
+        Map<String, String> connectorConfig = defaultSourceConnectorProps("topic1");
+        connectorConfig.put(TASKS_MAX_CONFIG, "1");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+        );
+
+        // The task that has a blocking stop call gets scheduled on this worker eventually leading to a poll timeout.
+        WorkerHandle timingOutWorker = connect.addWorker();
+        connect.assertions().assertExactlyNumWorkersAreUp(2, "Workers didn't start in time");
+
+        Map<String, String> blockingTaskConnectorConfig = new HashMap<>();
+        blockingTaskConnectorConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getSimpleName());
+        blockingTaskConnectorConfig.put(TASKS_MAX_CONFIG, "1");
+        blockingTaskConnectorConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP));
+        connect.configureConnector(CONNECTOR_NAME + "-1", blockingTaskConnectorConfig);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME + "-1", 1, "connector and tasks did not start in time"
+        );
+
+        connectorConfig.put(TOPIC_CONFIG, "topic2");
+        connectorConfig.put(TASKS_MAX_CONFIG, "2");
+        connect.configureConnector(CONNECTOR_NAME + "-2", connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME + "-2", 2, "connector and tasks did not start in time"
+        );
+        // We verify this task id because it is the one which gets duplicated. Banking upon the assignment logic of
+        // ICR here.
+        String taskIdToVerify = new ConnectorTaskId(CONNECTOR_NAME + "-2", 1).toString();
+
+        // Restarting the task on a separate thread to not block the test thread.
+        Thread restartThread = new Thread(() -> {
+            try {
+                connect.restartTask(CONNECTOR_NAME + "-1", 0);
+            } catch (Exception e) {

Review Comment:
   We get an exception here because of the scenario described 
[here](https://github.com/apache/kafka/pull/16628/files#r1683244575)
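
For readers without the PR open, the pattern under discussion (issue a potentially blocking call on a separate thread and tolerate its failure) might look roughly like the sketch below. This is only an illustration, not the code in the PR: `BackgroundRestartSketch`, `BlockingCall`, and `runInBackground` are hypothetical names, and the exception thrown in `main` is a made-up stand-in for whatever the restart call actually raises.

```java
import java.util.concurrent.atomic.AtomicReference;

public class BackgroundRestartSketch {

    /** Hypothetical stand-in for a call such as a task restart that may block or throw. */
    public interface BlockingCall {
        void run() throws Exception;
    }

    /**
     * Runs the call on a daemon thread so the caller is never blocked for longer
     * than joinTimeoutMs. Returns the exception the call threw, or null if it
     * completed normally or was still running when the timeout elapsed.
     */
    public static Exception runInBackground(BlockingCall call, long joinTimeoutMs)
            throws InterruptedException {
        AtomicReference<Exception> failure = new AtomicReference<>();
        Thread thread = new Thread(() -> {
            try {
                call.run();
            } catch (Exception e) {
                // Record the failure instead of letting it kill the thread silently.
                failure.set(e);
            }
        }, "restart-thread");
        thread.setDaemon(true); // do not keep the JVM alive if the call never returns
        thread.start();
        thread.join(joinTimeoutMs);
        return failure.get();
    }

    public static void main(String[] args) throws Exception {
        // Made-up failure, used only to exercise the sketch.
        Exception e = runInBackground(() -> {
            throw new IllegalStateException("hypothetical failure");
        }, 1000);
        System.out.println(e == null ? "no exception" : "caught: " + e.getMessage());
    }
}
```

Capturing the exception in an `AtomicReference` lets the test thread decide whether the failure is expected, rather than swallowing it inside the background thread.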



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

