C0urante commented on a change in pull request #9669: URL: https://github.com/apache/kafka/pull/9669#discussion_r534465543
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java ########## @@ -263,145 +332,153 @@ private void verifyNormalConnector() throws InterruptedException { normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS); } - public static class BlockingConnector extends SourceConnector { - + private static class Block { private static CountDownLatch blockLatch; - private String block; + private final String block; public static final String BLOCK_CONFIG = "block"; - public static final String INITIALIZE = "initialize"; - public static final String INITIALIZE_WITH_TASK_CONFIGS = "initializeWithTaskConfigs"; - public static final String START = "start"; - public static final String RECONFIGURE = "reconfigure"; - public static final String TASK_CLASS = "taskClass"; - public static final String TASK_CONFIGS = "taskConfigs"; - public static final String STOP = "stop"; - public static final String VALIDATE = "validate"; - public static final String CONFIG = "config"; - public static final String VERSION = "version"; - - private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define( - BLOCK_CONFIG, - ConfigDef.Type.STRING, - "", - ConfigDef.Importance.MEDIUM, - "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs', 'version'" - ); - - // No-args constructor required by the framework - public BlockingConnector() { - this(null); - } - - protected BlockingConnector(String block) { - this.block = block; - synchronized (BlockingConnector.class) { - if (blockLatch != null) { - blockLatch.countDown(); - } - blockLatch = new CountDownLatch(1); - } + private static ConfigDef config() { + return new ConfigDef() + .define( + BLOCK_CONFIG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.MEDIUM, + "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs', 'version'" + ); } public static void waitForBlock() throws InterruptedException { - synchronized (BlockingConnector.class) { + synchronized (Block.class) { if (blockLatch == null) { throw new IllegalArgumentException("No connector has been created yet"); } } - + log.debug("Waiting for connector to block"); blockLatch.await(); log.debug("Connector should now be blocked"); } public static void resetBlockLatch() { - synchronized (BlockingConnector.class) { + synchronized (Block.class) { if (blockLatch != null) { blockLatch.countDown(); blockLatch = null; } } } + public Block(Map<String, String> props) { + this(new AbstractConfig(config(), props).getString(BLOCK_CONFIG)); + } + + public Block(String block) { + this.block = block; + synchronized (Block.class) { + if (blockLatch != null) { + blockLatch.countDown(); + } + blockLatch = new CountDownLatch(1); Review comment: > Does it work because the tests don't block both a connector method and a task method? Yep, that's exactly it. Actually, it's even broader--it works because no test involves blocking in more than one method. > If so, doesn't that make it somewhat brittle if people want to add more tests but don't infer that limitation? It does make it brittle for that case, although since connectors and tasks are completely decoupled w/r/t allocation in a Connect cluster (they frequently exist on separate workers, for example), I'm a little skeptical of the need for a test that touches on a connector that blocks in its `Connector` and `Task` classes, either now or in the future. > Would it be better to store a static list of all Block latches, have the constructor simply create a block latch and add it to that static list, and then have the resetBlockLatch() method remove each of the Block latches and call countDown() on them? I'm happy to leave a comment on the limitations of the current test setup and how it could be expanded to cover more ground in the future, but it seems a bit premature to put in the necessary infrastructure for that when there aren't any identified cases yet that would require it, especially since the PR already greatly expands on the capabilities for testing blocks in connectors and tasks but only uses a portion of it. Is that fair? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org