This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit c00b3e28de4e97609fbaeea82614e3069a8dc602
Author: chengxy <[email protected]>
AuthorDate: Thu May 25 09:43:25 2023 +0800

    refactor shutdown
---
 .../eventbridge/adapter/runtime/boot/EventRuleTransfer.java   | 11 ++---------
 .../eventbridge/adapter/runtime/boot/EventTargetTrigger.java  |  5 -----
 .../adapter/runtime/boot/common/CirculatorContext.java        |  4 +++-
 .../eventbridge/adapter/runtime/utils/ShutdownUtils.java      |  9 ---------
 4 files changed, 5 insertions(+), 24 deletions(-)

diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index 4322187..91720bd 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -48,8 +48,6 @@ public class EventRuleTransfer extends ServiceThread {
     private final CirculatorContext circulatorContext;
     private final OffsetManager offsetManager;
     private final ErrorHandler errorHandler;
-    private Map<String, TransformEngine<ConnectRecord>> latestTransformMap;
-    private List<CompletableFuture<Void>> completableFutures;
 
     public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager,
         ErrorHandler errorHandler) {
@@ -77,7 +75,7 @@ public class EventRuleTransfer extends ServiceThread {
                 this.waitForRunning(1000);
                 continue;
             }
-            latestTransformMap = circulatorContext.getTaskTransformMap();
+            Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
             if (MapUtils.isEmpty(latestTransformMap)) {
                 logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(3000);
@@ -85,7 +83,7 @@ public class EventRuleTransfer extends ServiceThread {
             }
 
             List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
-            completableFutures = Lists.newArrayList();
+            List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
             for(String runnerName: eventRecordMap.keySet()){
                 TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
                 List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
@@ -127,11 +125,6 @@ public class EventRuleTransfer extends ServiceThread {
     @Override
     public void shutdown() {
         try {
-            for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : latestTransformMap.entrySet()) {
-                TransformEngine<ConnectRecord> transformEngine = taskTransform.getValue();
-                transformEngine.close();
-            }
-            ShutdownUtils.completedFuture(completableFutures);
             circulatorContext.releaseTaskTransform();
         } catch (Exception e) {
             e.printStackTrace();
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index c4ff349..08b1383 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -96,11 +96,6 @@ public class EventTargetTrigger extends ServiceThread {
 
     @Override
     public void shutdown() {
-        Map<String, SinkTask> sinkTaskMap =  circulatorContext.getPusherTaskMap();
-        for (Map.Entry<String, SinkTask> item : sinkTaskMap.entrySet()) {
-            SinkTask sinkTask = item.getValue();
-            sinkTask.stop();
-        }
         try {
             circulatorContext.releaseExecutorService();
             circulatorContext.releaseTriggerTask();
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 7b467d8..2763be3 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -293,9 +293,11 @@ public class CirculatorContext implements TargetRunnerListener {
     }
 
 
-    public void releaseTaskTransform() {
+    public void releaseTaskTransform() throws Exception {
         for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : taskTransformMap.entrySet()) {
             String runnerName = taskTransform.getKey();
+            TransformEngine<ConnectRecord> transformEngine = taskTransform.getValue();
+            transformEngine.close();
             taskTransformMap.remove(runnerName);
         }
     }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
index 7feb906..cf8679d 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
@@ -42,13 +42,4 @@ public class ShutdownUtils {
             }
         }
     }
-
-    /**
-     * set final value
-     * @param completableFutures
-     */
-    public static void completedFuture(List<CompletableFuture<Void>> completableFutures){
-        for (CompletableFuture<Void> future: completableFutures) {
-            future.cancel(true);
-        } }
 }

Reply via email to