chuang-wang-pre commented on code in PR #85:
URL: 
https://github.com/apache/doris-kafka-connector/pull/85#discussion_r2265899547


##########
src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java:
##########
@@ -212,100 +215,145 @@ public static ConfigDef newConfigDef() {
                         Importance.HIGH,
                         "Doris database name",
                         DORIS_INFO,
-                        6,
+                        5,
                         ConfigDef.Width.NONE,
                         DORIS_DATABASE)
                 .define(
                         TOPICS_TABLES_MAP,
                         Type.STRING,
                         "",
                         topicToTableValidator,
-                        Importance.LOW,
+                        Importance.HIGH,
                         "Map of topics to tables (optional). Format : 
comma-separated tuples, e.g."
                                 + " 
<topic-1>:<table-1>,<topic-2>:<table-2>,... ",
                         CONNECTOR_CONFIG,
-                        0,
+                        1,
                         ConfigDef.Width.NONE,
                         TOPICS_TABLES_MAP)
                 .define(
                         BUFFER_COUNT_RECORDS,
                         Type.LONG,
                         BUFFER_COUNT_RECORDS_DEFAULT,
                         ConfigDef.Range.atLeast(1),
-                        Importance.LOW,
-                        "Number of records buffered in memory per partition 
before triggering",
+                        Importance.HIGH,
+                        "Number of records buffered in memory before 
triggering",
                         CONNECTOR_CONFIG,
-                        1,
+                        2,
                         ConfigDef.Width.NONE,
                         BUFFER_COUNT_RECORDS)
                 .define(
                         BUFFER_SIZE_BYTES,
                         Type.LONG,
                         BUFFER_SIZE_BYTES_DEFAULT,
                         ConfigDef.Range.atLeast(1),
-                        Importance.LOW,
-                        "Cumulative size of records buffered in memory per 
partition before triggering",
+                        Importance.HIGH,
+                        "Cumulative size of records buffered in memory before 
triggering",
                         CONNECTOR_CONFIG,
-                        2,
+                        3,
                         ConfigDef.Width.NONE,
                         BUFFER_SIZE_BYTES)
                 .define(
                         BUFFER_FLUSH_TIME_SEC,
                         Type.LONG,
                         BUFFER_FLUSH_TIME_SEC_DEFAULT,
                         
ConfigDef.Range.atLeast(Duration.ofSeconds(1).getSeconds()),
-                        Importance.LOW,
+                        Importance.HIGH,
                         "The time in seconds to flush cached data",
                         CONNECTOR_CONFIG,
-                        3,
+                        4,
                         ConfigDef.Width.NONE,
                         BUFFER_FLUSH_TIME_SEC)
+                .define(
+                        ENABLE_COMBINE_FLUSH,
+                        Type.BOOLEAN,
+                        ENABLE_COMBINE_FLUSH_DEFAULT,
+                        Importance.HIGH,
+                        "Whether to merge data from all partitions together 
and write them. The default value is false. When enabled, only at_least_once 
semantics are guaranteed.",
+                        CONNECTOR_CONFIG,
+                        5,
+                        ConfigDef.Width.NONE,
+                        ENABLE_COMBINE_FLUSH)
+                .define(
+                        DELIVERY_GUARANTEE,
+                        Type.STRING,
+                        DELIVERY_GUARANTEE_DEFAULT,
+                        Importance.MEDIUM,
+                        "How to ensure data consistency when consuming Kafka 
data is imported into Doris. Supports at_least_once exactly_once, default is 
at_least_once. Doris needs to be upgraded to 2.1.0 or above to ensure data 
exactly_once",
+                        CONNECTOR_CONFIG,
+                        6,
+                        ConfigDef.Width.NONE,
+                        DELIVERY_GUARANTEE,
+                        EnumRecommender.in(DeliveryGuarantee.values()))
                 .define(
                         RECORD_TABLE_NAME_FIELD,
                         Type.STRING,
                         null,
                         Importance.LOW,
                         "The field name of record, and use this field value as 
the table name to be written",
                         CONNECTOR_CONFIG,
-                        4,
+                        7,
                         ConfigDef.Width.NONE,
                         RECORD_TABLE_NAME_FIELD)
+                // debezium config
                 .define(
-                        JMX_OPT,
-                        ConfigDef.Type.BOOLEAN,
-                        JMX_OPT_DEFAULT,
-                        ConfigDef.Importance.HIGH,
-                        "Whether to enable JMX MBeans for custom metrics")
+                        CONVERTER_MODE,
+                        Type.STRING,
+                        CONVERT_MODE_DEFAULT,
+                        Importance.LOW,
+                        "Type conversion mode of upstream data when using 
Connector to consume Kafka data.\n"
+                                + "normal means consuming data in Kafka 
normally without any type conversion.\n"
+                                + "debezium_ingestion means that when Kafka 
upstream data is collected through CDC (Changelog Data Capture) tools such as 
Debezium, the upstream data needs to undergo special type conversion to support 
it.",
+                        DEBEZIUM_CONFIG,
+                        1,
+                        ConfigDef.Width.NONE,
+                        CONVERTER_MODE)
                 .define(
                         ENABLE_DELETE,
                         ConfigDef.Type.BOOLEAN,
                         ENABLE_DELETE_DEFAULT,
-                        ConfigDef.Importance.HIGH,
-                        "Used to synchronize delete events")
-                .define(
-                        LOAD_MODEL,
-                        Type.STRING,
-                        LOAD_MODEL_DEFAULT,
-                        Importance.HIGH,
-                        "load model is stream_load.")
+                        Importance.LOW,
+                        "Under Debezium synchronization, whether to 
synchronize deletion events. Non-Debezium messages need to be marked with 
deletions themselves.",
+                        DEBEZIUM_CONFIG,
+                        2,
+                        ConfigDef.Width.NONE,
+                        ENABLE_DELETE)
+                // Retries
                 .define(
                         MAX_RETRIES,
                         Type.INT,
                         MAX_RETRIES_DEFAULT,
                         Importance.MEDIUM,
-                        "The maximum number of times to retry on errors before 
failing the task.")
+                        "The maximum number of times to retry on errors before 
failing the task.",
+                        RETRIES_GROUP,
+                        1,
+                        ConfigDef.Width.NONE,
+                        MAX_RETRIES)
                 .define(
                         RETRY_INTERVAL_MS,
                         Type.INT,
                         RETRY_INTERVAL_MS_DEFAULT,
                         Importance.MEDIUM,
-                        "The time in milliseconds to wait following an error 
before a retry attempt is made.")
+                        "The time in milliseconds to wait following an error 
before a retry attempt is made.",
+                        RETRIES_GROUP,
+                        2,
+                        ConfigDef.Width.NONE,
+                        RETRY_INTERVAL_MS)
                 .define(
                         BEHAVIOR_ON_NULL_VALUES,
                         Type.STRING,
                         BEHAVIOR_ON_NULL_VALUES_DEFAULT,
                         Importance.LOW,
-                        "Used to handle records with a null value .");
+                        "Used to handle records with a null value.",
+                        CONNECTOR_CONFIG,
+                        18,

Review Comment:
   Maybe this order number also needs to change, so that this option is placed 
together with the other configurations in the 'CONNECTOR_CONFIG' group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to