This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch feat/runoptions in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 33d8be513019c6a9c43ded4c7ce4b6e84cfc1d8e Author: 2011shenlin <[email protected]> AuthorDate: Sat May 13 08:13:30 2023 +0800 feat:support standard sink connector. --- .../impl/runtime/RuntimeTargetRunnerAPIImpl.java | 1 + .../adapter/runtimer/common/entity/RunOptions.java | 29 ---- .../runtimer/common/entity/TargetRunnerConfig.java | 14 +- .../adapter/runtimer/error/ErrorHandler.java | 2 +- .../AbstractTargetRunnerConfigObserver.java | 32 +++- .../service/TargetRunnerConfigOnDBObserver.java | 21 +-- .../service/TargetRunnerConfigOnFileObserver.java | 28 ---- .../eventbridge/EventBridgeFilterTransform.java | 2 - supports/connect-standard/README.md | 0 supports/connect-standard/pom.xml | 177 +++++++++++++++++++++ .../apache/rocketmq/connect/StandardSinkTask.java | 55 +++++++ 11 files changed, 276 insertions(+), 85 deletions(-) diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java index fed9b4b..ee66625 100644 --- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java +++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java @@ -69,6 +69,7 @@ public class RuntimeTargetRunnerAPIImpl implements TargetRunnerAPI { components.add(filterComponent); components.add(transformComponent); components.add(targetComponent); + targetRunnerConfig.setRunOptions(runOptions); return targetRunnerConfig; } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java deleted file mode 100644 index 4e44981..0000000 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity; - -import lombok.Data; - -@Data -public class RunOptions { - - private String errorsTolerance; - - private String retryStrategy; - -} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java index 20f3d5a..d97a147 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java @@ -22,6 +22,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; import lombok.Data; +import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy; +import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY; @@ -38,7 +42,7 @@ public class TargetRunnerConfig implements Serializable { */ private List<Map<String, String>> components; - private RunOptions runOptions = new RunOptions(); + private RunOptions runOptions = RunOptions.builder().errorsTolerance(ErrorToleranceEnum.ALL).retryStrategy(RetryStrategy.builder().pushRetryStrategy(PushRetryStrategyEnum.EXPONENTIAL_DECAY_RETRY).build()).build(); @Override public boolean equals(Object o) { @@ -58,10 +62,10 @@ public class TargetRunnerConfig implements Serializable { @Override public String toString() { return "TargetRunnerConfig{" + - "name='" + name + '\'' + - ", components=" + components + - ", runOptions=" + runOptions + - '}'; + "name='" + name + '\'' + + ", components=" + components + + ", runOptions=" + runOptions + + '}'; } private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) { diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java index 9aac1d5..8a59c35 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java @@ -45,7 +45,7 @@ public class ErrorHandler { String eventRunnerName = connectRecord.getExtension(RUNNER_NAME); TargetRunnerConfig targetRunnerConfig = TargetRunnerContext.getTargetRunnerConfig(eventRunnerName); String eventBusName = targetRunnerConfig.getEventBusName(); - PushRetryStrategyEnum pushRetryStrategyEnum = PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy()); + PushRetryStrategyEnum pushRetryStrategyEnum = targetRunnerConfig.getRunOptions().getRetryStrategy().getPushRetryStrategy(); int retryTimes = parseRetryTimes(connectRecord); int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum); diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java index 5cdbe62..7157708 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java @@ -17,13 +17,12 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.service; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -88,4 +87,33 @@ public abstract class AbstractTargetRunnerConfigObserver implements TargetRunner listener.onDeleteTargetRunner(targetRunnerConfig); } } + + protected void diff() { + Map<String, TargetRunnerConfig> lastMap = toMap(this.getTargetRunnerConfig()); + Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig()); + lastMap.entrySet().forEach(entry -> { + TargetRunnerConfig latest = latestMap.get(entry.getKey()); + if (latest == null) { + this.onDeleteTargetRunner(entry.getValue()); + } else if (!latest.equals(entry.getValue())) { + this.onUpdateTargetRunner(entry.getValue()); + } + }); + + latestMap.entrySet().forEach(entry -> { + TargetRunnerConfig latest = lastMap.get(entry.getKey()); + if (latest == null) { + this.onAddTargetRunner(entry.getValue()); + } + }); + } + + protected Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) { + if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) { + return Maps.newHashMapWithExpectedSize(0); + } + Map<String, TargetRunnerConfig> map = Maps.newHashMap(); + targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry)); + return map; + } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java index 5e74f56..071ca48 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java @@ -17,11 +17,9 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.service; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.Gson; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -37,8 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY; - @Slf4j @Component public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver { @@ -66,26 +62,15 @@ public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigOb return targetRunnerConfigs; } - private Map<String, String> buildEventBusComponent(String eventBusName) { - Map<String, String> component = Maps.newHashMap(); - component.put(TARGET_RUNNER_KEY, eventBusName); - return component; - } - @PostConstruct public void addListen() { service.scheduleAtFixedRate(() -> { try { - Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig(); - Set<TargetRunnerConfig> last = super.getTargetRunnerConfig(); - TargetRunnerConfig changed = null; - super.onAddTargetRunner(changed); - super.onUpdateTargetRunner(changed); - super.onDeleteTargetRunner(changed); + super.diff(); } catch (Throwable e) { - log.error("Watch failed.", e); + log.error("Watch file failed.", e); } - }, 0, 30, TimeUnit.SECONDS); + }, 0, 3, TimeUnit.SECONDS); } } \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java index 791efc1..3948ff7 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java @@ -111,34 +111,6 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig }, 0, 3, TimeUnit.SECONDS); } - public void diff() { - Map<String, TargetRunnerConfig> lastMap = toMap(super.getTargetRunnerConfig()); - Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig()); - lastMap.entrySet().forEach(entry -> { - TargetRunnerConfig latest = latestMap.get(entry.getKey()); - if (latest == null) { - super.onDeleteTargetRunner(entry.getValue()); - } else if (!latest.equals(entry.getValue())) { - super.onUpdateTargetRunner(entry.getValue()); - } - }); - - latestMap.entrySet().forEach(entry -> { - TargetRunnerConfig latest = lastMap.get(entry.getKey()); - if (latest == null) { - super.onAddTargetRunner(entry.getValue()); - } - }); - } - - private Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) { - if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) { - return Maps.newHashMapWithExpectedSize(0); - } - Map<String, TargetRunnerConfig> map = Maps.newHashMap(); - targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry)); - return map; - } private String getConfigFilePath() { return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath(); diff --git a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java index 9f704b3..7105fa7 100644 --- a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java +++ b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java @@ -34,7 +34,6 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap @Override public ConnectRecord doTransform(ConnectRecord record) { - System.out.println("Start to filter transform:::::::::::::::::"); if (!evaluator.evaluateData(new Gson().toJson(record.getData()))) { return null; } else if (!evaluator.evaluateSpecAttr(this.buildSpecAttr(record))) { @@ -42,7 +41,6 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap } else if (!evaluator.evaluateExtensionAttr(this.buildExtensionAttr(record))) { return null; } else { - System.out.println("end to filter transform:::::::::::::::::"); return record; } } diff --git a/supports/connect-standard/README.md b/supports/connect-standard/README.md new file mode 100644 index 0000000..e69de29 diff --git a/supports/connect-standard/pom.xml b/supports/connect-standard/pom.xml new file mode 100644 index 0000000..90c8bb5 --- /dev/null +++ b/supports/connect-standard/pom.xml @@ -0,0 +1,177 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>connect-standard</artifactId> + <version>1.0.0-SNAPSHOT</version> + <modelVersion>4.0.0</modelVersion> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <openmessaging-connector.version>0.1.2</openmessaging-connector.version> + <gson.version>2.8.9</gson.version> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${gson.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>${openmessaging-connector.version}</version> + </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>RELEASE</version> + <scope>test</scope> + </dependency> + + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <excludeTransitive>false</excludeTransitive> + <stripVersion>true</stripVersion> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform + </mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java b/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java new file mode 100644 index 0000000..9714be2 --- /dev/null +++ b/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect; + +import com.google.gson.Gson; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import java.util.List; + +public class StandardSinkTask extends SinkTask { + + @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException { + if (sinkRecords == null || sinkRecords.isEmpty()) { + return; + } + sinkRecords.forEach(sinkRecord -> System.out.println(new Gson().toJson(sinkRecord))); + } + + @Override public void pause() { + + } + + @Override public void resume() { + + } + + @Override public void validate(KeyValue config) { + + } + + @Override public void init(KeyValue config) { + + } + + @Override public void stop() { + + } +} \ No newline at end of file
