This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch feat/runtime-manager in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/feat/runtime-manager by this push: new 2da769b feat:add test case. 2da769b is described below commit 2da769bb2a91b0bea5726ba43fbe5620e1498d1c Author: 2011shenlin <2011shen...@gmail.com> AuthorDate: Wed Jan 17 16:39:54 2024 +0800 feat:add test case. --- .../ClusterSelectorService.java} | 14 ++-- .../runtime/manager/cluster/ClusterService.java | 3 +- .../manager/dispatch/RunnerTaskDispatcher.java | 31 +++++++-- ...nnerTaskRebalance.java => WorkerRebalance.java} | 2 +- .../{ClusterWorkerScale.java => ClusterScale.java} | 4 +- .../runtime/manager/task/RunnerTaskService.java | 9 +++ .../adapter/runtime/manager/watch/WatchWorker.java | 3 +- .../WorkerLoadService.java} | 11 ++-- .../WorkerSelectorService.java} | 26 +++++--- .../WorkerInstanceRepositoryOnK8STest.java | 73 +++++++++++++++++++++ rocketmq_eventbridge.mv.db | Bin 81920 -> 77824 bytes 11 files changed, 147 insertions(+), 29 deletions(-) diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java similarity index 73% copy from adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java copy to adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java index 67254db..6ba2766 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterSelectorService.java @@ -15,14 +15,18 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task; +package org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster; -import java.util.List; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask; +import org.springframework.stereotype.Service; -public class RunnerTaskService { +@Service +public class ClusterSelectorService { + public Cluster selectCluster(RunnerTask runnerTask) { + return selectDefaultCluster(); + } - List<RunnerTask> listRunnerTask(String runnerName) { + public Cluster selectDefaultCluster() { return null; } - } \ No newline at end of file diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java index 7ac59ca..ef27461 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/cluster/ClusterService.java @@ -38,10 +38,11 @@ public class ClusterService { return true; } - public Cluster getCluster(String clusterName) { + public Cluster getCluster(long clusterId) { return null; } + public List<Cluster> listCluster() { return clusterRepository.listCluster(); } diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java index 605d38f..696265b 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java @@ -17,16 +17,39 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch; +import javax.annotation.Resource; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.ClusterSelectorService; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTaskService; import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerSelectorService; public class RunnerTaskDispatcher { - public boolean dispatchToCluster(Worker worker) { - return false; + @Resource + ClusterSelectorService clusterSelectorService; + + @Resource + WorkerSelectorService workerSelectorService; + + @Resource + RunnerTaskService runnerTaskService; + + public boolean dispatchRunnerTask(RunnerTask runnerTask) { + boolean dispatchToCluster = dispatchToCluster(runnerTask); + boolean dispatchToWorker = dispatchToWorker(runnerTask); + return dispatchToCluster && dispatchToWorker; + } + + public boolean dispatchToCluster(RunnerTask runnerTask) { + Cluster cluster = clusterSelectorService.selectCluster(runnerTask); + return runnerTaskService.updateRunnerTaskCluster(runnerTask, cluster); } - public boolean dispatchToWorker(Worker worker) { - return false; + public boolean dispatchToWorker(RunnerTask runnerTask) { + Worker worker = workerSelectorService.selectWorker(runnerTask); + return runnerTaskService.updateRunnerTaskWorker(runnerTask, worker); } } \ No newline at end of file diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java similarity index 96% rename from adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java rename to adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java index 5abdb47..052bb82 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskRebalance.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/WorkerRebalance.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch; import org.springframework.beans.factory.annotation.Autowired; -public class RunnerTaskRebalance { +public class WorkerRebalance { @Autowired private RunnerTaskDispatcher runnerTaskDispatcher; diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java similarity index 95% rename from adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java rename to adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java index be22fa0..866695b 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterWorkerScale.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/scale/ClusterScale.java @@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service -public class ClusterWorkerScale { +public class ClusterScale { @Autowired ClusterService clusterService; @@ -37,7 +37,7 @@ public class ClusterWorkerScale { private int DEFAULT_SCALE_DOWN_TRIGGER_LOAD = 20; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryImpl(ClusterWorkerScale.class.getSimpleName())); + new ThreadFactoryImpl(ClusterScale.class.getSimpleName())); public void start() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java index 67254db..90df069 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task; import java.util.List; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker; public class RunnerTaskService { @@ -25,4 +27,11 @@ public class RunnerTaskService { return null; } + public boolean updateRunnerTaskWorker(RunnerTask task, Worker worker) { + return true; + } + + public boolean updateRunnerTaskCluster(RunnerTask task, Cluster cluster) { + return true; + } } \ No newline at end of file diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java index c00a694..b507128 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java @@ -57,9 +57,8 @@ public class WatchWorker { if (!workerService.isFinalState(worker)) { Map<String, Object> environments = new Gson().fromJson(worker.getConfig(), new TypeToken<Map<String, Object>>() { }.getType()); -// WorkerResource workerResource = new Gson().fromJson(worker.getResources(), WorkerResource.class); log.info("applyWorkerInstance, workerName: {}, workerImageTag: {}, workerResource: {}, environments: {}", worker.getName(), worker.getImage(), worker.getResources(), new Gson().toJson(environments)); -// workerInstanceRepository.applyWorkerInstance(worker.getName(), worker.getImageTag(), workerResource, environments); +// workerInstanceRepository.applyWorkerInstance(worker.getName(), worker.getImage(), new Gson().fromJson(worker.getResources(), WorkerResource.class), environments); workerService.refreshMD5(worker); } diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java similarity index 79% copy from adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java copy to adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java index 67254db..47f8e62 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/task/RunnerTaskService.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerLoadService.java @@ -15,14 +15,15 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtime.manager.task; +package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker; -import java.util.List; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster; +import org.springframework.stereotype.Service; -public class RunnerTaskService { +@Service +public class WorkerLoadService { - List<RunnerTask> listRunnerTask(String runnerName) { + public Worker getMinLoadWorker(Cluster cluster) { return null; } - } \ No newline at end of file diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java similarity index 55% copy from adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java copy to adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java index 605d38f..abba9d1 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/dispatch/RunnerTaskDispatcher.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerSelectorService.java @@ -15,18 +15,26 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtime.manager.dispatch; +package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker; -import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker; +import javax.annotation.Resource; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.Cluster; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.cluster.ClusterService; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.task.RunnerTask; +import org.springframework.stereotype.Service; -public class RunnerTaskDispatcher { +@Service +public class WorkerSelectorService { - public boolean dispatchToCluster(Worker worker) { - return false; - } + @Resource + WorkerLoadService workerLoadService; - public boolean dispatchToWorker(Worker worker) { - return false; - } + @Resource + ClusterService clusterService; + public Worker selectWorker(RunnerTask runnerTask) { + Cluster cluster = clusterService.getCluster(runnerTask.getClusterId()); + Worker worker = workerLoadService.getMinLoadWorker(cluster); + return worker; + } } \ No newline at end of file diff --git a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java new file mode 100644 index 0000000..d96438b --- /dev/null +++ b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java @@ -0,0 +1,73 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository; + +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import java.util.Map; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerResource; +import org.junit.Ignore; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; + + +public class WorkerInstanceRepositoryOnK8STest { + + @InjectMocks + private WorkerInstanceRepositoryOnK8S workerInstanceRepositoryOnK8S; + + @Test + @Ignore + void applyWorkerInstance() { + Map<String, Object> environments = Maps.newHashMap(); + environments.put("key1", "value1"); + environments.put("key2", "value2"); + workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", "registry.cn-beijing.cr.aliyuncs.com/eventbridge:20231115195431f55971", new Gson().fromJson("{\"cpu\":1,\"memory\":1}", WorkerResource.class), null); + } + + @Test + @Ignore + void deleteWorkerInstance() { + workerInstanceRepositoryOnK8S.deleteWorkerInstance("worker-4"); + } + + @Test + @Ignore + void getWorkerInstanceStatus() { + workerInstanceRepositoryOnK8S.getWorkerInstanceStatus("worker-4"); + } + + @Test + @Ignore + void applyWorkerInstanceConfigFile() { + String taskConfig = "[\n" + + " {\n" + + " \"name\":\"demo-runner\",\n" + + " \"components\":[\n" + + " {\n" + + " \"accountId\": \"654321\",\n" + + " \"eventBusName\":\"demo-bus\"\n" + + " },\n" + + " {\n" + + " \"filterPattern\":\"{}\",\n" + + " \"class\":\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform\"\n" + + " },\n" + + " {\n" + + " \"data\":\"{\\\"form\\\":\\\"TEMPLATE\\\",\\\"value\\\":\\\"{\\\\\\\"content\\\\\\\":\\\\\\\"$.data.body\\\\\\\"}\\\",\\\"template\\\":\\\"{\\\\\\\"text\\\\\\\":{\\\\\\\"content\\\\\\\":\\\\\\\"${content}\\\\\\\"},\\\\\\\"msgtype\\\\\\\":\\\\\\\"text\\\\\\\"}\\\"}\",\n" + + " \"class\": \"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform\"\n" + + " },\n" + + " {\n" + + " \"class\":\"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask\",\n" + + " \"webHook\":\"xxxxxxxxxxx\",\n" + + " \"secretKey\":\"xxxxxxxxxxx\"\n" + + " }\n" + + " ]\n" + + " }\n" + + "]"; + workerInstanceRepositoryOnK8S.applyWorkerInstanceConfigFile("worker-4", "/eventbridge/task-config", taskConfig); + } + + @Test + @Ignore + void getWorkerInstanceConfigFile() { + workerInstanceRepositoryOnK8S.getWorkerInstanceConfigFile("worker-4", "/eventbridge/task-config"); + } +} \ No newline at end of file diff --git a/rocketmq_eventbridge.mv.db b/rocketmq_eventbridge.mv.db index 1a098df..2e5c25d 100644 Binary files a/rocketmq_eventbridge.mv.db and b/rocketmq_eventbridge.mv.db differ