This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit c00b3e28de4e97609fbaeea82614e3069a8dc602 Author: chengxy <[email protected]> AuthorDate: Thu May 25 09:43:25 2023 +0800 refactor shutdown --- .../eventbridge/adapter/runtime/boot/EventRuleTransfer.java | 11 ++--------- .../eventbridge/adapter/runtime/boot/EventTargetTrigger.java | 5 ----- .../adapter/runtime/boot/common/CirculatorContext.java | 4 +++- .../eventbridge/adapter/runtime/utils/ShutdownUtils.java | 9 --------- 4 files changed, 5 insertions(+), 24 deletions(-) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 4322187..91720bd 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -48,8 +48,6 @@ public class EventRuleTransfer extends ServiceThread { private final CirculatorContext circulatorContext; private final OffsetManager offsetManager; private final ErrorHandler errorHandler; - private Map<String, TransformEngine<ConnectRecord>> latestTransformMap; - private List<CompletableFuture<Void>> completableFutures; public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager, ErrorHandler errorHandler) { @@ -77,7 +75,7 @@ public class EventRuleTransfer extends ServiceThread { this.waitForRunning(1000); continue; } - latestTransformMap = circulatorContext.getTaskTransformMap(); + Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap(); if (MapUtils.isEmpty(latestTransformMap)) { logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis()); this.waitForRunning(3000); @@ -85,7 +83,7 @@ public class EventRuleTransfer extends ServiceThread { } List<ConnectRecord> afterTransformConnect = Lists.newArrayList(); - completableFutures = Lists.newArrayList(); + List<CompletableFuture<Void>> completableFutures = Lists.newArrayList(); for(String runnerName: eventRecordMap.keySet()){ TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName); List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName); @@ -127,11 +125,6 @@ public class EventRuleTransfer extends ServiceThread { @Override public void shutdown() { try { - for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : latestTransformMap.entrySet()) { - TransformEngine<ConnectRecord> transformEngine = taskTransform.getValue(); - transformEngine.close(); - } - ShutdownUtils.completedFuture(completableFutures); circulatorContext.releaseTaskTransform(); } catch (Exception e) { e.printStackTrace(); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index c4ff349..08b1383 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -96,11 +96,6 @@ public class EventTargetTrigger extends ServiceThread { @Override public void shutdown() { - Map<String, SinkTask> sinkTaskMap = circulatorContext.getPusherTaskMap(); - for (Map.Entry<String, SinkTask> item : sinkTaskMap.entrySet()) { - SinkTask sinkTask = item.getValue(); - sinkTask.stop(); - } try { circulatorContext.releaseExecutorService(); circulatorContext.releaseTriggerTask(); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java index 7b467d8..2763be3 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java @@ -293,9 +293,11 @@ public class CirculatorContext implements TargetRunnerListener { } - public void releaseTaskTransform() { + public void releaseTaskTransform() throws Exception { for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : taskTransformMap.entrySet()) { String runnerName = taskTransform.getKey(); + TransformEngine<ConnectRecord> transformEngine = taskTransform.getValue(); + transformEngine.close(); taskTransformMap.remove(runnerName); } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java index 7feb906..cf8679d 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java @@ -42,13 +42,4 @@ public class ShutdownUtils { } } } - - /** - * set final value - * @param completableFutures - */ - public static void completedFuture(List<CompletableFuture<Void>> completableFutures){ - for (CompletableFuture<Void> future: completableFutures) { - future.cancel(true); - } } }
