rhauch commented on a change in pull request #9669: URL: https://github.com/apache/kafka/pull/9669#discussion_r534419655
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -206,16 +203,13 @@ public void cancel() { public void stop() { super.stop(); stopRequestedLatch.countDown(); - synchronized (this) { - if (finishedStart) - tryStop(); - else - startedShutdownBeforeStartCompleted = true; - } } - private synchronized void tryStop() { - if (!stopped) { + // Note: This method is not thread-safe Review comment: Maybe a nit, but this comment seems a bit misleading. It sounds as if the method need be called from a thread safe point, but otherwise there are no restrictions on how it's used. Strictly speaking, this method need not be thread safe because it should *only* be called from within `close()`, which is called from the task's own thread and therefore is thread safe. Perhaps this design assumption should be mentioned here. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -228,17 +222,16 @@ private synchronized void tryStop() { @Override public void execute() { try { + // If we try to start the task at all (by invoking initialize and possibly start), we count this as + // "started" in order to properly clean up any resources allocated by those invocations when the task is + // shut down by calling stop. If the task throws an exception during startup, it should still be able to + // clean up any allocated resources when stop is called, and if it isn't able to or even throws another + // exception during stop, the worst thing that happens is another exception gets logged for an already- + // failed task Review comment: Minor suggestions to improve wording and to be a bit more explicit about expectations for Task implementations: ```suggestion // If we try to start the task at all by invoking initialize, then count this as // "started" and expect a subsequent call to the task's stop() method // to properly clean up any resources allocated by its initialize() or // start() methods. If the task throws an exception during stop(), // the worst thing that happens is another exception gets logged for an already- // failed task ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java ########## @@ -263,145 +332,153 @@ private void verifyNormalConnector() throws InterruptedException { normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS); } - public static class BlockingConnector extends SourceConnector { - + private static class Block { private static CountDownLatch blockLatch; - private String block; + private final String block; public static final String BLOCK_CONFIG = "block"; - public static final String INITIALIZE = "initialize"; - public static final String INITIALIZE_WITH_TASK_CONFIGS = "initializeWithTaskConfigs"; - public static final String START = "start"; - public static final String RECONFIGURE = "reconfigure"; - public static final String TASK_CLASS = "taskClass"; - public static final String TASK_CONFIGS = "taskConfigs"; - public static final String STOP = "stop"; - public static final String VALIDATE = "validate"; - public static final String CONFIG = "config"; - public static final String VERSION = "version"; - - private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define( - BLOCK_CONFIG, - ConfigDef.Type.STRING, - "", - ConfigDef.Importance.MEDIUM, - "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs', 'version'" - ); - - // No-args constructor required by the framework - public BlockingConnector() { - this(null); - } - - protected BlockingConnector(String block) { - this.block = block; - synchronized (BlockingConnector.class) { - if (blockLatch != null) { - blockLatch.countDown(); - } - blockLatch = new CountDownLatch(1); - } + private static ConfigDef config() { + return new ConfigDef() + .define( + BLOCK_CONFIG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.MEDIUM, + "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs', 'version'" + ); } public static void waitForBlock() throws InterruptedException { - synchronized (BlockingConnector.class) { + synchronized (Block.class) { if (blockLatch == null) { throw new IllegalArgumentException("No connector has been created yet"); } } - + log.debug("Waiting for connector to block"); blockLatch.await(); log.debug("Connector should now be blocked"); } public static void resetBlockLatch() { - synchronized (BlockingConnector.class) { + synchronized (Block.class) { if (blockLatch != null) { blockLatch.countDown(); blockLatch = null; } } } + public Block(Map<String, String> props) { + this(new AbstractConfig(config(), props).getString(BLOCK_CONFIG)); + } + + public Block(String block) { + this.block = block; + synchronized (Block.class) { + if (blockLatch != null) { + blockLatch.countDown(); + } + blockLatch = new CountDownLatch(1); Review comment: How does this static field initialization work within a blocking connector that also has a blocking task instance? Aren't both constructors called, resulting in the task constructor re-initializing the static `blockLatch` instance? Does it work because the tests don't block both a connector method and a task method? If so, doesn't that make it somewhat brittle if people want to add more tests but don't infer that limitation? Would it be better to store a static list of all Block latches, have the constructor simply create a block latch and add it to that static list, and then have the `resetBlockLatch()` method remove each of the Block latches and call `countDown()` on them? (This still doesn't bode well for running the tests in parallel, but ATM that's not an issue.) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org