C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568599861



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -171,13 +171,9 @@ protected void close() {
                 log.warn("Could not stop task", t);
             }
         }
-        if (producer != null) {
-            try {
-                producer.close(Duration.ofSeconds(30));
-            } catch (Throwable t) {
-                log.warn("Could not close producer", t);
-            }
-        }
+
+        closeProducer(30);
+
         if (admin != null) {

Review comment:
       I wanted to keep things focused here. We know of an edge case where the 
producer can hang forever in `send`, and that is what this change addresses. As 
far as I know, the admin client has no such case, so closing it proactively 
seems unnecessary. A blocking transformation chain could be an issue, but 
there's no guarantee that invoking `close` on it would fix anything: a 
transform blocked on, e.g., input/output could remain blocked even after being 
closed from another thread.
   
   Ultimately, if a similar case comes up where follow-up is necessary, we can 
consider our options then based on the particulars of the situation. Right now 
we only have one specific problem to solve, and I think a targeted approach 
that doesn't unnecessarily change things is best for that.
   
   If nothing else, maybe a comment explaining why the producer gets special 
treatment here would be beneficial.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -202,6 +198,8 @@ public void removeMetrics() {
     public void cancel() {
         super.cancel();
         offsetReader.close();
+        // Run on a separate thread to avoid potentially blocking the herder thread
+        new Thread(() -> closeProducer(0)).start();

Review comment:
       Yeah, we can probably use an executor for this. I was on the fence about 
whether this needed a separate thread at all (hence the half-hearted 
implementation here), but then I looked deeper into the `KafkaProducer::close` 
logic and saw that it also closes the producer's interceptors, key serializer, 
value serializer, etc. Any time user code is called we should assume it can 
block forever (probably worth adding as a comment here). Now that I'm more 
confident that closing the producer asynchronously is necessary, I'll expand 
this to use an executor.
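
For illustration only, here is a rough sketch of what an executor-based version might look like. It is not the code in this PR: `AsyncCloseSketch`, `TimedCloseable`, and `closeExecutor` are hypothetical names, and `TimedCloseable` stands in for the producer so the example runs without Kafka on the classpath.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncCloseSketch {

    /** Hypothetical stand-in for a client whose close may run user code and block. */
    public interface TimedCloseable {
        void close(Duration timeout);
    }

    private final ExecutorService closeExecutor; // assumed to be owned by the worker
    private final TimedCloseable producer;

    public AsyncCloseSketch(ExecutorService closeExecutor, TimedCloseable producer) {
        this.closeExecutor = closeExecutor;
        this.producer = producer;
    }

    /** Closes the producer on the calling thread, logging instead of propagating failures. */
    public void closeProducer(Duration timeout) {
        if (producer != null) {
            try {
                producer.close(timeout);
            } catch (Throwable t) {
                System.err.println("Could not close producer: " + t);
            }
        }
    }

    /**
     * Hands the close off to the executor. Closing may invoke user code
     * (interceptors, serializers) that can block forever, so the caller,
     * e.g. the herder thread, must not wait on it.
     */
    public void cancel() {
        closeExecutor.execute(() -> closeProducer(Duration.ZERO));
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // Simulate a close that blocks until it is released from elsewhere.
        AsyncCloseSketch task = new AsyncCloseSketch(executor, timeout -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        task.cancel();
        System.out.println("cancel() returned while close is still blocked");

        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Compared with `new Thread(...).start()`, a shared executor bounds the number of threads and gives the worker one place to shut them down, although a close that never returns would still occupy one of its threads.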

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +289,42 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskOnNonExistentTopic() throws Exception {
+        connect = connectBuilder
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        StartAndStopLatch stopCounter = connector.expectedStops(1);

Review comment:
       The test asserts that shutdown is not blocked indefinitely when we hit 
the topic-creation-disabled scenario; it does not assert that the task fails. 
I'll try to make that clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

