chia7712 commented on code in PR #17027:
URL: https://github.com/apache/kafka/pull/17027#discussion_r1736100999


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -644,34 +604,22 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) throws ExecutionE
             configOps.put(configResource, ops);
         }
         log.trace("Syncing configs for {} topics.", configOps.size());
-        AtomicReference<Boolean> encounteredError = new AtomicReference<>(false);
-        adminCall(
-                () -> {
-                    targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-                        if (e != null) {
-                            if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS)
-                                    && e instanceof UnsupportedVersionException && !encounteredError.get()) {
-                                //Fallback logic
-                                log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. "
-                                                + "Therefore using deprecated AlterConfigs API for syncing configs for topic {}",
-                                        sourceAndTarget.target(), k.name(), e);
-                                encounteredError.set(true);
-                                useIncrementalAlterConfigs = false;
-                            } else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS)
-                                    && e instanceof UnsupportedVersionException && !encounteredError.get()) {
-                                log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
-                                encounteredError.set(true);
-                                context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
-                                        + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
-                            } else {
-                                log.warn("Could not alter configuration of topic {}.", k.name(), e);

Review Comment:
   Please keep this `else` branch.
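
   For context, the removed handler has three outcomes, and the `else` branch is the only one that reports errors other than `UnsupportedVersionException`. The sketch below is a simplified, standalone illustration of that branching, not the actual connector code: `Mode`, `Outcome`, `classify`, and the nested `UnsupportedVersionException` are hypothetical stand-ins, and the `encounteredError` guard from the diff is omitted.

```java
public class AlterConfigsErrorHandling {
    // Hypothetical stand-ins for the use.incremental.alter.configs settings.
    enum Mode { REQUESTED, REQUIRED }

    // Hypothetical stand-in for Kafka's UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) {
            super(message);
        }
    }

    // The three outcomes of the handler shown in the diff.
    enum Outcome { FALL_BACK, RAISE_ERROR, LOG_WARNING }

    // Mirrors the if / else-if / else structure of the removed code.
    static Outcome classify(Mode mode, Throwable error) {
        boolean unsupported = error instanceof UnsupportedVersionException;
        if (mode == Mode.REQUESTED && unsupported) {
            // Fall back to the deprecated AlterConfigs API.
            return Outcome.FALL_BACK;
        } else if (mode == Mode.REQUIRED && unsupported) {
            // Fail the connector: the target cannot satisfy "required".
            return Outcome.RAISE_ERROR;
        } else {
            // Every other failure is only logged; without this branch
            // such errors would pass silently.
            return Outcome.LOG_WARNING;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(Mode.REQUESTED, new UnsupportedVersionException("old broker")));
        System.out.println(classify(Mode.REQUIRED, new IllegalStateException("unrelated failure")));
    }
}
```

   In this sketch, any error that is not an `UnsupportedVersionException` reaches the final branch regardless of mode, which corresponds to the `log.warn("Could not alter configuration of topic {}.", ...)` call in the diff.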



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
