This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push: new 0a1a033 [ISSUE #105] Fix sink retryTimes does not work (#106) 0a1a033 is described below commit 0a1a03367ca422f3f4d5011b368e8e0ab0f91db8 Author: Humkum <1109939...@qq.com> AuthorDate: Wed Jan 10 17:31:20 2024 +0800 [ISSUE #105] Fix sink retryTimes does not work (#106) --- .../rocketmq/sink/table/RocketMQDynamicTableSink.java | 1 + .../rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java index 73e8af7..98a15e6 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java @@ -239,6 +239,7 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti Properties producerProps = new Properties(); producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup); producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress); + producerProps.setProperty(RocketMQConfig.PRODUCER_RETRY_TIMES, String.valueOf(retryTimes)); if (accessKey != null && secretKey != null) { producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey); producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey); diff --git a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java index 03a89b1..9c1ca11 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java @@ -64,6 +64,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory Set<ConfigOption<?>> requiredOptions = new HashSet<>(); requiredOptions.add(TOPIC); requiredOptions.add(PRODUCER_GROUP); + requiredOptions.add(ENDPOINTS); + // requiredOptions.add(PERSIST_OFFSET_INTERVAL); return requiredOptions; } @@ -76,6 +78,13 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory optionalOptions.add(OPTIONAL_FIELD_DELIMITER); optionalOptions.add(OPTIONAL_ACCESS_KEY); optionalOptions.add(OPTIONAL_SECRET_KEY); + optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN); + optionalOptions.add(OPTIONAL_WRITE_RETRY_TIMES); + optionalOptions.add(OPTIONAL_WRITE_SLEEP_TIME_MS); + optionalOptions.add(OPTIONAL_WRITE_IS_DYNAMIC_TAG); + optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED); + optionalOptions.add(OPTIONAL_WRITE_KEYS_TO_BODY); + optionalOptions.add(OPTIONAL_WRITE_KEY_COLUMNS); return optionalOptions; } @@ -122,8 +131,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory dynamicColumn, fieldDelimiter, encoding, - sleepTimeMs, retryTimes, + sleepTimeMs, isDynamicTag, isDynamicTagIncluded, writeKeysToBody,