This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 367790b4 [ISSUES #507] Optimize rebalance & optimize docker
367790b4 is described below
commit 367790b4dd009fa89d06fbe51f9e84e638978ae3
Author: zhoubo <[email protected]>
AuthorDate: Tue May 16 20:18:46 2023 +0800
[ISSUES #507] Optimize rebalance & optimize docker
---
distribution/conf/connect-distributed.conf | 18 +++++++++++++++++-
docker/connect/Dockerfile | 2 +-
.../connect/runtime/service/RebalanceImpl.java | 9 ++++++---
3 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/distribution/conf/connect-distributed.conf b/distribution/conf/connect-distributed.conf
index 1c6c933b..d48ae781 100644
--- a/distribution/conf/connect-distributed.conf
+++ b/distribution/conf/connect-distributed.conf
@@ -14,7 +14,7 @@
# limitations under the License.
workerId=DEFAULT_WORKER_1
-storePathRootDir=/tmp/storeRoot1
+storePathRootDir=/home/connect/mq-connect/storeRoot
## Http port for user to access REST API
httpPort=8082
@@ -38,3 +38,19 @@
pluginPaths=/home/connect/mq-connect/plugins,rocketmq-connect-sample/target/rock
#metricsConfigPath=/home/connect/mq-connect/conf/metrics.conf
+# Cluster management service config
+clusterManagementService=org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl
+
+# Config management service config, default LocalConfigManagementServiceImpl
+configManagementService=org.apache.rocketmq.connect.runtime.service.local.LocalConfigManagementServiceImpl
+#configManagementService=org.apache.rocketmq.connect.runtime.service.rocketmq.RocketMqConfigManagementServiceImpl
+
+# Position management service config, default LocalPositionManagementServiceImpl
+positionManagementService=org.apache.rocketmq.connect.runtime.service.local.LocalPositionManagementServiceImpl
+#positionManagementService=org.apache.rocketmq.connect.runtime.service.rocketmq.RocketMqPositionManagementServiceImpl
+
+# State management service config, default LocalStateManagementServiceImpl
+stateManagementService=org.apache.rocketmq.connect.runtime.service.local.LocalStateManagementServiceImpl
+#stateManagementService=org.apache.rocketmq.connect.runtime.service.rocketmq.RocketMqStateManagementServiceImpl
+
+
diff --git a/docker/connect/Dockerfile b/docker/connect/Dockerfile
index 77967f2c..076e0d85 100644
--- a/docker/connect/Dockerfile
+++ b/docker/connect/Dockerfile
@@ -52,4 +52,4 @@ COPY bin/connect-distributed.sh \
bin/connectAdmin \
/home/connect/mq-connect/bin/
-ENTRYPOINT ["/usr/bin/sh", "/home/connect/mq-connect/bin/connect-distributed.sh", "-c", "/home/connect/mq-connect/conf/connect-distributed.conf"]
+#ENTRYPOINT ["/usr/bin/sh", "/home/connect/mq-connect/bin/connect-distributed.sh", "-c", "/home/connect/mq-connect/conf/connect-distributed.conf"]
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 699c7191..417fcec5 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -113,12 +113,15 @@ public class RebalanceImpl {
* @param allocateResult
*/
private void updateProcessConfigsInRebalance(ConnAndTaskConfigs allocateResult) {
-
try {
worker.startConnectors(allocateResult.getConnectorConfigs(), connectController);
+ } catch (Throwable e) {
+ log.error("RebalanceImpl#updateProcessConfigsInRebalance start connector failed", e);
+ }
+ try {
worker.startTasks(allocateResult.getTaskConfigs());
- } catch (Exception e) {
- log.error("RebalanceImpl#updateProcessConfigsInRebalance start connector or task failed", e);
+ } catch (Throwable e) {
+ log.error("RebalanceImpl#updateProcessConfigsInRebalance start task failed", e);
}
}