DongLiang-0 commented on code in PR #35:
URL: 
https://github.com/apache/doris-kafka-connector/pull/35#discussion_r1664124533


##########
src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java:
##########
@@ -114,6 +116,13 @@ public DorisOptions(Map<String, String> config) {
                     
Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
         }
         getStreamLoadPropFromConfig(config);
+        if (getStreamLoadProp().containsKey(LoadConstants.GROUP_COMMIT)) {
+            this.enableGroupCommit =
+                    
ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), enable2PC());
+            if (!enableGroupCommit) {

Review Comment:
   After throwing an exception,  can delete it here



##########
src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java:
##########
@@ -293,4 +298,26 @@ private static boolean validateEnumInstances(String value, 
String[] instances) {
         }
         return false;
     }
+
+    public static boolean validateGroupCommitMode(Properties streamLoadProp, 
boolean enable2PC) {
+        if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
+            return false;
+        }
+
+        Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT);
+        if 
(!GroupCommitMode.instances().contains(value.toString().toLowerCase())) {
+            LOG.error("The value of group commit mode is an illegal parameter 
of {}.", value);
+            return false;
+        } else if (enable2PC) {
+            LOG.error(
+                    "When group commit is enabled, you should disable two 
phase commit! Please  set 'enable.2pc':'false'");
+            return false;

Review Comment:
   If the 2pc submission is not enabled, an exception needs to be thrown so 
that the user can perceive that the parameter needs to be closed manually.



##########
src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java:
##########
@@ -182,6 +195,10 @@ public boolean enable2PC() {
         return enable2PC;
     }
 
+    public boolean isEnableGroupCommit() {

Review Comment:
   ```suggestion
       public boolean enableGroupCommit() {
   ```



##########
src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java:
##########
@@ -293,4 +298,26 @@ private static boolean validateEnumInstances(String value, 
String[] instances) {
         }
         return false;
     }
+
+    public static boolean validateGroupCommitMode(Properties streamLoadProp, 
boolean enable2PC) {
+        if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
+            return false;
+        }
+
+        Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT);
+        if 
(!GroupCommitMode.instances().contains(value.toString().toLowerCase())) {
+            LOG.error("The value of group commit mode is an illegal parameter 
of {}.", value);
+            return false;

Review Comment:
   thrown an exception



##########
src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java:
##########
@@ -293,4 +298,26 @@ private static boolean validateEnumInstances(String value, 
String[] instances) {
         }
         return false;
     }
+
+    public static boolean validateGroupCommitMode(Properties streamLoadProp, 
boolean enable2PC) {

Review Comment:
   Is it possible to add a unit test here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to