This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new 2ca9952  Standard the run context of the config json file (#89)
2ca9952 is described below

commit 2ca9952ef9a239d407653229d9d7c878e9b8bd5b
Author: Artisan <[email protected]>
AuthorDate: Thu Apr 20 14:29:02 2023 +0800

    Standard the run context of the config json file (#89)
    
    Standard the run context of the config json file
---
 adapter/runtimer/pom.xml                           |  2 +-
 .../adapter/runtimer/boot/EventRuleTransfer.java   |  7 ++-
 .../runtimer/boot/listener/CirculatorContext.java  | 11 ++--
 .../boot/listener/RocketMQEventSubscriber.java     |  4 +-
 .../runtimer/boot/transfer/TransformEngine.java    | 59 ++++++++++++----------
 .../runtimer/common/entity/TargetRunnerConfig.java |  8 +--
 .../runtimer/config/RuntimerConfigDefine.java      | 21 ++------
 .../AbstractTargetRunnerConfigObserver.java        |  4 --
 .../service/TargetRunnerConfigObserver.java        |  2 -
 .../runtimer/src/main/resources/target-runner.json | 15 +++---
 10 files changed, 61 insertions(+), 72 deletions(-)

diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 20a8ee6..158f76c 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -64,7 +64,7 @@
         <dependency>
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
-            <version>0.9.11</version>
+            <version>0.9.10</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index cbb3585..635c4bd 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
@@ -71,10 +70,10 @@ public class EventRuleTransfer extends ServiceThread {
                 continue;
             }
             // the event channel take rocket mq topic name as default
-            String eventChannelKey = RuntimerConfigDefine.CONNECT_TOPICNAME;
-            String eventChannel = eventRecord.getExtension(eventChannelKey);
+            String eventChannelName = RuntimerConfigDefine.CHANNEL_NAME;
+            String eventChannel = eventRecord.getExtension(eventChannelName);
             Set<TransformEngine<ConnectRecord>> adaptTransformSet = 
latestTransformMap.values().stream()
-                    .filter(engine -> 
eventChannel.equals(engine.getConnectConfig(eventChannelKey)))
+                    .filter(engine -> 
eventChannel.equals(engine.getConnectConfig(eventChannelName)))
                     .collect(Collectors.toSet());
             if(CollectionUtils.isEmpty(adaptTransformSet)){
                     logger.warn("adapt specific topic ref transform engine is 
empty, eventChannelName- {}", eventChannel);
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
index de371f7..fd29d8f 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
@@ -34,6 +34,7 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDef
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
 
 import java.util.List;
 import java.util.Map;
@@ -161,15 +162,15 @@ public class CirculatorContext implements 
TargetRunnerListener {
         switch (refreshTypeEnum) {
             case ADD:
             case UPDATE:
-                TargetKeyValue targetKeyValue = new TargetKeyValue();
-                
targetRunnerConfig.getComponents().forEach(targetKeyValue::putAll);
-                TransformEngine<ConnectRecord> transformChain = new 
TransformEngine<>(targetKeyValue, plugin);
+                TransformEngine<ConnectRecord> transformChain = new 
TransformEngine<>(targetRunnerConfig.getComponents(), plugin);
                 taskTransformMap.put(runnerName, transformChain);
 
+                int endIndex = targetRunnerConfig.getComponents().size() -1;
+                TargetKeyValue targetKeyValue = new 
TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
                 SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
                 pusherTaskMap.put(runnerName, sinkTask);
 
-                String pusherClass = 
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+                String pusherClass = 
targetKeyValue.getString(RuntimerConfigDefine.RUNNER_CLASS);
                 if (StringUtils.isNotEmpty(pusherClass) && 
!pusherExecutorMap.containsKey(pusherClass)) {
                     pusherExecutorMap.put(pusherClass, 
initDefaultThreadPoolExecutor(pusherClass));
                 }
@@ -200,7 +201,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
      * @return
      */
     private SinkTask initTargetSinkTask(TargetKeyValue targetKeyValue) {
-        String taskClass = 
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+        String taskClass = 
targetKeyValue.getString(RuntimerConfigDefine.RUNNER_CLASS);
         ClassLoader loader = plugin.getPluginClassLoader(taskClass);
         Class taskClazz;
         boolean isolationFlag = false;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index 10d25ac..aa263c7 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -159,7 +159,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
                 logger.warn("target runner config components is empty, config 
info - {}", runnerConfig);
                 continue;
             }
-            
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CONNECT_TOPICNAME));
+            
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CHANNEL_NAME));
         }
         return listenTopics;
     }
@@ -280,7 +280,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
         String bodyStr = new String(body, StandardCharsets.UTF_8);
         sinkRecord = new ConnectRecord(recordPartition, recordOffset, 
timestamp, schema, bodyStr);
         KeyValue keyValue = new DefaultKeyValue();
-        keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, 
messageExt.getTopic());
+        keyValue.put(RuntimerConfigDefine.CHANNEL_NAME, messageExt.getTopic());
         if (MapUtils.isNotEmpty(properties)) {
             for (Map.Entry<String, String> entry : properties.entrySet()) {
                 keyValue.put(entry.getKey(), entry.getValue());
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index c29fc05..56f8549 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -17,16 +17,13 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer;
 
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Splitter;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.internal.DefaultKeyValue;
-import java.util.Map;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
@@ -35,8 +32,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 public class TransformEngine<R extends ConnectRecord> implements AutoCloseable 
{
 
@@ -44,7 +41,7 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
 
     private final List<Transform> transformList;
 
-    private Map<String,List<Transform>> transformListMap;
+    private List<Map<String, String>> transferConfigs;
 
     private final KeyValue config;
 
@@ -54,35 +51,25 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
 
     private static final String PREFIX = RuntimerConfigDefine.TRANSFORMS + "-";
 
-    public TransformEngine(KeyValue config, Plugin plugin) {
-        this.config = config;
+    public TransformEngine(List<Map<String, String>> transferConfigs, Plugin 
plugin) {
+        this.transferConfigs = transferConfigs;
+        this.config = formatTargetKey(transferConfigs);
         this.plugin = plugin;
         transformList = new ArrayList<>(8);
         init();
     }
 
     private void init() {
-        String transformsStr = 
config.getString(RuntimerConfigDefine.TRANSFORMS);
-        if (StringUtils.isBlank(transformsStr)) {
-            log.warn("no transforms config, {}", JSON.toJSONString(config));
-            return;
-        }
-        List<String> transformList = 
Splitter.on(COMMA).omitEmptyStrings().trimResults().splitToList(transformsStr);
-        if (CollectionUtils.isEmpty(transformList)) {
-            log.warn("transforms config is null, {}", 
JSON.toJSONString(config));
-            return;
-        }
-        transformList.stream().forEach(transformStr -> {
-            String transformClassKey = PREFIX + transformStr + "-class";
-            String transformClass = config.getString(transformClassKey);
+        int endIndex = transferConfigs.size() - 1;
+        for (int index = 1; index < endIndex; index++) {
+            Map<String, String> transferMap = transferConfigs.get(index);
+            String transformClass = 
transferMap.get(RuntimerConfigDefine.RUNNER_CLASS);
             try {
                 Transform transform = getTransform(transformClass);
                 KeyValue transformConfig = new DefaultKeyValue();
-                Set<String> configKeys = config.keySet();
-                for (String key : configKeys) {
-                    if (key.startsWith(PREFIX + transformStr) && 
!key.equals(transformClassKey)) {
-                        String originKey = key.replace(PREFIX + transformStr + 
"-", "");
-                        transformConfig.put(originKey, config.getString(key));
+                for (String key : transferMap.keySet()) {
+                    if (!key.equals(RuntimerConfigDefine.RUNNER_CLASS)) {
+                        transformConfig.put(key, transferMap.get(key));
                     }
                 }
                 transform.validate(transformConfig);
@@ -91,7 +78,25 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
             } catch (Exception e) {
                 log.error("transform new instance error", e);
             }
-        });
+        }
+    }
+
+    /**
+     * format listener and pusher key
+     * @param components
+     * @return
+     */
+    private TargetKeyValue formatTargetKey(List<Map<String, String>> 
components) {
+        if(CollectionUtils.isEmpty(components)){
+            return null;
+        }
+        int startIndex = 0;
+        int endIndex = components.size() - 1;
+        // init listener key
+        TargetKeyValue targetKeyValue = new 
TargetKeyValue(components.get(startIndex));
+        // init pusher key
+        targetKeyValue.put(RuntimerConfigDefine.TASK_CLASS, 
components.get(endIndex).get(RuntimerConfigDefine.RUNNER_CLASS));
+        return targetKeyValue;
     }
 
     /**
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index 63edbc8..20f3d5a 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -57,11 +57,11 @@ public class TargetRunnerConfig implements Serializable {
 
     @Override
     public String toString() {
-        //TODO
         return "TargetRunnerConfig{" +
-            "connectName='" + name + '\'' +
-            ", properties=" + components +
-            '}';
+                "name='" + name + '\'' +
+                ", components=" + components +
+                ", runOptions=" + runOptions +
+                '}';
     }
 
     private boolean isEqualsComponents(List<Map<String, String>> source, 
List<Map<String, String>> target) {
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 332f33d..578e92a 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -32,6 +32,10 @@ public class RuntimerConfigDefine {
      */
     public static final String CONNECTOR_CLASS = "connector-class";
 
+    public static final String RUNNER_CLASS = "class";
+
+    public static final String TRANSFER_CLASS = "transfer-class";
+
     public static final String CONNECTOR_DIRECT_ENABLE = 
"connector-direct-enable";
 
     public static final String TASK_CLASS = "task-class";
@@ -79,7 +83,7 @@ public class RuntimerConfigDefine {
 
     public static final String CONNECT_SHARDINGKEY = "connect-shardingkey";
 
-    public static final String CONNECT_TOPICNAME = "connect-topicname";
+    public static final String CHANNEL_NAME = "channel-name";
 
     public static final String CONNECT_RULE_NAME = "connect-rule-name";
 
@@ -101,19 +105,4 @@ public class RuntimerConfigDefine {
 
     public static final String TARGET_RUNNER_KEY = "eventBusName";
 
-    /**
-     * The required key for all configurations.
-     */
-    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
-        {
-            add(CONNECTOR_CLASS);
-            add(CONNECT_TOPICNAME);
-        }
-    };
-
-    /**
-     * Maximum allowed message size in bytes, the default vaule is 4M.
-     */
-    public static final int MAX_MESSAGE_SIZE = 
Integer.parseInt(System.getProperty("rocketmq.runtime.max.message.size", 
"4194304"));
-
 }
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
index 619bc63..5cdbe62 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -88,8 +88,4 @@ public abstract class AbstractTargetRunnerConfigObserver 
implements TargetRunner
             listener.onDeleteTargetRunner(targetRunnerConfig);
         }
     }
-
-    public Map<String, List<TargetKeyValue>> getTaskConfigs() {
-        return null;
-    }
 }
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
index c6c7670..6c713e3 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
@@ -42,6 +42,4 @@ public interface TargetRunnerConfigObserver {
      */
     void registerListener(TargetRunnerListener listener);
 
-    @Deprecated
-    Map<String, List<TargetKeyValue>> getTaskConfigs();
 }
diff --git a/adapter/runtimer/src/main/resources/target-runner.json 
b/adapter/runtimer/src/main/resources/target-runner.json
index e94e506..6ffcaf7 100644
--- a/adapter/runtimer/src/main/resources/target-runner.json
+++ b/adapter/runtimer/src/main/resources/target-runner.json
@@ -4,17 +4,18 @@
     "components":[
       {
         "runner-name": "demo-runner",
-        "connect-topicname":"eventbridge%654321%demo-bus_1678348282165"
+        "channel-name":"eventbridge%654321%demo-bus_1678348282165"
       },
       {
-        "transforms":"filter,transform,",
-        "transforms-filter-filterPattern":"{}",
-        
"transforms-filter-class":"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform",
-        
"transforms-transform-data":"{\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\",\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\"}",
-        "transforms-transform-class": 
"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform"
+        "filterPattern":"{}",
+        
"class":"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform"
       },
       {
-        
"task-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask",
+        
"data":"{\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\",\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\"}",
+        "class": 
"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform"
+      },
+      {
+        "class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask",
         
"webHook":"https://oapi.dingtalk.com/robot/send?access_token=7f78aa4734ea9bd245984e47b6764ccb950b4292e4f6f9424dff92909f485f16",
         
"secretKey":"SEC8a898c9df7b6415090a8f1341d9eed000c815a89f301f2de87302a1e802dbd69"
       }

Reply via email to