vamossagar12 commented on code in PR #16628 (URL: https://github.com/apache/kafka/pull/16628#discussion_r1683245294)
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -926,6 +933,119 @@ public void testPollTimeoutExpiry() throws Exception { } } + @Test + public void testNoDuplicateTaskAssignmentOnWorkerPollTimeoutExpiry() throws Exception { + String statusTopic = "status-topic"; + // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on + // task#stop method which is blocked. The timeouts have been set accordingly + workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(10))); + // This is set to a high value to ensure that all tasks can stop in time and also, we don't have the blocked + // task meddling with the rest of the test by being started midway. + workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(60))); + workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(5))); + workerProps.put(STATUS_STORAGE_TOPIC_CONFIG, statusTopic); + workerProps.put(STATUS_STORAGE_PARTITIONS_CONFIG, Integer.toString(1)); + connect = connectBuilder + .numBrokers(1) + .numWorkers(1) + .build(); + + connect.start(); + WorkerHandle leader = connect.workers().iterator().next(); + + Map<String, String> connectorConfig = defaultSourceConnectorProps("topic1"); + connectorConfig.put(TASKS_MAX_CONFIG, "1"); + connect.configureConnector(CONNECTOR_NAME, connectorConfig); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, 1, "connector and tasks did not start in time" + ); + + // The task that has a blocking stop call gets scheduled on this worker eventually leading to a poll timeout. 
+ WorkerHandle timingOutWorker = connect.addWorker(); + connect.assertions().assertExactlyNumWorkersAreUp(2, "Workers didn't start in time"); + + Map<String, String> blockingTaskConnectorConfig = new HashMap<>(); + blockingTaskConnectorConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getSimpleName()); + blockingTaskConnectorConfig.put(TASKS_MAX_CONFIG, "1"); + blockingTaskConnectorConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP)); + connect.configureConnector(CONNECTOR_NAME + "-1", blockingTaskConnectorConfig); + + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME + "-1", 1, "connector and tasks did not start in time" + ); + + connectorConfig.put(TOPIC_CONFIG, "topic2"); + connectorConfig.put(TASKS_MAX_CONFIG, "2"); + connect.configureConnector(CONNECTOR_NAME + "-2", connectorConfig); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME + "-2", 2, "connector and tasks did not start in time" + ); + // We verify this task id because it is the one which gets duplicated. Banking upon the assignment logic of + // ICR here. + String taskIdToVerify = new ConnectorTaskId(CONNECTOR_NAME + "-2", 1).toString(); + + // Restarting the task on a separate thread to not block the test thread. + Thread restartThread = new Thread(() -> { + try { + connect.restartTask(CONNECTOR_NAME + "-1", 0); + } catch (Exception e) { Review Comment: We get an exception here because of the scenario described [here](https://github.com/apache/kafka/pull/16628/files#r1683244575) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org