This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch feat/runtime-manager
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/feat/runtime-manager by this 
push:
     new 2da769b  feat:add test case.
2da769b is described below

commit 2da769bb2a91b0bea5726ba43fbe5620e1498d1c
Author: 2011shenlin <2011shen...@gmail.com>
AuthorDate: Wed Jan 17 16:39:54 2024 +0800

    feat:add test case.
---
 .../ClusterSelectorService.java}                   |  14 ++--
 .../runtime/manager/cluster/ClusterService.java    |   3 +-
 .../manager/dispatch/RunnerTaskDispatcher.java     |  31 +++++++--
 ...nnerTaskRebalance.java => WorkerRebalance.java} |   2 +-
 .../{ClusterWorkerScale.java => ClusterScale.java} |   4 +-
 .../runtime/manager/task/RunnerTaskService.java    |   9 +++
 .../adapter/runtime/manager/watch/WatchWorker.java |   3 +-
 .../WorkerLoadService.java}                        |  11 ++--
 .../WorkerSelectorService.java}                    |  26 +++++---
 .../WorkerInstanceRepositoryOnK8STest.java         |  73 +++++++++++++++++++++
 rocketmq_eventbridge.mv.db                         | Bin 81920 -> 77824 bytes
 11 files changed, 147 insertions(+), 29 deletions(-)

diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java
similarity index 73%
copy from 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
copy to 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java
index 67254db..6ba2766 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java
@@ -15,14 +15,18 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task;
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster;
 
-import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask;
+import org.springframework.stereotype.Service;
 
-public class RunnerTaskService {
+@Service
+public class ClusterSelectorService {
+    public Cluster selectCluster(RunnerTask runnerTask) {
+        return selectDefaultCluster();
+    }
 
-    List<RunnerTask> listRunnerTask(String runnerName) {
+    public Cluster selectDefaultCluster() {
         return null;
     }
-
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
index 7ac59ca..ef27461 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java
@@ -38,10 +38,11 @@ public class ClusterService {
         return true;
     }
 
-    public Cluster getCluster(String clusterName) {
+    public Cluster getCluster(long clusterId) {
         return null;
     }
 
+
     public List<Cluster> listCluster() {
         return clusterRepository.listCluster();
     }
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
index 605d38f..696265b 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
@@ -17,16 +17,39 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch;
 
+import javax.annotation.Resource;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.ClusterSelectorService;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTaskService;
 import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerSelectorService;
 
 public class RunnerTaskDispatcher {
 
-    public boolean dispatchToCluster(Worker worker) {
-        return false;
+    @Resource
+    ClusterSelectorService clusterSelectorService;
+
+    @Resource
+    WorkerSelectorService workerSelectorService;
+
+    @Resource
+    RunnerTaskService runnerTaskService;
+
+    public boolean dispatchRunnerTask(RunnerTask runnerTask) {
+        boolean dispatchToCluster = dispatchToCluster(runnerTask);
+        boolean dispatchToWorker = dispatchToWorker(runnerTask);
+        return dispatchToCluster && dispatchToWorker;
+    }
+
+    public boolean dispatchToCluster(RunnerTask runnerTask) {
+        Cluster cluster = clusterSelectorService.selectCluster(runnerTask);
+        return runnerTaskService.updateRunnerTaskCluster(runnerTask, cluster);
     }
 
-    public boolean dispatchToWorker(Worker worker) {
-        return false;
+    public boolean dispatchToWorker(RunnerTask runnerTask) {
+        Worker worker = workerSelectorService.selectWorker(runnerTask);
+        return runnerTaskService.updateRunnerTaskWorker(runnerTask, worker);
     }
 
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java
similarity index 96%
rename from 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java
rename to 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java
index 5abdb47..052bb82 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java
@@ -19,7 +19,7 @@ package 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch;
 
 import org.springframework.beans.factory.annotation.Autowired;
 
-public class RunnerTaskRebalance {
+public class WorkerRebalance {
 
     @Autowired
     private RunnerTaskDispatcher runnerTaskDispatcher;
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java
similarity index 95%
rename from 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java
rename to 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java
index be22fa0..866695b 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java
@@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
-public class ClusterWorkerScale {
+public class ClusterScale {
     @Autowired
     ClusterService clusterService;
 
@@ -37,7 +37,7 @@ public class ClusterWorkerScale {
     private int DEFAULT_SCALE_DOWN_TRIGGER_LOAD = 20;
 
     private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryImpl(ClusterWorkerScale.class.getSimpleName()));
+        new ThreadFactoryImpl(ClusterScale.class.getSimpleName()));
 
     public void start() {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
index 67254db..90df069 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task;
 
 import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker;
 
 public class RunnerTaskService {
 
@@ -25,4 +27,11 @@ public class RunnerTaskService {
         return null;
     }
 
+    public boolean updateRunnerTaskWorker(RunnerTask task, Worker worker) {
+        return true;
+    }
+
+    public boolean updateRunnerTaskCluster(RunnerTask task, Cluster cluster) {
+        return true;
+    }
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
index c00a694..b507128 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
@@ -57,9 +57,8 @@ public class WatchWorker {
                         if (!workerService.isFinalState(worker)) {
                             Map<String, Object> environments = new 
Gson().fromJson(worker.getConfig(), new TypeToken<Map<String, Object>>() {
                             }.getType());
-//                            WorkerResource workerResource = new 
Gson().fromJson(worker.getResources(), WorkerResource.class);
                             log.info("applyWorkerInstance, workerName: {}, 
workerImageTag: {}, workerResource: {}, environments: {}", worker.getName(), 
worker.getImage(), worker.getResources(), new Gson().toJson(environments));
-//                        
workerInstanceRepository.applyWorkerInstance(worker.getName(), 
worker.getImageTag(), workerResource, environments);
+//                            
workerInstanceRepository.applyWorkerInstance(worker.getName(), 
worker.getImage(), new Gson().fromJson(worker.getResources(), 
WorkerResource.class), environments);
                             workerService.refreshMD5(worker);
                         }
 
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java
similarity index 79%
copy from 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
copy to 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java
index 67254db..47f8e62 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java
@@ -15,14 +15,15 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task;
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker;
 
-import java.util.List;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import org.springframework.stereotype.Service;
 
-public class RunnerTaskService {
+@Service
+public class WorkerLoadService {
 
-    List<RunnerTask> listRunnerTask(String runnerName) {
+    public Worker getMinLoadWorker(Cluster cluster) {
         return null;
     }
-
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java
similarity index 55%
copy from 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
copy to 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java
index 605d38f..abba9d1 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java
@@ -15,18 +15,26 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch;
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker;
 
-import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker;
+import javax.annotation.Resource;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.ClusterService;
+import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask;
+import org.springframework.stereotype.Service;
 
-public class RunnerTaskDispatcher {
+@Service
+public class WorkerSelectorService {
 
-    public boolean dispatchToCluster(Worker worker) {
-        return false;
-    }
+    @Resource
+    WorkerLoadService workerLoadService;
 
-    public boolean dispatchToWorker(Worker worker) {
-        return false;
-    }
+    @Resource
+    ClusterService clusterService;
 
+    public Worker selectWorker(RunnerTask runnerTask) {
+        Cluster cluster = clusterService.getCluster(runnerTask.getClusterId());
+        Worker worker = workerLoadService.getMinLoadWorker(cluster);
+        return worker;
+    }
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
 
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
new file mode 100644
index 0000000..d96438b
--- /dev/null
+++ 
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
@@ -0,0 +1,73 @@
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import java.util.Map;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerResource;
+import org.junit.Ignore;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+
+
+public class WorkerInstanceRepositoryOnK8STest {
+
+    @InjectMocks
+    private WorkerInstanceRepositoryOnK8S workerInstanceRepositoryOnK8S;
+
+    @Test
+    @Ignore
+    void applyWorkerInstance() {
+        Map<String, Object> environments = Maps.newHashMap();
+        environments.put("key1", "value1");
+        environments.put("key2", "value2");
+        workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", 
"registry.cn-beijing.cr.aliyuncs.com/eventbridge:20231115195431f55971", new 
Gson().fromJson("{\"cpu\":1,\"memory\":1}", WorkerResource.class), null);
+    }
+
+    @Test
+    @Ignore
+    void deleteWorkerInstance() {
+        workerInstanceRepositoryOnK8S.deleteWorkerInstance("worker-4");
+    }
+
+    @Test
+    @Ignore
+    void getWorkerInstanceStatus() {
+        workerInstanceRepositoryOnK8S.getWorkerInstanceStatus("worker-4");
+    }
+
+    @Test
+    @Ignore
+    void applyWorkerInstanceConfigFile() {
+        String taskConfig = "[\n" +
+            "  {\n" +
+            "    \"name\":\"demo-runner\",\n" +
+            "    \"components\":[\n" +
+            "      {\n" +
+            "        \"accountId\": \"654321\",\n" +
+            "        \"eventBusName\":\"demo-bus\"\n" +
+            "      },\n" +
+            "      {\n" +
+            "        \"filterPattern\":\"{}\",\n" +
+            "        
\"class\":\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform\"\n"
 +
+            "      },\n" +
+            "      {\n" +
+            "        
\"data\":\"{\\\"form\\\":\\\"TEMPLATE\\\",\\\"value\\\":\\\"{\\\\\\\"content\\\\\\\":\\\\\\\"$.data.body\\\\\\\"}\\\",\\\"template\\\":\\\"{\\\\\\\"text\\\\\\\":{\\\\\\\"content\\\\\\\":\\\\\\\"${content}\\\\\\\"},\\\\\\\"msgtype\\\\\\\":\\\\\\\"text\\\\\\\"}\\\"}\",\n"
 +
+            "        \"class\": 
\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform\"\n" +
+            "      },\n" +
+            "      {\n" +
+            "        
\"class\":\"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask\",\n" +
+            "        \"webHook\":\"xxxxxxxxxxx\",\n" +
+            "        \"secretKey\":\"xxxxxxxxxxx\"\n" +
+            "      }\n" +
+            "    ]\n" +
+            "  }\n" +
+            "]";
+        
workerInstanceRepositoryOnK8S.applyWorkerInstanceConfigFile("worker-4", 
"/eventbridge/task-config", taskConfig);
+    }
+
+    @Test
+    @Ignore
+    void getWorkerInstanceConfigFile() {
+        workerInstanceRepositoryOnK8S.getWorkerInstanceConfigFile("worker-4", 
"/eventbridge/task-config");
+    }
+}
\ No newline at end of file
diff --git a/rocketmq_eventbridge.mv.db b/rocketmq_eventbridge.mv.db
index 1a098df..2e5c25d 100644
Binary files a/rocketmq_eventbridge.mv.db and b/rocketmq_eventbridge.mv.db 
differ

Reply via email to