C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568612017
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +289,42 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskOnNonExistentTopic() throws Exception {
+        connect = connectBuilder
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        StartAndStopLatch stopCounter = connector.expectedStops(1);

Review comment:
   It's asserting that shutdown is not blocked indefinitely when we hit the topic-creation-disabled scenario, but not that the task fails. I'll try to make things clearer.
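
To illustrate the distinction drawn in the comment, here is a minimal sketch that uses only JDK classes rather than the Connect integration-test harness. `BoundedShutdownSketch`, `FakeTask`, and `stopsWithin` are hypothetical names invented for this example and are not part of Kafka. The sketch checks only that a stop request completes within a timeout; it makes no claim about whether the task failed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownSketch {

    /** Hypothetical stand-in for a running task; not a Kafka Connect class. */
    static class FakeTask implements Runnable {
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        private final CountDownLatch stopped = new CountDownLatch(1);
        // Never set in this sketch: stopping and failing are tracked separately.
        private volatile boolean failed = false;

        @Override
        public void run() {
            try {
                // Simulates work that makes no progress until a stop is requested.
                stopRequested.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Signals that the task has fully stopped.
                stopped.countDown();
            }
        }

        void stop() {
            stopRequested.countDown();
        }

        boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException {
            return stopped.await(timeout, unit);
        }

        boolean hasFailed() {
            return failed;
        }
    }

    /** Starts the task, requests a stop, and reports whether it stopped within the timeout. */
    static boolean stopsWithin(FakeTask task, long timeoutMs) throws InterruptedException {
        Thread thread = new Thread(task, "fake-task");
        thread.setDaemon(true);
        thread.start();
        task.stop();
        return task.awaitStop(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        FakeTask task = new FakeTask();
        // Asserts bounded shutdown only; the failure flag is a separate question.
        System.out.println("stopped in time: " + stopsWithin(task, 5_000));
        System.out.println("task failed: " + task.hasFailed());
    }
}
```

A test that also wanted to assert task failure would need a separate check on the task's status; the bounded wait alone cannot tell a clean stop from a failed one.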