This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 367790b4 [ISSUES #507] Optimize rebalance & optimize docker
367790b4 is described below

commit 367790b4dd009fa89d06fbe51f9e84e638978ae3
Author: zhoubo <[email protected]>
AuthorDate: Tue May 16 20:18:46 2023 +0800

    [ISSUES #507] Optimize rebalance & optimize docker
---
 distribution/conf/connect-distributed.conf             | 18 +++++++++++++++++-
 docker/connect/Dockerfile                              |  2 +-
 .../connect/runtime/service/RebalanceImpl.java         |  9 ++++++---
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/distribution/conf/connect-distributed.conf 
b/distribution/conf/connect-distributed.conf
index 1c6c933b..d48ae781 100644
--- a/distribution/conf/connect-distributed.conf
+++ b/distribution/conf/connect-distributed.conf
@@ -14,7 +14,7 @@
 #  limitations under the License.
 
 workerId=DEFAULT_WORKER_1
-storePathRootDir=/tmp/storeRoot1
+storePathRootDir=/home/connect/mq-connect/storeRoot
 
 ## Http port for user to access REST API
 httpPort=8082
@@ -38,3 +38,19 @@ 
pluginPaths=/home/connect/mq-connect/plugins,rocketmq-connect-sample/target/rock
 #metricsConfigPath=/home/connect/mq-connect/conf/metrics.conf
 
 
+# Cluster management service config
+clusterManagementService=org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl
+
+# Config management service config, default LocalConfigManagementServiceImpl
+configManagementService=org.apache.rocketmq.connect.runtime.service.local.LocalConfigManagementServiceImpl
+#configManagementService=org.apache.rocketmq.connect.runtime.service.rocketmq.RocketMqConfigManagementServiceImpl
+
+# Position management service config, default 
LocalPositionManagementServiceImpl
+positionManagementService=org.apache.rocketmq.connect.runtime.service.local.LocalPositionManagementServiceImpl
+#positionManagementService=org.apache.rocketmq.connect.runtime.service.rocketmq.RocketMqPositionManagementServiceImpl
+
+# State management service config, default LocalStateManagementServiceImpl
+stateManagementService=org.apache.rocketmq.connect.runtime.service.local.LocalStateManagementServiceImpl
+#stateManagementService=org.apache.rocketmq.connect.runtime.service.rocketmq.RocketMqStateManagementServiceImpl
+
+
diff --git a/docker/connect/Dockerfile b/docker/connect/Dockerfile
index 77967f2c..076e0d85 100644
--- a/docker/connect/Dockerfile
+++ b/docker/connect/Dockerfile
@@ -52,4 +52,4 @@ COPY bin/connect-distributed.sh \
      bin/connectAdmin \
      /home/connect/mq-connect/bin/
 
-ENTRYPOINT ["/usr/bin/sh", 
"/home/connect/mq-connect/bin/connect-distributed.sh", "-c", 
"/home/connect/mq-connect/conf/connect-distributed.conf"]
+#ENTRYPOINT ["/usr/bin/sh", 
"/home/connect/mq-connect/bin/connect-distributed.sh", "-c", 
"/home/connect/mq-connect/conf/connect-distributed.conf"]
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 699c7191..417fcec5 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -113,12 +113,15 @@ public class RebalanceImpl {
      * @param allocateResult
      */
     private void updateProcessConfigsInRebalance(ConnAndTaskConfigs 
allocateResult) {
-
         try {
             worker.startConnectors(allocateResult.getConnectorConfigs(), 
connectController);
+        } catch (Throwable e) {
+            log.error("RebalanceImpl#updateProcessConfigsInRebalance start 
connector failed", e);
+        }
+        try {
             worker.startTasks(allocateResult.getTaskConfigs());
-        } catch (Exception e) {
-            log.error("RebalanceImpl#updateProcessConfigsInRebalance start 
connector or task failed", e);
+        } catch (Throwable e) {
+            log.error("RebalanceImpl#updateProcessConfigsInRebalance start 
task failed", e);
         }
     }
 

Reply via email to