This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit aab58d6cb8811710351a3867e99cb2d98c0d0a52 Author: windwheel <[email protected]> AuthorDate: Sun May 21 16:09:02 2023 +0800 refactor shutdown --- .../eventbridge/adapter/runtime/Runtime.java | 39 +++++++++++++++++++--- .../adapter/runtime/boot/EventBusListener.java | 5 +++ .../adapter/runtime/boot/EventRuleTransfer.java | 27 +++++++++++++-- .../adapter/runtime/boot/EventTargetTrigger.java | 19 +++++++++++ .../runtime/boot/common/CirculatorContext.java | 25 ++++++++++++++ .../boot/hook/AbstractStartAndShutdown.java | 2 +- .../adapter/runtime}/boot/hook/Shutdown.java | 2 +- .../adapter/runtime}/boot/hook/Start.java | 2 +- .../runtime}/boot/hook/StartAndShutdown.java | 4 +-- .../runtime/boot/listener/EventSubscriber.java | 5 +++ .../adapter/runtime/common/ServiceThread.java | 12 ++++++- .../adapter/runtime}/utils/ShutdownUtils.java | 2 +- .../rocketmq/runtimer/RocketMQEventSubscriber.java | 8 +++++ 13 files changed, 137 insertions(+), 15 deletions(-) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java index e5bd487..aae2f70 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java @@ -31,6 +31,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown; + + import javax.annotation.PostConstruct; import java.util.concurrent.atomic.AtomicReference; @@ -46,6 +50,8 @@ public class Runtime { private AtomicReference<RuntimeState> runtimerState; + private static final RuntimeStartAndShutdown RUNTIME_START_AND_SHUTDOWN = new RuntimeStartAndShutdown(); + @Autowired private CirculatorContext circulatorContext; @Autowired @@ -58,17 +64,40 @@ public class Runtime { private ErrorHandler errorHandler; @PostConstruct - public void initAndStart() { - logger.info("Start init runtime."); + public void initAndStart() throws Exception { + logger.info("Start init runtimer."); circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig()); runnerConfigObserver.registerListener(circulatorContext); runnerConfigObserver.registerListener(eventSubscriber); - new EventBusListener(circulatorContext, eventSubscriber, errorHandler).start(); - new EventRuleTransfer(circulatorContext, offsetManager, errorHandler).start(); - new EventTargetTrigger(circulatorContext, offsetManager, errorHandler).start(); + EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler); + EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler); + EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler); + RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventBusListener); + RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventRuleTransfer); + RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventTargetPusher); + + // start servers one by one. + RUNTIME_START_AND_SHUTDOWN.start(); + + java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("try to shutdown server"); + try { + RUNTIME_START_AND_SHUTDOWN.shutdown(); + } catch (Exception e) { + logger.error("err when shutdown rocketmq-proxy", e); + } + })); + startRuntimer(); } + private static class RuntimeStartAndShutdown extends AbstractStartAndShutdown { + @Override + protected void appendStartAndShutdown(StartAndShutdown startAndShutdown) { + super.appendStartAndShutdown(startAndShutdown); + } + } + public void startRuntimer() { runtimerState = new AtomicReference<>(RuntimeState.START); } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index e6f06c2..48af27f 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -70,4 +70,9 @@ public class EventBusListener extends ServiceThread { public String getServiceName() { return EventBusListener.class.getSimpleName(); } + + @Override + public void shutdown() { + eventSubscriber.close(); + } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 8612733..4322187 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -32,9 +32,9 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorCon import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; +import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; /** * receive event and transfer the rule to pusher @@ -48,6 +48,8 @@ public class EventRuleTransfer extends ServiceThread { private final CirculatorContext circulatorContext; private final OffsetManager offsetManager; private final ErrorHandler errorHandler; + private Map<String, TransformEngine<ConnectRecord>> latestTransformMap; + private List<CompletableFuture<Void>> completableFutures; public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager, ErrorHandler errorHandler) { @@ -75,7 +77,7 @@ public class EventRuleTransfer extends ServiceThread { this.waitForRunning(1000); continue; } - Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap(); + latestTransformMap = circulatorContext.getTaskTransformMap(); if (MapUtils.isEmpty(latestTransformMap)) { logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis()); this.waitForRunning(3000); @@ -83,7 +85,7 @@ public class EventRuleTransfer extends ServiceThread { } List<ConnectRecord> afterTransformConnect = Lists.newArrayList(); - List<CompletableFuture<Void>> completableFutures = Lists.newArrayList(); + completableFutures = Lists.newArrayList(); for(String runnerName: eventRecordMap.keySet()){ TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName); List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName); @@ -117,4 +119,23 @@ public class EventRuleTransfer extends ServiceThread { } } + @Override + public void start() { + thread.start(); + } + + @Override + public void shutdown() { + try { + for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : latestTransformMap.entrySet()) { + TransformEngine<ConnectRecord> transformEngine = taskTransform.getValue(); + transformEngine.close(); + } + ShutdownUtils.completedFuture(completableFutures); + circulatorContext.releaseTaskTransform(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index a54ee39..c4ff349 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -89,4 +89,23 @@ public class EventTargetTrigger extends ServiceThread { return EventTargetTrigger.class.getSimpleName(); } + @Override + public void start() { + thread.start(); + } + + @Override + public void shutdown() { + Map<String, SinkTask> sinkTaskMap = circulatorContext.getPusherTaskMap(); + for (Map.Entry<String, SinkTask> item : sinkTaskMap.entrySet()) { + SinkTask sinkTask = item.getValue(); + sinkTask.stop(); + } + try { + circulatorContext.releaseExecutorService(); + circulatorContext.releaseTriggerTask(); + } catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java index 203f43a..7b467d8 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.enums.RefreshTypeE import org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin.Plugin; import org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin.PluginClassLoader; import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine; +import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -291,4 +292,28 @@ public class CirculatorContext implements TargetRunnerListener { return null; } + + public void releaseTaskTransform() { + for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : taskTransformMap.entrySet()) { + String runnerName = taskTransform.getKey(); + taskTransformMap.remove(runnerName); + } + } + + public void releaseTriggerTask() { + for (Map.Entry<String, SinkTask> triggerTask: pusherTaskMap.entrySet()) { + SinkTask sinkTask = triggerTask.getValue(); + String runnerName = triggerTask.getKey(); + sinkTask.stop(); + pusherTaskMap.remove(runnerName); + } + } + + public void releaseExecutorService() throws Exception { + for (Map.Entry<String, ExecutorService> pusherExecutor: pusherExecutorMap.entrySet()) { + ExecutorService pusher = pusherExecutor.getValue(); + ShutdownUtils.shutdownThreadPool(pusher); + } + } + } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java similarity index 97% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java index 051fe5a..9a1f33e 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook; +package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java similarity index 92% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java index f3ac5f3..854cd19 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook; +package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook; public interface Shutdown { void shutdown() throws Exception; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java similarity index 92% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java index b44d86a..353255f 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook; +package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook; public interface Start { void start() throws Exception; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java similarity index 86% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java index 242c1b0..cc80740 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook; +package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook; -public interface StartAndShutdown extends Start,Shutdown { +public interface StartAndShutdown extends Start, Shutdown { } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java index 0d99fce..be4db9b 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java @@ -48,6 +48,11 @@ public abstract class EventSubscriber implements TargetRunnerListener { */ public abstract void commit(List<ConnectRecord> connectRecordList); + /** + * close resource such as consumer + */ + public abstract void close(); + /** * Put the connect record to the eventbus. * @param eventBusName diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java index 48835c1..400cf47 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.common; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown; import org.apache.rocketmq.common.CountDownLatch2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +25,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -public abstract class ServiceThread implements Runnable { +public abstract class ServiceThread extends AbstractStartAndShutdown implements Runnable { private static final Logger logger = LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER); @@ -34,6 +35,8 @@ public abstract class ServiceThread implements Runnable { protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); protected volatile boolean stopped = false; + protected boolean isDaemon = false; + public ServiceThread() { this.thread = new Thread(this, this.getServiceName()); @@ -42,7 +45,14 @@ public abstract class ServiceThread implements Runnable { public abstract String getServiceName(); public void start() { + logger.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread); + if (!hasNotified.compareAndSet(false, true)) { + return; + } + stopped = false; + this.thread.setDaemon(isDaemon); this.thread.start(); + logger.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), hasNotified.get(), thread); } public void shutdown() { diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java similarity index 96% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java rename to adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java index 980c242..7feb906 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.utils; +package org.apache.rocketmq.eventbridge.adapter.runtime.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java index f6b08cd..3005486 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java @@ -158,6 +158,14 @@ public class RocketMQEventSubscriber extends EventSubscriber { consumeWorkerMap.get(runnerName).commit(msgIds); } + @Override + public void close() { + for (Map.Entry<String, ConsumeWorker> item : consumeWorkerMap.entrySet()) { + ConsumeWorker consumeWorker = item.getValue(); + consumeWorker.shutdown(); + } + } + /** * init rocketmq ref config */
