rhauch commented on a change in pull request #9669:
URL: https://github.com/apache/kafka/pull/9669#discussion_r534419655



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -206,16 +203,13 @@ public void cancel() {
     public void stop() {
         super.stop();
         stopRequestedLatch.countDown();
-        synchronized (this) {
-            if (finishedStart)
-                tryStop();
-            else
-                startedShutdownBeforeStartCompleted = true;
-        }
     }
 
-    private synchronized void tryStop() {
-        if (!stopped) {
+    // Note: This method is not thread-safe

Review comment:
       Maybe a nit, but this comment seems a bit misleading. It reads as if 
the method must be called from a thread-safe context, but is otherwise 
unrestricted in how it's used.
   
   Strictly speaking, this method need not be thread-safe because it should 
*only* be called from within `close()`, which is called from the task's own 
thread and is therefore thread-safe. Perhaps this design assumption should be 
mentioned here.
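
   To illustrate how that assumption could be spelled out, here is a minimal 
sketch. `SketchTask`, `stopCalls()` and `stopRequested()` are hypothetical 
names used only for illustration, and this is not the actual 
`WorkerSourceTask` code; only `stop()`, `close()`, `tryStop()`, `stopped` and 
`stopRequestedLatch` echo names from the diff.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a worker task; not the real WorkerSourceTask.
class SketchTask {
    private final CountDownLatch stopRequestedLatch = new CountDownLatch(1);
    // Only read and written from the task's own thread (assumption of this sketch).
    private boolean stopped = false;
    private int stopCalls = 0;

    // May be called from any thread: it only signals that shutdown was requested.
    public void stop() {
        stopRequestedLatch.countDown();
    }

    // Assumed to be called only from the task's own thread.
    public void close() {
        tryStop();
    }

    // Not synchronized by design: this must only be called from close(), which
    // runs on the task's own thread, so no other thread can race on `stopped`.
    private void tryStop() {
        if (!stopped) {
            stopped = true;
            stopCalls++;
        }
    }

    // Hypothetical accessors so the behavior can be observed.
    public int stopCalls() {
        return stopCalls;
    }

    public boolean stopRequested() {
        return stopRequestedLatch.getCount() == 0;
    }
}
```

   With the restriction written next to the method, a reader can see both why 
no locking is needed and what would break if another caller were added.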

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -228,17 +222,16 @@ private synchronized void tryStop() {
     @Override
     public void execute() {
         try {
+            // If we try to start the task at all (by invoking initialize and possibly start), we count this as
+            // "started" in order to properly clean up any resources allocated by those invocations when the task is
+            // shut down by calling stop. If the task throws an exception during startup, it should still be able to
+            // clean up any allocated resources when stop is called, and if it isn't able to or even throws another
+            // exception during stop, the worst thing that happens is another exception gets logged for an already-
+            // failed task

Review comment:
       Minor suggestions to improve wording and to be a bit more explicit about 
expectations for Task implementations:
   ```suggestion
               // If we try to start the task at all by invoking initialize, then count this as
               // "started" and expect a subsequent call to the task's stop() method
               // to properly clean up any resources allocated by its initialize() or
               // start() methods. If the task throws an exception during stop(),
               // the worst thing that happens is another exception gets logged for an already-
               // failed task
   ```
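
   The expectation described in that comment can be sketched roughly as 
follows. This is only an illustration under stated assumptions: 
`StartupSketch`, `LifecycleTask` and `run()` are hypothetical names, the 
"log" is a plain list of strings, and none of it is the actual `execute()` 
implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "count the task as started once initialize is attempted".
class StartupSketch {
    // Hypothetical, simplified task lifecycle; not the Kafka Connect Task interface.
    interface LifecycleTask {
        void initialize();
        void start();
        void stop();
    }

    // Runs startup, then always gives the task a chance to clean up via stop().
    // Returns the messages that would have been logged.
    static List<String> run(LifecycleTask task) {
        List<String> log = new ArrayList<>();
        boolean started = false;
        try {
            // Counted as "started" as soon as we attempt initialize(), so that
            // stop() is still invoked if initialize() or start() throws.
            started = true;
            task.initialize();
            task.start();
        } catch (RuntimeException e) {
            log.add("startup failed: " + e.getMessage());
        } finally {
            if (started) {
                try {
                    task.stop();
                } catch (RuntimeException e) {
                    // Worst case: one more logged exception for an already-failed task.
                    log.add("stop failed: " + e.getMessage());
                }
            }
        }
        return log;
    }
}
```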

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
##########
@@ -263,145 +332,153 @@ private void verifyNormalConnector() throws InterruptedException {
         normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
     }
 
-    public static class BlockingConnector extends SourceConnector {
-
+    private static class Block {
         private static CountDownLatch blockLatch;
 
-        private String block;
+        private final String block;
 
         public static final String BLOCK_CONFIG = "block";
 
-        public static final String INITIALIZE = "initialize";
-        public static final String INITIALIZE_WITH_TASK_CONFIGS = "initializeWithTaskConfigs";
-        public static final String START = "start";
-        public static final String RECONFIGURE = "reconfigure";
-        public static final String TASK_CLASS = "taskClass";
-        public static final String TASK_CONFIGS = "taskConfigs";
-        public static final String STOP = "stop";
-        public static final String VALIDATE = "validate";
-        public static final String CONFIG = "config";
-        public static final String VERSION = "version";
-
-        private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(
-                BLOCK_CONFIG,
-                ConfigDef.Type.STRING,
-                "",
-                ConfigDef.Importance.MEDIUM,
-                "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs', 'version'"
-            );
-
-        // No-args constructor required by the framework
-        public BlockingConnector() {
-            this(null);
-        }
-
-        protected BlockingConnector(String block) {
-            this.block = block;
-            synchronized (BlockingConnector.class) {
-                if (blockLatch != null) {
-                    blockLatch.countDown();
-                }
-                blockLatch = new CountDownLatch(1);
-            }
+        private static ConfigDef config() {
+            return new ConfigDef()
+                .define(
+                    BLOCK_CONFIG,
+                    ConfigDef.Type.STRING,
+                    "",
+                    ConfigDef.Importance.MEDIUM,
+                    "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs', 'version'"
+                );
         }
 
         public static void waitForBlock() throws InterruptedException {
-            synchronized (BlockingConnector.class) {
+            synchronized (Block.class) {
                 if (blockLatch == null) {
                     throw new IllegalArgumentException("No connector has been created yet");
                 }
             }
-            
+
             log.debug("Waiting for connector to block");
             blockLatch.await();
             log.debug("Connector should now be blocked");
         }
 
         public static void resetBlockLatch() {
-            synchronized (BlockingConnector.class) {
+            synchronized (Block.class) {
                 if (blockLatch != null) {
                     blockLatch.countDown();
                     blockLatch = null;
                 }
             }
         }
 
+        public Block(Map<String, String> props) {
+            this(new AbstractConfig(config(), props).getString(BLOCK_CONFIG));
+        }
+
+        public Block(String block) {
+            this.block = block;
+            synchronized (Block.class) {
+                if (blockLatch != null) {
+                    blockLatch.countDown();
+                }
+                blockLatch = new CountDownLatch(1);

Review comment:
       How does this static field initialization work within a blocking 
connector that also has a blocking task instance? Aren't both constructors 
called, resulting in the task constructor re-initializing the static 
`blockLatch` instance? Does it work because the tests don't block both a 
connector method and a task method? If so, doesn't that make it somewhat 
brittle if people want to add more tests but don't infer that limitation?
   
   Would it be better to store a static list of all Block latches, have the 
constructor simply create a block latch and add it to that static list, and 
then have the `resetBlockLatch()` method remove each of the Block latches and 
call `countDown()` on them? (This still doesn't bode well for running the tests 
in parallel, but at the moment that's not an issue.)
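
   A rough sketch of that alternative, to make the idea concrete. 
`LatchedBlock`, `resetBlockLatches()`, `outstandingLatches()` and `latch()` 
are hypothetical names chosen for this illustration; the real `Block` class 
in the PR may look quite different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical variant of Block that tracks one latch per instance.
class LatchedBlock {
    // Every latch created since the last reset; guarded by LatchedBlock.class.
    private static final List<CountDownLatch> BLOCK_LATCHES = new ArrayList<>();

    private final String block;
    private final CountDownLatch blockLatch;

    LatchedBlock(String block) {
        this.block = block;
        // Each instance gets its own latch, so constructing a task-level block
        // no longer replaces the connector-level latch.
        this.blockLatch = new CountDownLatch(1);
        synchronized (LatchedBlock.class) {
            BLOCK_LATCHES.add(blockLatch);
        }
    }

    // Releases every outstanding latch and removes them all from the static list.
    static void resetBlockLatches() {
        synchronized (LatchedBlock.class) {
            for (CountDownLatch latch : BLOCK_LATCHES) {
                latch.countDown();
            }
            BLOCK_LATCHES.clear();
        }
    }

    // Hypothetical accessors so the behavior can be observed.
    static int outstandingLatches() {
        synchronized (LatchedBlock.class) {
            return BLOCK_LATCHES.size();
        }
    }

    CountDownLatch latch() {
        return blockLatch;
    }

    String block() {
        return block;
    }
}
```

   The static list is still shared state, so this does not help with running 
the tests in parallel, but creating a second instance no longer counts down or 
discards the first instance's latch.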





