This is an automated email from the ASF dual-hosted git repository.

sunxiaojian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bfdef86 [ISSUES #530] fix order message bug (#532)
3bfdef86 is described below

commit 3bfdef8661d9acc420bb7665501651c395461e92
Author: onceicy <64947722+once...@users.noreply.github.com>
AuthorDate: Thu May 23 23:22:49 2024 +0800

    [ISSUES #530] fix order message bug (#532)
    
    * [ISSUES #530] fix order message bug
    
    * [ISSUES #530] add order message compatible with rocketmq 4.x config
    
    * [ISSUES #530] add order message compatible with rocketmq 4.x config user 
guide
    
    * [ISSUES #530] Change the configuration item from 
'ordering.msg.compatible.v4' to 'ordering.msg.enable'
    
    ---------
    
    Co-authored-by: 靖愉 <chenyuquan....@antgroup.com>
---
 README.md                                                     |  7 ++++---
 .../connect/runtime/config/SourceConnectorConfig.java         |  1 +
 .../connect/runtime/connectorwrapper/WorkerSourceTask.java    | 11 ++++++++++-
 .../runtime/service/AbstractConfigManagementService.java      |  6 ++++++
 4 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 37ae98ce..49a87464 100644
--- a/README.md
+++ b/README.md
@@ -344,9 +344,10 @@ public interface Transform<R extends ConnectRecord> 
extends Component {
 
 
 ### Source Connector特殊配置
-| key                         | nullable | default | description               
                           |
-|-----------------------------|---------|---------|------------------------------------------------------|
-| connect.topicname           | true    |         | 
指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常  |
+| key                 | nullable | default | description                       
                           |
+|---------------------|---------|---------|--------------------------------------------------------------|
+| connect.topicname   | true    |         | 
指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常          |
+| ordering.msg.enable | true    | false   | 当目标集群不为rocketmq 
5.x时,顺序消息会乱序,若设置为true,才能支持顺序,但会降低connector性能 |
 
 ### Sink Connector特殊配置
 | key                                               | nullable | default       
        | description                |
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
index 12012917..f05abf3c 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
@@ -26,6 +26,7 @@ import 
org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 public class SourceConnectorConfig extends ConnectorConfig {
 
     public static final String CONNECT_TOPICNAME = "connect.topicname";
+    public static final String ORDERING_MSG_ENABLE = "ordering.msg.enable";
 
     public SourceConnectorConfig(ConnectKeyValue config) {
         super(config);
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 79317bdb..1ac8f928 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -276,7 +276,16 @@ public class WorkerSourceTask extends WorkerTask {
                 } else {
                     // Partition message ordering,
                     // At the same time, ensure that the data is pulled in an 
orderly manner, which needs to be guaranteed by sourceTask in the business
-                    producer.send(sourceMessage, new 
SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
+                    if 
(taskConfig.getProperties().get(SourceConnectorConfig.ORDERING_MSG_ENABLE).equals("true"))
 {
+                        try {
+                            SendResult result = producer.send(sourceMessage, 
new SelectMessageQueueByHash(), sourceMessage.getKeys());
+                            callback.onSuccess(result);
+                        } catch (Exception e) {
+                            callback.onException(e);
+                        }
+                    } else {
+                        producer.send(sourceMessage, new 
SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
+                    }
                 }
 
             } catch (RetriableException e) {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
index 855c0824..2c51c4ff 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -340,6 +340,12 @@ public abstract class AbstractConfigManagementService 
implements ConfigManagemen
             if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) {
                 newKeyValue.put(SourceConnectorConfig.CONNECT_TOPICNAME, 
configs.getString(SourceConnectorConfig.CONNECT_TOPICNAME));
             }
+            // put ordering msg enable config
+            if 
(configs.containsKey(SourceConnectorConfig.ORDERING_MSG_ENABLE)) {
+                newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, 
configs.getString(SourceConnectorConfig.ORDERING_MSG_ENABLE));
+            } else {
+                newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, 
"false");
+            }
             // sink consume topic
             if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)) {
                 newKeyValue.put(SinkConnectorConfig.CONNECT_TOPICNAMES, 
configs.getString(SinkConnectorConfig.CONNECT_TOPICNAMES));

Reply via email to