This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit f12cc0449ef525bbd6215951bc35756b8a513dd6
Author: 2011shenlin <[email protected]>
AuthorDate: Sun Apr 23 19:47:27 2023 +0800

    feat:support record commit.
---
 .../eventbridge/adapter/runtimer/Runtimer.java     | 15 ++++++---
 .../adapter/runtimer/boot/EventBusListener.java    | 18 ++++++----
 .../adapter/runtimer/boot/EventRuleTransfer.java   | 14 +++++---
 .../adapter/runtimer/boot/EventTargetPusher.java   |  6 +++-
 .../runtimer/boot/listener/EventSubscriber.java    |  3 +-
 .../adapter/runtimer/boot/pusher/PushCallback.java | 39 ----------------------
 .../adapter/runtimer/boot/pusher/PushRequest.java  | 34 -------------------
 .../adapter/runtimer/error/ErrorHandler.java       |  7 +++-
 8 files changed, 44 insertions(+), 92 deletions(-)

diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 495aeee..0cf382b 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -26,6 +26,7 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.OffsetManager;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,24 +45,28 @@ public class Runtimer {
 
     private AtomicReference<RuntimerState> runtimerState;
 
+    @Autowired
     private CirculatorContext circulatorContext;
-
     @Autowired
     private TargetRunnerConfigObserver runnerConfigObserver;
     @Autowired
     private OffsetManager offsetManager;
     @Autowired
     private EventSubscriber eventSubscriber;
+    @Autowired
+    private ErrorHandler errorHandler;
 
     public Runtimer(
         CirculatorContext circulatorContext,
         TargetRunnerConfigObserver runnerConfigObserver,
         OffsetManager offsetManager,
-        EventSubscriber eventSubscriber) {
+        EventSubscriber eventSubscriber,
+        ErrorHandler errorHandler) {
         this.circulatorContext = circulatorContext;
         this.runnerConfigObserver = runnerConfigObserver;
         this.offsetManager = offsetManager;
         this.eventSubscriber = eventSubscriber;
+        this.errorHandler = errorHandler;
     }
 
     @PostConstruct
@@ -70,9 +75,9 @@ public class Runtimer {
         
circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig());
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
-        new EventBusListener(circulatorContext, eventSubscriber).start();
-        new EventRuleTransfer(circulatorContext, offsetManager).start();
-        new EventTargetPusher(circulatorContext, offsetManager).start();
+        new EventBusListener(circulatorContext, eventSubscriber, 
errorHandler).start();
+        new EventRuleTransfer(circulatorContext, offsetManager, 
errorHandler).start();
+        new EventTargetPusher(circulatorContext, offsetManager, 
errorHandler).start();
         startRuntimer();
     }
 
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index 7a34aa3..d735e9a 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -18,15 +18,15 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 /**
  * listen the event and offer to queue
  *
@@ -40,22 +40,26 @@ public class EventBusListener extends ServiceThread {
 
     private final EventSubscriber eventSubscriber;
 
-    public EventBusListener(CirculatorContext circulatorContext, 
EventSubscriber eventSubscriber) {
+    private final ErrorHandler errorHandler;
+
+    public EventBusListener(CirculatorContext circulatorContext, 
EventSubscriber eventSubscriber,
+        ErrorHandler errorHandler) {
         this.circulatorContext = circulatorContext;
         this.eventSubscriber = eventSubscriber;
+        this.errorHandler = errorHandler;
     }
 
     @Override
     public void run() {
         while (!stopped) {
-            try{
+            try {
                 List<ConnectRecord> recordList = eventSubscriber.pull();
-                if(CollectionUtils.isEmpty(recordList)){
+                if (CollectionUtils.isEmpty(recordList)) {
                     this.waitForRunning(1000);
                     continue;
                 }
                 circulatorContext.offerEventRecords(recordList);
-            }catch (Exception exception) {
+            } catch (Exception exception) {
                 logger.error(getServiceName() + " - event bus pull record 
exception, stackTrace - ", exception);
             }
         }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index 06f1ce3..a094d07 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -33,10 +33,10 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.Circulator
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
 /**
  * receive event and transfer the rule to pusher
@@ -49,10 +49,14 @@ public class EventRuleTransfer extends ServiceThread {
     private final CirculatorContext circulatorContext;
     @Autowired
     private final OffsetManager offsetManager;
+    @Autowired
+    private final ErrorHandler errorHandler;
 
-    public EventRuleTransfer(CirculatorContext circulatorContext, 
OffsetManager offsetManager) {
+    public EventRuleTransfer(CirculatorContext circulatorContext, 
OffsetManager offsetManager,
+        ErrorHandler errorHandler) {
         this.circulatorContext = circulatorContext;
         this.offsetManager = offsetManager;
+        this.errorHandler = errorHandler;
     }
 
     @Override
@@ -61,9 +65,10 @@ public class EventRuleTransfer extends ServiceThread {
     }
 
     @PostConstruct
-    public void init(){
+    public void init() {
         super.start();
     }
+
     @Override
     public void run() {
         while (!stopped) {
@@ -96,6 +101,7 @@ public class EventRuleTransfer extends ServiceThread {
                 CompletableFuture<Void> transformFuture = 
CompletableFuture.supplyAsync(() -> transfer.doTransforms(eventRecord))
                     .exceptionally((exception) -> {
                         logger.error("transfer do transform event record 
failed,stackTrace-", exception);
+                        errorHandler.handle(eventRecord,exception);
                         return null;
                     })
                     .thenAccept(record -> {
@@ -105,7 +111,7 @@ public class EventRuleTransfer extends ServiceThread {
                             record.getExtensions().put(runnerNameKey, 
transfer.getConnectConfig(runnerNameKey));
                             record.getExtensions().put(taskClassKey, 
transfer.getConnectConfig(taskClassKey));
                             afterTransformConnect.add(record);
-                        }else{
+                        } else {
                             offsetManager.commit(eventRecord);
                         }
                     });
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index e420ee2..b3216da 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.error.ErrorHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,10 +41,13 @@ public class EventTargetPusher extends ServiceThread {
 
     private final CirculatorContext circulatorContext;
     private final OffsetManager offsetManager;
+    private final ErrorHandler errorHandler;
 
-    public EventTargetPusher(CirculatorContext circulatorContext, 
OffsetManager offsetManager) {
+    public EventTargetPusher(CirculatorContext circulatorContext, 
OffsetManager offsetManager,
+        ErrorHandler errorHandler) {
         this.circulatorContext = circulatorContext;
         this.offsetManager = offsetManager;
+        this.errorHandler = errorHandler;
     }
 
     @Override
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
index 39d08d7..0b33b9e 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
@@ -52,8 +52,9 @@ public abstract class EventSubscriber implements 
TargetRunnerListener {
      * @param connectRecord
      * @param delaySec
      */
-    public  void put(String eventBusName, ConnectRecord connectRecord, int 
delaySec){
+    public  boolean put(String eventBusName, ConnectRecord connectRecord, int 
delaySec){
         // convert the eventBusName to Topic ?
+        return true;
     }
 
     /**
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
deleted file mode 100644
index 8609cad..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher;
-
-import com.google.common.collect.Lists;
-import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class PushCallback {
-
-    @Autowired
-    EventSubscriber eventSubscriber;
-
-    public void completed(ConnectRecord connectRecord) {
-        eventSubscriber.commit(Lists.newArrayList(connectRecord));
-    }
-
-    public void completed(List<ConnectRecord> connectRecords) {
-        eventSubscriber.commit(connectRecords);
-    }
-
-}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
deleted file mode 100644
index 9223a5f..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher;
-
-import io.openmessaging.connector.api.data.ConnectRecord;
-import java.util.List;
-import lombok.Data;
-
-@Data
-public class PushRequest {
-
-    private String targetRunnerName;
-
-    private String targetClass;
-
-    private List<ConnectRecord> connectRecords;
-
-    private PushCallback pushCallback;
-}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
index 16bdbb9..9aac1d5 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
@@ -20,6 +20,7 @@ package 
org.apache.rocketmq.eventbridge.adapter.runtimer.error;
 import com.google.common.base.Strings;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.OffsetManager;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
@@ -37,16 +38,21 @@ public class ErrorHandler {
     @Autowired
     EventSubscriber eventSubscriber;
 
+    @Autowired
+    OffsetManager offsetManager;
+
     public void handle(ConnectRecord connectRecord, Throwable t) {
         String eventRunnerName = connectRecord.getExtension(RUNNER_NAME);
         TargetRunnerConfig targetRunnerConfig = 
TargetRunnerContext.getTargetRunnerConfig(eventRunnerName);
         String eventBusName = targetRunnerConfig.getEventBusName();
         PushRetryStrategyEnum pushRetryStrategyEnum = 
PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy());
+
         int retryTimes = parseRetryTimes(connectRecord);
         int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum);
         if (delaySec > 0) {
             eventSubscriber.put(eventBusName, connectRecord, delaySec);
         }
+        offsetManager.commit(connectRecord);
     }
 
     private int parseRetryTimes(ConnectRecord connectRecord) {
@@ -71,7 +77,6 @@ public class ErrorHandler {
      * @return
      */
     private int calcDelaySec(int retryTimes, PushRetryStrategyEnum 
pushRetryStrategyEnum) {
-
         switch (pushRetryStrategyEnum) {
             case BACKOFF_RETRY:
                 if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) {

Reply via email to