This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a1a033  [ISSUE #105] Fix sink retryTimes does not work (#106)
0a1a033 is described below

commit 0a1a03367ca422f3f4d5011b368e8e0ab0f91db8
Author: Humkum <1109939...@qq.com>
AuthorDate: Wed Jan 10 17:31:20 2024 +0800

    [ISSUE #105] Fix sink retryTimes does not work (#106)
---
 .../rocketmq/sink/table/RocketMQDynamicTableSink.java         |  1 +
 .../rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java  | 11 ++++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java
index 73e8af7..98a15e6 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java
@@ -239,6 +239,7 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
+        producerProps.setProperty(RocketMQConfig.PRODUCER_RETRY_TIMES, String.valueOf(retryTimes));
         if (accessKey != null && secretKey != null) {
             producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
             producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java
index 03a89b1..9c1ca11 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java
@@ -64,6 +64,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
         Set<ConfigOption<?>> requiredOptions = new HashSet<>();
         requiredOptions.add(TOPIC);
         requiredOptions.add(PRODUCER_GROUP);
+        requiredOptions.add(ENDPOINTS);
+
         // requiredOptions.add(PERSIST_OFFSET_INTERVAL);
         return requiredOptions;
     }
@@ -76,6 +78,13 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
         optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
         optionalOptions.add(OPTIONAL_ACCESS_KEY);
         optionalOptions.add(OPTIONAL_SECRET_KEY);
+        optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
+        optionalOptions.add(OPTIONAL_WRITE_RETRY_TIMES);
+        optionalOptions.add(OPTIONAL_WRITE_SLEEP_TIME_MS);
+        optionalOptions.add(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
+        optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
+        optionalOptions.add(OPTIONAL_WRITE_KEYS_TO_BODY);
+        optionalOptions.add(OPTIONAL_WRITE_KEY_COLUMNS);
         return optionalOptions;
     }
 
@@ -122,8 +131,8 @@ public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory
                 dynamicColumn,
                 fieldDelimiter,
                 encoding,
-                sleepTimeMs,
                 retryTimes,
+                sleepTimeMs,
                 isDynamicTag,
                 isDynamicTagIncluded,
                 writeKeysToBody,

Reply via email to