This is an automated email from the ASF dual-hosted git repository. sunxiaojian pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push: new 3bfdef86 [ISSUES #530] fix order message bug (#532) 3bfdef86 is described below commit 3bfdef8661d9acc420bb7665501651c395461e92 Author: onceicy <64947722+once...@users.noreply.github.com> AuthorDate: Thu May 23 23:22:49 2024 +0800 [ISSUES #530] fix order message bug (#532) * [ISSUES #530] fix order message bug * [ISSUES #530] add order message compatible with rocketmq 4.x config * [ISSUES #530] add order message compatible with rocketmq 4.x config user guide * [ISSUES #530] Change the configuration item from 'ordering.msg.compatible.v4' to 'ordering.msg.enable' --------- Co-authored-by: 靖愉 <chenyuquan....@antgroup.com> --- README.md | 7 ++++--- .../connect/runtime/config/SourceConnectorConfig.java | 1 + .../connect/runtime/connectorwrapper/WorkerSourceTask.java | 11 ++++++++++- .../runtime/service/AbstractConfigManagementService.java | 6 ++++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 37ae98ce..49a87464 100644 --- a/README.md +++ b/README.md @@ -344,9 +344,10 @@ public interface Transform<R extends ConnectRecord> extends Component { ### Source Connector特殊配置 -| key | nullable | default | description | -|-----------------------------|---------|---------|------------------------------------------------------| -| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 | +| key | nullable | default | description | +|---------------------|---------|---------|--------------------------------------------------------------| +| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 | +| ordering.msg.enable | true | false | 当目标集群不为rocketmq 5.x时,顺序消息会乱序,若设置为true,才能支持顺序,但会降低connector性能 | ### Sink Connector特殊配置 | key | nullable | default | description | diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java index 12012917..f05abf3c 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; public class SourceConnectorConfig extends ConnectorConfig { public static final String CONNECT_TOPICNAME = "connect.topicname"; + public static final String ORDERING_MSG_ENABLE = "ordering.msg.enable"; public SourceConnectorConfig(ConnectKeyValue config) { super(config); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index 79317bdb..1ac8f928 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -276,7 +276,16 @@ public class WorkerSourceTask extends WorkerTask { } else { // Partition message ordering, // At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business - producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback); + if (taskConfig.getProperties().get(SourceConnectorConfig.ORDERING_MSG_ENABLE).equals("true")) { + try { + SendResult result = producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys()); + callback.onSuccess(result); + } catch (Exception e) { + callback.onException(e); + } + } else { + producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback); + } } } catch (RetriableException e) { diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java index 855c0824..2c51c4ff 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java @@ -340,6 +340,12 @@ public abstract class AbstractConfigManagementService implements ConfigManagemen if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) { newKeyValue.put(SourceConnectorConfig.CONNECT_TOPICNAME, configs.getString(SourceConnectorConfig.CONNECT_TOPICNAME)); } + // put ordering msg enable config + if (configs.containsKey(SourceConnectorConfig.ORDERING_MSG_ENABLE)) { + newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, configs.getString(SourceConnectorConfig.ORDERING_MSG_ENABLE)); + } else { + newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE, "false"); + } // sink consume topic if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)) { newKeyValue.put(SinkConnectorConfig.CONNECT_TOPICNAMES, configs.getString(SinkConnectorConfig.CONNECT_TOPICNAMES));