chia7712 commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1662903374
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } + @Override + public Config validate(Map<String, String> connectorConfigs) { + List<ConfigValue> configValues = super.validate(connectorConfigs).configValues(); + MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> + configValues.stream() + .filter(conf -> conf.name().equals(config)) + .forEach(conf -> conf.errorMessages().add(errorMsg))); Review Comment: could you please use `addErrorMessage` instead of `errorMessages().add`? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } + @Override + public Config validate(Map<String, String> connectorConfigs) { + List<ConfigValue> configValues = super.validate(connectorConfigs).configValues(); + MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> + configValues.stream() + .filter(conf -> conf.name().equals(config)) Review Comment: `EMIT_OFFSET_SYNCS_ENABLED` is in `MirrorSourceConfig`, so it is not a part of config def of `MirrorCheckpointConnector`. Hence, the error related to `EMIT_OFFSET_SYNCS_ENABLED` can't be propagated. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -234,6 +234,30 @@ public ConfigDef config() { @Override public org.apache.kafka.common.config.Config validate(Map<String, String> props) { List<ConfigValue> configValues = super.validate(props).configValues(); + validateExactlyOnceConfigs(props, configValues); + validateEmitOffsetSyncConfigs(props, configValues); + + return new org.apache.kafka.common.config.Config(configValues); + } + + private static void validateEmitOffsetSyncConfigs(Map<String, String> props, List<ConfigValue> configValues) { + boolean offsetSyncsConfigured = props.keySet().stream() + .anyMatch(conf -> conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX)); + + if ("false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && offsetSyncsConfigured) { Review Comment: what if `EMIT_OFFSET_SYNCS_ENABLED=true` and `offsetSyncsConfigured=false`? Should we add error message for it? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -106,6 +108,17 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } + @Override + public Config validate(Map<String, String> connectorConfigs) { + List<ConfigValue> configValues = super.validate(connectorConfigs).configValues(); + MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> + configValues.stream() + .filter(conf -> conf.name().equals(config)) Review Comment: The following test shows my concern: ```java @Test public void test() { Map<String, String> props = new HashMap<>(); props.put(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, "false"); MirrorCheckpointConnector connector = new MirrorCheckpointConnector(); Config config = connector.validate(props); assertEquals(1, config.configValues().stream().filter(c -> c.name().equals(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).count()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org