C0urante commented on a change in pull request #9669:
URL: https://github.com/apache/kafka/pull/9669#discussion_r534548973



##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
##########
@@ -263,145 +332,153 @@ private void verifyNormalConnector() throws 
InterruptedException {
         normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
     }
 
-    public static class BlockingConnector extends SourceConnector {
-
+    private static class Block {
         private static CountDownLatch blockLatch;
 
-        private String block;
+        private final String block;
 
         public static final String BLOCK_CONFIG = "block";
 
-        public static final String INITIALIZE = "initialize";
-        public static final String INITIALIZE_WITH_TASK_CONFIGS = 
"initializeWithTaskConfigs";
-        public static final String START = "start";
-        public static final String RECONFIGURE = "reconfigure";
-        public static final String TASK_CLASS = "taskClass";
-        public static final String TASK_CONFIGS = "taskConfigs";
-        public static final String STOP = "stop";
-        public static final String VALIDATE = "validate";
-        public static final String CONFIG = "config";
-        public static final String VERSION = "version";
-
-        private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(
-                BLOCK_CONFIG,
-                ConfigDef.Type.STRING,
-                "",
-                ConfigDef.Importance.MEDIUM,
-                "Where to block indefinitely, e.g., 'start', 'initialize', 
'taskConfigs', 'version'"
-            );
-
-        // No-args constructor required by the framework
-        public BlockingConnector() {
-            this(null);
-        }
-
-        protected BlockingConnector(String block) {
-            this.block = block;
-            synchronized (BlockingConnector.class) {
-                if (blockLatch != null) {
-                    blockLatch.countDown();
-                }
-                blockLatch = new CountDownLatch(1);
-            }
+        private static ConfigDef config() {
+            return new ConfigDef()
+                .define(
+                    BLOCK_CONFIG,
+                    ConfigDef.Type.STRING,
+                    "",
+                    ConfigDef.Importance.MEDIUM,
+                    "Where to block indefinitely, e.g., 'start', 'initialize', 
'taskConfigs', 'version'"
+                );
         }
 
         public static void waitForBlock() throws InterruptedException {
-            synchronized (BlockingConnector.class) {
+            synchronized (Block.class) {
                 if (blockLatch == null) {
                     throw new IllegalArgumentException("No connector has been 
created yet");
                 }
             }
-            
+
             log.debug("Waiting for connector to block");
             blockLatch.await();
             log.debug("Connector should now be blocked");
         }
 
         public static void resetBlockLatch() {
-            synchronized (BlockingConnector.class) {
+            synchronized (Block.class) {
                 if (blockLatch != null) {
                     blockLatch.countDown();
                     blockLatch = null;
                 }
             }
         }
 
+        public Block(Map<String, String> props) {
+            this(new AbstractConfig(config(), props).getString(BLOCK_CONFIG));
+        }
+
+        public Block(String block) {
+            this.block = block;
+            synchronized (Block.class) {
+                if (blockLatch != null) {
+                    blockLatch.countDown();
+                }
+                blockLatch = new CountDownLatch(1);

Review comment:
       Ack, added a comment. LMKWYT




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to