This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch feat/runtime-manager
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/feat/runtime-manager by this
push:
new 4aa0a06 feat:apply the WorkerInstanceRepositoryOnK8S api.
4aa0a06 is described below
commit 4aa0a06a6098c112297d62c9012a608bf0790f7c
Author: 2011shenlin <[email protected]>
AuthorDate: Sun Nov 5 14:28:32 2023 +0800
feat:apply the WorkerInstanceRepositoryOnK8S api.
---
.../repository/WorkerInstanceRepository.java | 46 +++++++++++++++-
.../adapter/runtime/manager/watch/WatchWorker.java | 28 ++++------
.../runtime/manager/worker/WorkerStatusEnum.java | 60 +++++++++++++++++++++
.../repository/WorkerInstanceRepositoryOnK8S.java | 17 +++++-
rocketmq_eventbridge.mv.db | Bin 81920 -> 81920 bytes
5 files changed, 130 insertions(+), 21 deletions(-)
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
index 553f93c..a1fd579 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
@@ -17,10 +17,54 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.manager.repository;
+import java.util.Map;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum;
+
public interface WorkerInstanceRepository {
- boolean applyWorkerInstance(String name, String image, String resources,
String config);
+ /**
+ * Apply(Create/Update) the worker instance.
+ *
+ * @param name
+ * @param image
+ * @param resources
+ * @param environments
+ * @return
+ */
+ boolean applyWorkerInstance(String name, String image, String resources,
Map<String, Object> environments);
+ /**
+ * Delete the worker
+ *
+ * @param name
+ * @return
+ */
boolean deleteWorkerInstance(String name);
+ /**
+ * Get the status of worker
+ *
+ * @param name
+ * @return
+ */
+ WorkerStatusEnum getWorkerInstanceStatus(String name);
+
+ /**
+ * Apply(Create/Update) the config to the worker instance. It may contains
more than one config in worker instance.
+ *
+ * @param name
+ * @param filePath
+ * @param config
+ * @return
+ */
+ boolean applyWorkerInstanceConfigFile(String name, String filePath, String
config);
+
+ /**
+ * Get the config to the worker instance.
+ *
+ * @param name
+ * @param filePath
+ * @return
+ */
+ boolean getWorkerInstanceConfigFile(String name, String filePath);
}
\ No newline at end of file
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
index d723ca5..a2e26de 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
@@ -17,20 +17,25 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.manager.watch;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.repository.WorkerInstanceRepository;
import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.Worker;
import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerService;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.transaction.annotation.Transactional;
public class WatchWorker {
@Autowired
WorkerService workerService;
+ @Autowired
+ WorkerInstanceRepository workerInstanceRepository;
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl(WatchWorker.class.getSimpleName()));
@@ -42,9 +47,9 @@ public class WatchWorker {
List<Worker> workers = workerService.listWorkers();
workers.forEach(worker -> {
if (!workerService.isFinalState(worker)) {
- watchTheWorkerImageTag(worker);
- watchTheWorkerResources(worker);
- watchTheWorkerConfig(worker);
+ Map<String, Object> environments = new
Gson().fromJson(worker.getConfig(), new TypeToken<Map<String, Object>>() {
+ }.getType());
+
workerInstanceRepository.applyWorkerInstance(worker.getName(),
worker.getImageTag(), worker.getResources(), environments);
workerService.refreshMD5(worker);
}
@@ -52,19 +57,4 @@ public class WatchWorker {
}
}, 3, 60, TimeUnit.SECONDS);
}
-
- private void watchTheWorkerConfig(Worker worker) {
-
- }
-
- @Transactional
- private void watchTheWorkerImageTag(Worker worker) {
-
- }
-
- @Transactional
- private void watchTheWorkerResources(Worker worker) {
-
- }
-
}
\ No newline at end of file
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java
new file mode 100644
index 0000000..1cc34aa
--- /dev/null
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker;
+
+public enum WorkerStatusEnum {
+ UNKNOWN(-1, "Unknown"),
+ STARTING(3, "Starting"),
+ RUN(5, "Run"),
+ STOP(10, "Stop"),
+ RELEASING(11, "Releasing");
+
+ private int value;
+ private String desc;
+
+ WorkerStatusEnum(int value, String desc) {
+ this.value = value;
+ this.desc = desc;
+ }
+
+ public static WorkerStatusEnum valueOf(int value) {
+ for (WorkerStatusEnum temp : WorkerStatusEnum.values()) {
+ if (temp.getValue() == value) {
+ return temp;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ public static WorkerStatusEnum nameOf(String name) {
+ for (WorkerStatusEnum temp : WorkerStatusEnum.values()) {
+ if (temp.name().equals(name)) {
+ return temp;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public int getValue() {
+ return value;
+ }
+}
diff --git
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
index 7f548f3..0299ddb 100644
---
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
+++
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
@@ -17,12 +17,15 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository;
+import java.util.Map;
import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.repository.WorkerInstanceRepository;
+import
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum;
public class WorkerInstanceRepositoryOnK8S implements WorkerInstanceRepository
{
+
@Override
- public boolean applyWorkerInstance(String name, String image, String
resources, String config) {
+ public boolean applyWorkerInstance(String name, String image, String
resources, Map<String, Object> environments) {
return false;
}
@@ -31,4 +34,16 @@ public class WorkerInstanceRepositoryOnK8S implements
WorkerInstanceRepository {
return false;
}
+ @Override public WorkerStatusEnum getWorkerInstanceStatus(String name) {
+ return null;
+ }
+
+ @Override public boolean applyWorkerInstanceConfigFile(String name, String
filePath, String config) {
+ return false;
+ }
+
+ @Override public boolean getWorkerInstanceConfigFile(String name, String
filePath) {
+ return false;
+ }
+
}
\ No newline at end of file
diff --git a/rocketmq_eventbridge.mv.db b/rocketmq_eventbridge.mv.db
index 69f4bcd..e5daf95 100644
Binary files a/rocketmq_eventbridge.mv.db and b/rocketmq_eventbridge.mv.db
differ