This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit f12cc0449ef525bbd6215951bc35756b8a513dd6 Author: 2011shenlin <[email protected]> AuthorDate: Sun Apr 23 19:47:27 2023 +0800 feat:support record commit. --- .../eventbridge/adapter/runtimer/Runtimer.java | 15 ++++++--- .../adapter/runtimer/boot/EventBusListener.java | 18 ++++++---- .../adapter/runtimer/boot/EventRuleTransfer.java | 14 +++++--- .../adapter/runtimer/boot/EventTargetPusher.java | 6 +++- .../runtimer/boot/listener/EventSubscriber.java | 3 +- .../adapter/runtimer/boot/pusher/PushCallback.java | 39 ---------------------- .../adapter/runtimer/boot/pusher/PushRequest.java | 34 ------------------- .../adapter/runtimer/error/ErrorHandler.java | 7 +++- 8 files changed, 44 insertions(+), 92 deletions(-) diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java index 495aeee..0cf382b 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState; +import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,24 +45,28 @@ public class Runtimer { private AtomicReference<RuntimerState> runtimerState; + @Autowired private CirculatorContext circulatorContext; - @Autowired private TargetRunnerConfigObserver runnerConfigObserver; @Autowired private OffsetManager offsetManager; @Autowired private EventSubscriber eventSubscriber; + @Autowired + private ErrorHandler errorHandler; public Runtimer( CirculatorContext circulatorContext, TargetRunnerConfigObserver runnerConfigObserver, OffsetManager offsetManager, - EventSubscriber eventSubscriber) { + EventSubscriber eventSubscriber, + ErrorHandler errorHandler) { this.circulatorContext = circulatorContext; this.runnerConfigObserver = runnerConfigObserver; this.offsetManager = offsetManager; this.eventSubscriber = eventSubscriber; + this.errorHandler = errorHandler; } @PostConstruct @@ -70,9 +75,9 @@ public class Runtimer { circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig()); runnerConfigObserver.registerListener(circulatorContext); runnerConfigObserver.registerListener(eventSubscriber); - new EventBusListener(circulatorContext, eventSubscriber).start(); - new EventRuleTransfer(circulatorContext, offsetManager).start(); - new EventTargetPusher(circulatorContext, offsetManager).start(); + new EventBusListener(circulatorContext, eventSubscriber, errorHandler).start(); + new EventRuleTransfer(circulatorContext, offsetManager, errorHandler).start(); + new EventTargetPusher(circulatorContext, offsetManager, errorHandler).start(); startRuntimer(); } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java index 7a34aa3..d735e9a 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java @@ -18,15 +18,15 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot; import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.List; import org.apache.commons.collections.CollectionUtils; -import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * listen the event and offer to queue * @@ -40,22 +40,26 @@ public class EventBusListener extends ServiceThread { private final EventSubscriber eventSubscriber; - public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber) { + private final ErrorHandler errorHandler; + + public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber, + ErrorHandler errorHandler) { this.circulatorContext = circulatorContext; this.eventSubscriber = eventSubscriber; + this.errorHandler = errorHandler; } @Override public void run() { while (!stopped) { - try{ + try { List<ConnectRecord> recordList = eventSubscriber.pull(); - if(CollectionUtils.isEmpty(recordList)){ + if (CollectionUtils.isEmpty(recordList)) { this.waitForRunning(1000); continue; } circulatorContext.offerEventRecords(recordList); - }catch (Exception exception) { + } catch (Exception exception) { logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception); } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java index 06f1ce3..a094d07 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java @@ -33,10 +33,10 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.Circulator import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; +import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; /** * receive event and transfer the rule to pusher @@ -49,10 +49,14 @@ public class EventRuleTransfer extends ServiceThread { private final CirculatorContext circulatorContext; @Autowired private final OffsetManager offsetManager; + @Autowired + private final ErrorHandler errorHandler; - public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager) { + public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager, + ErrorHandler errorHandler) { this.circulatorContext = circulatorContext; this.offsetManager = offsetManager; + this.errorHandler = errorHandler; } @Override @@ -61,9 +65,10 @@ public class EventRuleTransfer extends ServiceThread { } @PostConstruct - public void init(){ + public void init() { super.start(); } + @Override public void run() { while (!stopped) { @@ -96,6 +101,7 @@ public class EventRuleTransfer extends ServiceThread { CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> transfer.doTransforms(eventRecord)) .exceptionally((exception) -> { logger.error("transfer do transform event record failed,stackTrace-", exception); + errorHandler.handle(eventRecord,exception); return null; }) .thenAccept(record -> { @@ -105,7 +111,7 @@ public class EventRuleTransfer extends ServiceThread { record.getExtensions().put(runnerNameKey, transfer.getConnectConfig(runnerNameKey)); record.getExtensions().put(taskClassKey, transfer.getConnectConfig(taskClassKey)); afterTransformConnect.add(record); - }else{ + } else { offsetManager.commit(eventRecord); } }); diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java index e420ee2..b3216da 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; +import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,10 +41,13 @@ public class EventTargetPusher extends ServiceThread { private final CirculatorContext circulatorContext; private final OffsetManager offsetManager; + private final ErrorHandler errorHandler; - public EventTargetPusher(CirculatorContext circulatorContext, OffsetManager offsetManager) { + public EventTargetPusher(CirculatorContext circulatorContext, OffsetManager offsetManager, + ErrorHandler errorHandler) { this.circulatorContext = circulatorContext; this.offsetManager = offsetManager; + this.errorHandler = errorHandler; } @Override diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java index 39d08d7..0b33b9e 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java @@ -52,8 +52,9 @@ public abstract class EventSubscriber implements TargetRunnerListener { * @param connectRecord * @param delaySec */ - public void put(String eventBusName, ConnectRecord connectRecord, int delaySec){ + public boolean put(String eventBusName, ConnectRecord connectRecord, int delaySec){ // convert the eventBusName to Topic ? + return true; } /** diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java deleted file mode 100644 index 8609cad..0000000 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher; - -import com.google.common.collect.Lists; -import io.openmessaging.connector.api.data.ConnectRecord; -import java.util.List; -import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; -import org.springframework.beans.factory.annotation.Autowired; - -public class PushCallback { - - @Autowired - EventSubscriber eventSubscriber; - - public void completed(ConnectRecord connectRecord) { - eventSubscriber.commit(Lists.newArrayList(connectRecord)); - } - - public void completed(List<ConnectRecord> connectRecords) { - eventSubscriber.commit(connectRecords); - } - -} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java deleted file mode 100644 index 9223a5f..0000000 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher; - -import io.openmessaging.connector.api.data.ConnectRecord; -import java.util.List; -import lombok.Data; - -@Data -public class PushRequest { - - private String targetRunnerName; - - private String targetClass; - - private List<ConnectRecord> connectRecords; - - private PushCallback pushCallback; -} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java index 16bdbb9..9aac1d5 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.error; import com.google.common.base.Strings; import io.openmessaging.connector.api.data.ConnectRecord; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerContext; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; @@ -37,16 +38,21 @@ public class ErrorHandler { @Autowired EventSubscriber eventSubscriber; + @Autowired + OffsetManager offsetManager; + public void handle(ConnectRecord connectRecord, Throwable t) { String eventRunnerName = connectRecord.getExtension(RUNNER_NAME); TargetRunnerConfig targetRunnerConfig = TargetRunnerContext.getTargetRunnerConfig(eventRunnerName); String eventBusName = targetRunnerConfig.getEventBusName(); PushRetryStrategyEnum pushRetryStrategyEnum = PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy()); + int retryTimes = parseRetryTimes(connectRecord); int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum); if (delaySec > 0) { eventSubscriber.put(eventBusName, connectRecord, delaySec); } + offsetManager.commit(connectRecord); } private int parseRetryTimes(ConnectRecord connectRecord) { @@ -71,7 +77,6 @@ public class ErrorHandler { * @return */ private int calcDelaySec(int retryTimes, PushRetryStrategyEnum pushRetryStrategyEnum) { - switch (pushRetryStrategyEnum) { case BACKOFF_RETRY: if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) {
