JNSimba commented on code in PR #85: URL: https://github.com/apache/doris-kafka-connector/pull/85#discussion_r2268421703
########## src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java: ########## @@ -212,100 +215,145 @@ public static ConfigDef newConfigDef() { Importance.HIGH, "Doris database name", DORIS_INFO, - 6, + 5, ConfigDef.Width.NONE, DORIS_DATABASE) .define( TOPICS_TABLES_MAP, Type.STRING, "", topicToTableValidator, - Importance.LOW, + Importance.HIGH, "Map of topics to tables (optional). Format : comma-separated tuples, e.g." + " <topic-1>:<table-1>,<topic-2>:<table-2>,... ", CONNECTOR_CONFIG, - 0, + 1, ConfigDef.Width.NONE, TOPICS_TABLES_MAP) .define( BUFFER_COUNT_RECORDS, Type.LONG, BUFFER_COUNT_RECORDS_DEFAULT, ConfigDef.Range.atLeast(1), - Importance.LOW, - "Number of records buffered in memory per partition before triggering", + Importance.HIGH, + "Number of records buffered in memory before triggering", CONNECTOR_CONFIG, - 1, + 2, ConfigDef.Width.NONE, BUFFER_COUNT_RECORDS) .define( BUFFER_SIZE_BYTES, Type.LONG, BUFFER_SIZE_BYTES_DEFAULT, ConfigDef.Range.atLeast(1), - Importance.LOW, - "Cumulative size of records buffered in memory per partition before triggering", + Importance.HIGH, + "Cumulative size of records buffered in memory before triggering", CONNECTOR_CONFIG, - 2, + 3, ConfigDef.Width.NONE, BUFFER_SIZE_BYTES) .define( BUFFER_FLUSH_TIME_SEC, Type.LONG, BUFFER_FLUSH_TIME_SEC_DEFAULT, ConfigDef.Range.atLeast(Duration.ofSeconds(1).getSeconds()), - Importance.LOW, + Importance.HIGH, "The time in seconds to flush cached data", CONNECTOR_CONFIG, - 3, + 4, ConfigDef.Width.NONE, BUFFER_FLUSH_TIME_SEC) + .define( + ENABLE_COMBINE_FLUSH, + Type.BOOLEAN, + ENABLE_COMBINE_FLUSH_DEFAULT, + Importance.HIGH, + "Whether to merge data from all partitions together and write them. The default value is false. 
When enabled, only at_least_once semantics are guaranteed.", + CONNECTOR_CONFIG, + 5, + ConfigDef.Width.NONE, + ENABLE_COMBINE_FLUSH) + .define( + DELIVERY_GUARANTEE, + Type.STRING, + DELIVERY_GUARANTEE_DEFAULT, + Importance.MEDIUM, + "How to ensure data consistency when consuming Kafka data is imported into Doris. Supports at_least_once exactly_once, default is at_least_once. Doris needs to be upgraded to 2.1.0 or above to ensure data exactly_once", + CONNECTOR_CONFIG, + 6, + ConfigDef.Width.NONE, + DELIVERY_GUARANTEE, + EnumRecommender.in(DeliveryGuarantee.values())) .define( RECORD_TABLE_NAME_FIELD, Type.STRING, null, Importance.LOW, "The field name of record, and use this field value as the table name to be written", CONNECTOR_CONFIG, - 4, + 7, ConfigDef.Width.NONE, RECORD_TABLE_NAME_FIELD) + // debezium config .define( - JMX_OPT, - ConfigDef.Type.BOOLEAN, - JMX_OPT_DEFAULT, - ConfigDef.Importance.HIGH, - "Whether to enable JMX MBeans for custom metrics") + CONVERTER_MODE, + Type.STRING, + CONVERT_MODE_DEFAULT, + Importance.LOW, + "Type conversion mode of upstream data when using Connector to consume Kafka data.\n" + + "normal means consuming data in Kafka normally without any type conversion.\n" + + "debezium_ingestion means that when Kafka upstream data is collected through CDC (Changelog Data Capture) tools such as Debezium, the upstream data needs to undergo special type conversion to support it.", + DEBEZIUM_CONFIG, + 1, + ConfigDef.Width.NONE, + CONVERTER_MODE) .define( ENABLE_DELETE, ConfigDef.Type.BOOLEAN, ENABLE_DELETE_DEFAULT, - ConfigDef.Importance.HIGH, - "Used to synchronize delete events") - .define( - LOAD_MODEL, - Type.STRING, - LOAD_MODEL_DEFAULT, - Importance.HIGH, - "load model is stream_load.") + Importance.LOW, + "Under Debezium synchronization, whether to synchronize deletion events. 
Non-Debezium messages need to be marked with deletions themselves.", + DEBEZIUM_CONFIG, + 2, + ConfigDef.Width.NONE, + ENABLE_DELETE) + // Retries .define( MAX_RETRIES, Type.INT, MAX_RETRIES_DEFAULT, Importance.MEDIUM, - "The maximum number of times to retry on errors before failing the task.") + "The maximum number of times to retry on errors before failing the task.", + RETRIES_GROUP, + 1, + ConfigDef.Width.NONE, + MAX_RETRIES) .define( RETRY_INTERVAL_MS, Type.INT, RETRY_INTERVAL_MS_DEFAULT, Importance.MEDIUM, - "The time in milliseconds to wait following an error before a retry attempt is made.") + "The time in milliseconds to wait following an error before a retry attempt is made.", + RETRIES_GROUP, + 2, + ConfigDef.Width.NONE, + RETRY_INTERVAL_MS) .define( BEHAVIOR_ON_NULL_VALUES, Type.STRING, BEHAVIOR_ON_NULL_VALUES_DEFAULT, Importance.LOW, - "Used to handle records with a null value ."); + "Used to handle records with a null value.", + CONNECTOR_CONFIG, + 18, Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@doris.apache.org For additional commands, e-mail: dev-help@doris.apache.org