This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch feat/demo in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 96426a4774ae2788b01cc320d483d6033f84edb3 Author: 2011shenlin <[email protected]> AuthorDate: Sat Jun 10 15:39:06 2023 +0800 feat:add runtime demo. --- README.md | 215 +++------------------ .../db/migration/V9__init_file_target_class.sql | 32 +++ .../runtime/config/RuntimeConfiguration.java | 8 +- .../service/TargetRunnerConfigOnDBObserver.java | 4 +- .../service/TargetRunnerConfigOnFileObserver.java | 2 +- .../rocketmq/runtimer/RocketMQEventSubscriber.java | 6 +- docs/CreateDingTalkTarget.md | 86 +++++++++ docs/EventSource.md | 42 ++++ start/src/main/resources/application.properties | 6 +- .../README.md | 0 .../pom.xml | 4 +- .../apache/rocketmq/connect/sink/FileConstant.java | 22 +++ .../rocketmq/connect/sink/FileSinkTask.java} | 39 +++- .../apache/rocketmq/connect/FileSinkTaskTest.java} | 46 ++--- .../README.md | 0 .../pom.xml | 4 +- .../rocketmq/connect/sink/StandardConstant.java | 22 +++ .../rocketmq/connect/sink}/StandardSinkTask.java | 8 +- 18 files changed, 304 insertions(+), 242 deletions(-) diff --git a/README.md b/README.md index 230ccff..505be7d 100644 --- a/README.md +++ b/README.md @@ -42,14 +42,14 @@ according to the manual: [RocketMQ Quick Start](https://rocketmq.apache.org/docs Apache RocketMQ Connect can connect the external upstream and downstream services,and You can deploy it according to the manual: [RocketMQ Connect Quick Start](https://github.com/apache/rocketmq-connect) -. Before deploy the Apache RocketMQ Connect, you should download the plugins below and put it to the "pluginPaths" which +. Before deploy the Apache RocketMQ Connect, you should download the plugins below and put it to the "pluginpath" which defined on rocketmq-connect. -* [rocketmq-connect-eventbridge-0.0.1-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/rocketmq-connect-eventbridge-0.0.1-SNAPSHOT-jar-with-dependencies.jar) -* [rocketmq-connect-dingtalk-1.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/rocketmq-connect-dingtalk-1.0-SNAPSHOT-jar-with-dependencies.jar) -* [connect-cloudevent-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-cloudevent-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar) -* [connect-filter-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-filter-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar) -* [connect-eventbridge-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-eventbridge-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar) +* [rocketmq-connect-eventbridge.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/rocketmq-connect-eventbridge-0.0.1-SNAPSHOT-jar-with-dependencies.jar) +* [eventbridge-connect-file.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/eventbridge-connect-file-1.0.0-SNAPSHOT-jar-with-dependencies.jar) +* [connect-cloudevent-transform.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-cloudevent-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar) +* [connect-filter-transform.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-filter-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar) +* [connect-eventbridge-transform.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-eventbridge-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar) #### Apache RocketMQ EventBridge @@ -66,202 +66,35 @@ rocketmq.namesrvAddr=xxxxx:9876 # The cluster name of rocketmq. rocketmq.cluster.name=DefaultCluster - -# The endpoint of rocketmq-connect. -rocketmq.connect.endpoint=xxxxxx:8082 +runtime.pluginpath=xxxx ``` +Config the runtime.pluginpath to set the directory of plugin. ## Demo #### -* Create EventBus - -```text -POST /bus/createEventBus HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ -"eventBusName":"demo-bus", -"description":"a demo bus." -} -``` - -* Create EventSource - -```text -POST /source/createEventSource HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ -"eventBusName":"demo-bus", -"eventSourceName":"demo-source", -"description":"A demo source." -} -``` - -* Create EventRule - -```text -POST /rule/createEventRule HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ - "eventBusName":"demo-bus", - "eventRuleName":"demo-rule", - "description":"A demo rule.", - "filterPattern":"{}" -} -``` - -* Create Target - -This is a sample with EventBridge target: - -```text -POST /target/createEventTargets HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ - "eventBusName":"demo-bus", - "eventRuleName":"demo-rule", - "eventTargets":[ - { - "eventTargetName":"eventbridge-target", - "className":"acs.eventbridge", - "config":{ - "RegionId":"cn-hangzhou", - "AliyunEventBus":"rocketmq-eventbridge" - } - } - ] -} -``` - -This is a sample with DingTalk target: - -```text -POST /target/createEventTargets HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ - "eventBusName":"demo-bus", - "eventRuleName":"demo-rule", - "eventTargets":[ - { - "eventTargetName":"dingtalk-target", - "className":"acs.dingtalk", - "config":{ - "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867", - "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa", - "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}" - } - } - ] -} -``` - * Put Events to EventBus - -```text -POST /putEvents HTTP/1.1 -Host: demo.eventbridge.com -Content-Type:"application/cloudevents+json; charset=UTF-8" -{ - "specversion" : "1.0", - "type" : "com.github.pull_request.opened", - "source" : "https://github.com/cloudevents/spec/pull", - "subject" : "123", - "id" : "A234-1234-1234", - "time" : "2018-04-05T17:31:00Z", - "datacontenttype" : "application/json", - "data" : { - "body":"demo" - }, - "aliyuneventbusname":"demo-bus" -} -``` - -* Use HttpSource to put events - -EventBridge HttpSource allows you to put events to eventbus in the form of webhook. - -Here is an example explaining how to put events using EventBridge HttpSource. - -1. Create an EventBridge HttpSource - - - eventSourceName: Name of EventSource - - eventBusName: Name of EventBus - - description: Description - - className: HttpEvent. This parameter is a fixed value and cannot be modified. - - config: HttpSource Config - - Type: Request type. Available values are 'HTTP', 'HTTPS' and 'HTTP&HTTPS'. - - Method: Allowed HTTP request methods. The request will be filtered if the http request method type for accessing - the webhook does not meet the configuration. - - SecurityConfig: Security configuration type. Available values are 'none', 'ip' and 'referer'. - - Ip: IP security configuration. Http requests whose source ip is not in the configured network segment will be - filtered if the security configuration is selected as 'ip'. - - Referer: Referer security configuration. HTTP requests whose referer is not in this configuration will be filtered - if the security configuration is selected as 'referer'. - -A webhook will be generated after the creation of HttpSource. - + The system creates a demo bus by default, and you can send events directly to the bus. ```text -POST /source/createEventSource HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ - "eventSourceName": "httpEventSourceDemo", - "eventBusName": "demo", - "description": "http source demo", - "className": "HttpEvent", - "config": { - "Type": "HTTP&HTTPS", - "Method": ["GET", "POST"], - "SecurityConfig": "ip", - "Ip": ["10.0.0.0/8"], - "Referer":[] - } -} +curl -X POST http://127.0.0.1:7001/putEvents \ +-H "Content-Type: application/json; charset=UTF-8" \ +-H "ce-specversion:1.0" \ +-H "ce-type:com.github.pull_request.opened" \ +-H "ce-source:https://github.com/cloudevents/spec/pull" \ +-H "ce-subject:demo" \ +-H "ce-id:1234-1234-1234" \ +-H "ce-datacontenttype:application/json" \ +-H "ce-time:2018-04-05T17:31:00Z" \ +-H "ce-aliyuneventbusname:demo-bus" \ +-d 'test' ``` -2. Put event to EventBus +* Check if the local file received a write event -Http request to access this webhook will be converted into a CloudEvent and delivered to eventbus. +In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo.eventbridge -``` -curl -d '{"username": "testUser", "testData": "testData"}' -H 'Content-Type: application/json' -H 'Accept-Language: en-US' http://127.0.0.1:7001/webhook/putEvents?token=43146d108b224eb2adc581aedd28f272007320d14b9d -``` - -generated CloudEvent demo - -```json -{ - "datacontenttype": "application/json", - "data": { - "body": { - "username": "testUser", - "testData": "testData" - }, - "headers": { - "Accept": "*/*", - "User-Agent": "curl/7.64.1", - "Host": "127.0.0.1:7001", - "Accept-Language": "en-US", - "Content-Length": "48", - "Content-Type": "application/json" - }, - "httpMethod": "POST", - "path": "/webhook/putEvents", - "queryString": {} - }, - "subject": "DemoBus/httpEventSourceDemo", - "source": "httpEventSourceDemo", - "type": "eventbridge:Events:HTTPEvent", - "specversion": "1.0", - "id": "75bc099b-130a-45a8-82e1-3f9a7f0d10f3", - "time": "2022-05-12T17:20:30.264+08:00" -} -``` + +>> 链接:为什么输出:test,... diff --git a/adapter/persistence/src/main/resources/db/migration/V9__init_file_target_class.sql b/adapter/persistence/src/main/resources/db/migration/V9__init_file_target_class.sql new file mode 100644 index 0000000..5029f79 --- /dev/null +++ b/adapter/persistence/src/main/resources/db/migration/V9__init_file_target_class.sql @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO `event_target_class` (`id`,`name`,`api_params`,`target_transform`,`required_params`,`visual_config`,`description`,`gmt_create`,`gmt_modify`) VALUES (4,'file','{ + "fileName":{ + "type":"String", + "desc":"the output file name.", + "required":false + }, + "line":{ + "type":"String", + "desc":"the content write to file." + } +} +','{ "data":"${line}" }','{ + "fileName":"${fileName}", + "class":"org.apache.rocketmq.connect.sink.FileSinkTask" +}',null,'output file config','2023-06-09 16:54:55','2023-06-09 16:54:57'); \ No newline at end of file diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java index b6a7a46..56fa123 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java @@ -21,18 +21,22 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.enums.ConfigModeEn import org.apache.rocketmq.eventbridge.adapter.runtime.service.TargetRunnerConfigObserver; import org.apache.rocketmq.eventbridge.adapter.runtime.service.TargetRunnerConfigOnDBObserver; import org.apache.rocketmq.eventbridge.adapter.runtime.service.TargetRunnerConfigOnFileObserver; +import org.apache.rocketmq.eventbridge.domain.repository.EventTargetRepository; +import org.apache.rocketmq.eventbridge.domain.repository.EventTargetRunnerRepository; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; @Configuration public class RuntimeConfiguration { @Bean(name = "runnerConfigObserver") - public TargetRunnerConfigObserver targetRunnerConfigObserver(@Value("${runtime.config.mode}") String configMode) { + public TargetRunnerConfigObserver targetRunnerConfigObserver(@Value("${runtime.config.mode}") String configMode, + EventTargetRunnerRepository eventTargetRunnerRepository, EventTargetRepository eventTargetRepository) { switch (ConfigModeEnum.parse(configMode)) { case DB: - return new TargetRunnerConfigOnDBObserver(); + return new TargetRunnerConfigOnDBObserver(eventTargetRunnerRepository, eventTargetRepository); default: return new TargetRunnerConfigOnFileObserver(); } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java index c272bd3..f018ce1 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java @@ -48,7 +48,9 @@ public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigOb @Autowired EventTargetRepository eventTargetRepository; - public TargetRunnerConfigOnDBObserver() { + public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository, EventTargetRepository eventTargetRepository) { + this.eventTargetRunnerRepository = eventTargetRunnerRepository; + this.eventTargetRepository = eventTargetRepository; } @Override diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java index b5d34f3..cbd01e0 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java @@ -41,7 +41,7 @@ import org.apache.rocketmq.eventbridge.exception.EventBridgeException; import org.springframework.stereotype.Component; @Slf4j -@Component +//@Component public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver { private String pathName; diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java index 3005486..56a4550 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java @@ -217,7 +217,11 @@ public class RocketMQEventSubscriber extends EventSubscriber { * init rocket mq pull consumer */ private void initConsumeWorkers() { - for (SubscribeRunnerKeys subscribeRunnerKeys : runnerConfigObserver.getSubscribeRunnerKeys()) { + Set<SubscribeRunnerKeys> subscribeRunnerKeysSet = runnerConfigObserver.getSubscribeRunnerKeys(); + if(subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()){ + return; + } + for (SubscribeRunnerKeys subscribeRunnerKeys : subscribeRunnerKeysSet) { LitePullConsumer litePullConsumer = initLitePullConsumer(subscribeRunnerKeys); ConsumeWorker consumeWorker = new ConsumeWorker(litePullConsumer, subscribeRunnerKeys.getRunnerName()); consumeWorkerMap.put(subscribeRunnerKeys.getRunnerName(), consumeWorker); diff --git a/docs/CreateDingTalkTarget.md b/docs/CreateDingTalkTarget.md new file mode 100644 index 0000000..4446433 --- /dev/null +++ b/docs/CreateDingTalkTarget.md @@ -0,0 +1,86 @@ + +## Create EventBus + +```text +POST /bus/createEventBus HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ +"eventBusName":"demo-bus", +"description":"a demo bus." +} +``` + +## Create EventSource + +```text +POST /source/createEventSource HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ +"eventBusName":"demo-bus", +"eventSourceName":"demo-source", +"description":"A demo source." +} +``` + +## Create EventRule + +```text +POST /rule/createEventRule HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ + "eventBusName":"demo-bus", + "eventRuleName":"demo-rule", + "description":"A demo rule.", + "filterPattern":"{}" +} +``` + +## Create Target + +This is a sample with EventBridge target: + +```text +POST /target/createEventTargets HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ + "eventBusName":"demo-bus", + "eventRuleName":"demo-rule", + "eventTargets":[ + { + "eventTargetName":"eventbridge-target", + "className":"acs.eventbridge", + "config":{ + "RegionId":"cn-hangzhou", + "AliyunEventBus":"rocketmq-eventbridge" + } + } + ] +} +``` + +This is a sample with DingTalk target: + +```text +POST /target/createEventTargets HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ + "eventBusName":"demo-bus", + "eventRuleName":"demo-rule", + "eventTargets":[ + { + "eventTargetName":"dingtalk-target", + "className":"acs.dingtalk", + "config":{ + "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867", + "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa", + "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}" + } + } + ] +} +``` diff --git a/docs/EventSource.md b/docs/EventSource.md new file mode 100644 index 0000000..13dd41b --- /dev/null +++ b/docs/EventSource.md @@ -0,0 +1,42 @@ +## Use HttpSource to put events + +EventBridge HttpSource allows you to put events to eventbus in the form of webhook. + +Here is an example explaining how to put events using EventBridge HttpSource. + +1. Create an EventBridge HttpSource + + - eventSourceName: Name of EventSource + - eventBusName: Name of EventBus + - description: Description + - className: HttpEvent. This parameter is a fixed value and cannot be modified. + - config: HttpSource Config + - Type: Request type. Available values are 'HTTP', 'HTTPS' and 'HTTP&HTTPS'. + - Method: Allowed HTTP request methods. The request will be filtered if the http request method type for accessing + the webhook does not meet the configuration. + - SecurityConfig: Security configuration type. Available values are 'none', 'ip' and 'referer'. + - Ip: IP security configuration. Http requests whose source ip is not in the configured network segment will be + filtered if the security configuration is selected as 'ip'. + - Referer: Referer security configuration. HTTP requests whose referer is not in this configuration will be filtered + if the security configuration is selected as 'referer'. + +A webhook will be generated after the creation of HttpSource. + +```text +POST /source/createEventSource HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ + "eventSourceName": "httpEventSourceDemo", + "eventBusName": "demo", + "description": "http source demo", + "className": "HttpEvent", + "config": { + "Type": "HTTP&HTTPS", + "Method": ["GET", "POST"], + "SecurityConfig": "ip", + "Ip": ["10.0.0.0/8"], + "Referer":[] + } +} +``` diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties index 3736814..dc13acc 100644 --- a/start/src/main/resources/application.properties +++ b/start/src/main/resources/application.properties @@ -27,14 +27,14 @@ mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl spring.flyway.placeholderReplacement=false ## rocketmq rocketmq.namesrvAddr=localhost:9876 -rocketmq.connect.endpoint=http://127.0.0.1:8082 + rocketmq.cluster.name=DefaultCluster ## runtime -runtime.config.mode=FILE +runtime.config.mode=DB runtime.storage.mode=ROCKETMQ rumtime.name=eventbridge-runtimer runtime.pluginpath=/Users/Local/eventbridge/plugin -runtime.storePathRootDir=/Users/Local/eventbridge/store + ## log app.name=rocketmqeventbridge diff --git a/supports/connect-standard/README.md b/supports/eventbridge-connect-file/README.md similarity index 100% copy from supports/connect-standard/README.md copy to supports/eventbridge-connect-file/README.md diff --git a/supports/connect-standard/pom.xml b/supports/eventbridge-connect-file/pom.xml similarity index 98% copy from supports/connect-standard/pom.xml copy to supports/eventbridge-connect-file/pom.xml index 90c8bb5..a42cdfa 100644 --- a/supports/connect-standard/pom.xml +++ b/supports/eventbridge-connect-file/pom.xml @@ -14,7 +14,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <groupId>org.apache.rocketmq</groupId> - <artifactId>connect-standard</artifactId> + <artifactId>eventbridge-connect-file</artifactId> <version>1.0.0-SNAPSHOT</version> <modelVersion>4.0.0</modelVersion> @@ -154,7 +154,7 @@ <configuration> <archive> <manifest> - <mainClass>org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform + <mainClass>org.apache.rocketmq.connect.sink.FileSinkTask </mainClass> </manifest> </archive> diff --git a/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileConstant.java b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileConstant.java new file mode 100644 index 0000000..a0843d1 --- /dev/null +++ b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileConstant.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.sink; + +public class FileConstant { + public static final String FILE_NAME="fileName"; +} \ No newline at end of file diff --git a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileSinkTask.java similarity index 53% copy from supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java copy to supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileSinkTask.java index 9714be2..39526cd 100644 --- a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java +++ b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileSinkTask.java @@ -15,22 +15,36 @@ * limitations under the License. */ -package org.apache.rocketmq.connect; +package org.apache.rocketmq.connect.sink; -import com.google.gson.Gson; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.errors.ConnectException; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.List; -public class StandardSinkTask extends SinkTask { +public class FileSinkTask extends SinkTask { + private String fileName = System.getProperty("user.home") + "/demo.eventbridge"; + private PrintStream outputStream; @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException { if (sinkRecords == null || sinkRecords.isEmpty()) { return; } - sinkRecords.forEach(sinkRecord -> System.out.println(new Gson().toJson(sinkRecord))); + for (ConnectRecord connectRecord : sinkRecords) { + try { + outputStream.println(connectRecord.getData()); + } catch (Throwable e) { + throw new ConnectException("Write record to file failed.", e); + } + } + } @Override public void pause() { @@ -46,10 +60,25 @@ public class StandardSinkTask extends SinkTask { } @Override public void init(KeyValue config) { + String inputFileName = config.getString(FileConstant.FILE_NAME); + if (inputFileName != null) { + fileName = inputFileName; + } + try { + + outputStream = new PrintStream( + Files.newOutputStream(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND), + false, + StandardCharsets.UTF_8.name()); + } catch (IOException e) { + throw new ConnectException("Create outputStream: " + fileName + " for FileSinkTask failed", e); + } } @Override public void stop() { - + if (outputStream != null) { + outputStream.close(); + } } } \ No newline at end of file diff --git a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java b/supports/eventbridge-connect-file/src/test/java/org/apache/rocketmq/connect/FileSinkTaskTest.java similarity index 56% copy from supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java copy to supports/eventbridge-connect-file/src/test/java/org/apache/rocketmq/connect/FileSinkTaskTest.java index 9714be2..752bad4 100644 --- a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java +++ b/supports/eventbridge-connect-file/src/test/java/org/apache/rocketmq/connect/FileSinkTaskTest.java @@ -17,39 +17,23 @@ package org.apache.rocketmq.connect; -import com.google.gson.Gson; import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; -import io.openmessaging.connector.api.errors.ConnectException; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.ArrayList; import java.util.List; - -public class StandardSinkTask extends SinkTask { - - @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException { - if (sinkRecords == null || sinkRecords.isEmpty()) { - return; - } - sinkRecords.forEach(sinkRecord -> System.out.println(new Gson().toJson(sinkRecord))); - } - - @Override public void pause() { - - } - - @Override public void resume() { - - } - - @Override public void validate(KeyValue config) { - - } - - @Override public void init(KeyValue config) { - - } - - @Override public void stop() { - +import org.apache.rocketmq.connect.sink.FileSinkTask; + +public class FileSinkTaskTest { + public static void main(String[] args) { + FileSinkTask fileSinkTask = new FileSinkTask(); + KeyValue config = new DefaultKeyValue(); + fileSinkTask.init(config); + List<ConnectRecord> sinkRecords = new ArrayList<>(); + ConnectRecord connectRecord= new ConnectRecord(null,null,null); + connectRecord.setData("test"); + sinkRecords.add(connectRecord); + fileSinkTask.put(sinkRecords); + fileSinkTask.stop(); } } \ No newline at end of file diff --git a/supports/connect-standard/README.md b/supports/eventbridge-connect-standard/README.md similarity index 100% rename from supports/connect-standard/README.md rename to supports/eventbridge-connect-standard/README.md diff --git a/supports/connect-standard/pom.xml b/supports/eventbridge-connect-standard/pom.xml similarity index 98% rename from supports/connect-standard/pom.xml rename to supports/eventbridge-connect-standard/pom.xml index 90c8bb5..4b6e61b 100644 --- a/supports/connect-standard/pom.xml +++ b/supports/eventbridge-connect-standard/pom.xml @@ -14,7 +14,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <groupId>org.apache.rocketmq</groupId> - <artifactId>connect-standard</artifactId> + <artifactId>eventbridge-connect-standard</artifactId> <version>1.0.0-SNAPSHOT</version> <modelVersion>4.0.0</modelVersion> @@ -154,7 +154,7 @@ <configuration> <archive> <manifest> - <mainClass>org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform + <mainClass>org.apache.rocketmq.connect.sink.FileSinkTask </mainClass> </manifest> </archive> diff --git a/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardConstant.java b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardConstant.java new file mode 100644 index 0000000..c399266 --- /dev/null +++ b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardConstant.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.sink; + +public class StandardConstant { + public static final String STANDARD_PREFIX="prefix"; +} \ No newline at end of file diff --git a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardSinkTask.java similarity index 85% rename from supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java rename to supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardSinkTask.java index 9714be2..ee02a1b 100644 --- a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java +++ b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardSinkTask.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.connect; +package org.apache.rocketmq.connect.sink; import com.google.gson.Gson; import io.openmessaging.KeyValue; @@ -26,11 +26,13 @@ import java.util.List; public class StandardSinkTask extends SinkTask { + private String prefix; + @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException { if (sinkRecords == null || sinkRecords.isEmpty()) { return; } - sinkRecords.forEach(sinkRecord -> System.out.println(new Gson().toJson(sinkRecord))); + sinkRecords.forEach(sinkRecord -> System.out.println(prefix + ":" + new Gson().toJson(sinkRecord.getData()))); } @Override public void pause() { @@ -46,7 +48,7 @@ public class StandardSinkTask extends SinkTask { } @Override public void init(KeyValue config) { - + prefix = config.getString(StandardConstant.STANDARD_PREFIX); } @Override public void stop() {
