DongLiang-0 commented on code in PR #35: URL: https://github.com/apache/doris-kafka-connector/pull/35#discussion_r1664124533
########## src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java: ########## @@ -114,6 +116,13 @@ public DorisOptions(Map<String, String> config) { Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS)); } getStreamLoadPropFromConfig(config); + if (getStreamLoadProp().containsKey(LoadConstants.GROUP_COMMIT)) { + this.enableGroupCommit = + ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), enable2PC()); + if (!enableGroupCommit) { Review Comment: After throwing an exception, can delete it here ########## src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java: ########## @@ -293,4 +298,26 @@ private static boolean validateEnumInstances(String value, String[] instances) { } return false; } + + public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean enable2PC) { + if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) { + return false; + } + + Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT); + if (!GroupCommitMode.instances().contains(value.toString().toLowerCase())) { + LOG.error("The value of group commit mode is an illegal parameter of {}.", value); + return false; + } else if (enable2PC) { + LOG.error( + "When group commit is enabled, you should disable two phase commit! Please set 'enable.2pc':'false'"); + return false; Review Comment: If the 2pc submission is not enabled, an exception needs to be thrown so that the user can perceive that the parameter needs to be closed manually. ########## src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java: ########## @@ -182,6 +195,10 @@ public boolean enable2PC() { return enable2PC; } + public boolean isEnableGroupCommit() { Review Comment: ```suggestion public boolean enableGroupCommit() { ``` ########## src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java: ########## @@ -293,4 +298,26 @@ private static boolean validateEnumInstances(String value, String[] instances) { } return false; } + + public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean enable2PC) { + if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) { + return false; + } + + Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT); + if (!GroupCommitMode.instances().contains(value.toString().toLowerCase())) { + LOG.error("The value of group commit mode is an illegal parameter of {}.", value); + return false; Review Comment: thrown an exception ########## src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java: ########## @@ -293,4 +298,26 @@ private static boolean validateEnumInstances(String value, String[] instances) { } return false; } + + public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean enable2PC) { Review Comment: Is it possible to add a unit test here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org