This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 527c7c7b5f [Feature][Connector-V2] add Aliyun SLS connector #3733 (#7348) 527c7c7b5f is described below commit 527c7c7b5f35e75cfb2eb180b57fe7ee218dfb4c Author: GumKey <743344...@qq.com> AuthorDate: Tue Aug 13 22:16:59 2024 +0800 [Feature][Connector-V2] add Aliyun SLS connector #3733 (#7348) --- .github/workflows/labeler/label-scope-conf.yml | 5 + config/plugin_config | 1 + docs/en/connector-v2/source/Sls.md | 87 ++++++ docs/zh/connector-v2/source/Sls.md | 87 ++++++ plugin-mapping.properties | 3 +- seatunnel-connectors-v2/connector-sls/pom.xml | 53 ++++ .../connectors/seatunnel/sls/config/Config.java | 82 ++++++ .../connectors/seatunnel/sls/config/StartMode.java | 41 +++ .../sls/serialization/FastLogDeserialization.java | 32 ++ .../FastLogDeserializationContent.java | 102 +++++++ .../FastLogDeserializationSchema.java | 134 +++++++++ .../seatunnel/sls/source/ConsumerMetaData.java | 40 +++ .../seatunnel/sls/source/SlsConsumerThread.java | 72 +++++ .../connectors/seatunnel/sls/source/SlsSource.java | 89 ++++++ .../seatunnel/sls/source/SlsSourceConfig.java | 135 +++++++++ .../seatunnel/sls/source/SlsSourceFactory.java | 68 +++++ .../seatunnel/sls/source/SlsSourceReader.java | 232 +++++++++++++++ .../seatunnel/sls/source/SlsSourceSplit.java | 68 +++++ .../sls/source/SlsSourceSplitEnumerator.java | 327 +++++++++++++++++++++ .../seatunnel/sls/state/SlsSourceState.java | 39 +++ .../connectors/seatunnel/sls/SlsFactoryTest.java | 31 ++ seatunnel-connectors-v2/pom.xml | 1 + seatunnel-dist/pom.xml | 7 + .../connector-sls-e2e/pom.xml | 36 +++ .../apache/seatunnel/e2e/connector/sls/SlsIT.java | 57 ++++ .../sls_source_with_schema_to_console.conf | 47 +++ .../sls_source_without_schema_to_console.conf | 39 +++ seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 28 files changed, 1915 insertions(+), 1 deletion(-) diff --git a/.github/workflows/labeler/label-scope-conf.yml 
b/.github/workflows/labeler/label-scope-conf.yml index 599ed64939..b0a89dfd35 100644 --- a/.github/workflows/labeler/label-scope-conf.yml +++ b/.github/workflows/labeler/label-scope-conf.yml @@ -275,3 +275,8 @@ format: dependencies: - changed-files: - any-glob-to-any-file: tools/dependencies/** +sls: + - all: + - changed-files: + - any-glob-to-any-file: seatunnel-connectors-v2/connector-sls/** + - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(sls)/**' \ No newline at end of file diff --git a/config/plugin_config b/config/plugin_config index f6549168d6..e3ac0f1d04 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -87,4 +87,5 @@ connector-tdengine connector-web3j connector-milvus connector-activemq +connector-sls --end-- \ No newline at end of file diff --git a/docs/en/connector-v2/source/Sls.md b/docs/en/connector-v2/source/Sls.md new file mode 100644 index 0000000000..6468f397ab --- /dev/null +++ b/docs/en/connector-v2/source/Sls.md @@ -0,0 +1,87 @@ +# Sls + +> Sls source connector + +## Support Those Engines + +> Spark<br/> +> Flink<br/> +> Seatunnel Zeta<br/> + +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Description + +Source connector for Aliyun Sls. + +## Supported DataSource Info + +In order to use the Sls connector, the following dependencies are required. +They can be downloaded via install-plugin.sh or from the Maven central repository. 
+ +| Datasource | Supported Versions | Maven | +|------------|--------------------|-----------------------------------------------------------------------------------------------------------| +| Sls | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-sls) | + +## Source Options + +| Name | Type | Required | Default | Description | +|-------------------------------------|---------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------| +| project | String | Yes | - | [Aliyun Sls Project](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl) | +| logstore | String | Yes | - | [Aliyun Sls Logstore](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC) | +| endpoint | String | Yes | - | [Aliyun Access Endpoint](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa) | +| access_key_id | String | Yes | - | [Aliyun AccessKey ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| access_key_secret | String | Yes | - | [Aliyun AccessKey Secret](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| start_mode | StartMode[earliest],[group_cursor],[latest] | No | group_cursor | The initial consumption pattern of consumers. | +| consumer_group | String | No | SeaTunnel-Consumer-Group | Sls consumer group id, used to distinguish different consumer groups. 
| +| auto_cursor_reset | CursorMode[begin],[end] | No | end | When there is no cursor in the consumer group, cursor initialization occurs | +| batch_size | Int | No | 1000 | The amount of data pulled from SLS each time | +| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. | + +## Task Example + +### Simple + +> This example reads the data of sls's logstore1 and prints it to the client. If you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../s [...] + +[Create RAM user and authorization](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4). Please ensure the RAM user has sufficient permissions to read and manage the data; for reference, see [RAM Custom Authorization Example](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b) + +```hocon +# Defining the runtime environment +env { + parallelism = 2 + job.mode = "STREAMING" + checkpoint.interval = 30000 +} + +source { + Sls { + endpoint = "cn-hangzhou-intranet.log.aliyuncs.com" + project = "project1" + logstore = "logstore1" + access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx" + access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + schema = { + fields = { + id = "int" + name = "string" + description = "string" + weight = "string" + } + } + } +} + +sink { + Console { + } +} +``` + diff --git a/docs/zh/connector-v2/source/Sls.md b/docs/zh/connector-v2/source/Sls.md new file mode 100644 index 0000000000..d0e1025725 --- /dev/null +++ b/docs/zh/connector-v2/source/Sls.md @@ -0,0 
+1,87 @@ +# Sls + +> Sls source connector + +## 支持的引擎 + +> Spark<br/> +> Flink<br/> +> Seatunnel Zeta<br/> + +## 主要特性 + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## 描述 + +从阿里云Sls日志服务中读取数据。 + +## 支持的数据源信息 + +为了使用Sls连接器,需要以下依赖关系。 +它们可以通过install-plugin.sh或Maven中央存储库下载。 + +| 数据源 | 支持的版本 | Maven | +|-----|-----------|-----------------------------------------------------------------------------------------------------------| +| Sls | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-sls) | + +## Source Options + +| Name | Type | Required | Default | Description | +|-------------------------------------|---------------------------------------------|----------|--------------------------|------------------------------------------------------------------------------------------------------------------------------------| +| project | String | Yes | - | [阿里云 Sls 项目](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl) | +| logstore | String | Yes | - | [阿里云 Sls 日志库](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC) | +| endpoint | String | Yes | - | [阿里云访问服务点](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa) | +| access_key_id | String | Yes | - | [阿里云访问用户ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| access_key_secret | String | Yes | - | 
[阿里云访问用户密码](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| start_mode | StartMode[earliest],[group_cursor],[latest] | No | group_cursor | 消费者的初始消费模式 | +| consumer_group | String | No | SeaTunnel-Consumer-Group | Sls消费者组id,用于区分不同的消费者组 | +| auto_cursor_reset | CursorMode[begin],[end] | No | end | 当消费者组中没有记录读取游标时,初始化读取游标 | +| batch_size | Int | No | 1000 | 每次从SLS中读取的数据量 | +| partition-discovery.interval-millis | Long | No | -1 | 动态发现主题和分区的间隔 | + +## 任务示例 + +### 简单示例 + +> 此示例读取sls的logstore1的数据并将其打印到客户端。如果您尚未安装和部署SeaTunnel,则需要按照[安装SeaTunnel](../../start-v2/locally/deployment.md)中的说明安装和部署SeaTunnel。然后按照[快速启动SeaTunnel引擎](../../start-v2/locally/quick-start-seatunnel-engine.md)中的说明运行此作业。 + +[创建RAM用户及授权](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4), 请确认RAM用户有足够的权限来读取及管理数据,参考:[RAM自定义授权示例](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b) + +```hocon +# Defining the runtime environment +env { + parallelism = 2 + job.mode = "STREAMING" + checkpoint.interval = 30000 +} + +source { + Sls { + endpoint = "cn-hangzhou-intranet.log.aliyuncs.com" + project = "project1" + logstore = "logstore1" + access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx" + access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + schema = { + fields = { + id = "int" + name = "string" + description = "string" + weight = "string" + } + } + } +} + +sink { + Console { + } +} +``` + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 579bf2dac0..a74b9e1223 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -130,6 +130,7 @@ seatunnel.sink.ObsFile = connector-file-obs seatunnel.source.Milvus = connector-milvus seatunnel.sink.Milvus = connector-milvus seatunnel.sink.ActiveMQ = connector-activemq +seatunnel.source.Sls = connector-sls seatunnel.transform.Sql = 
seatunnel-transforms-v2 seatunnel.transform.FieldMapper = seatunnel-transforms-v2 @@ -140,4 +141,4 @@ seatunnel.transform.Replace = seatunnel-transforms-v2 seatunnel.transform.Split = seatunnel-transforms-v2 seatunnel.transform.Copy = seatunnel-transforms-v2 seatunnel.transform.DynamicCompile = seatunnel-transforms-v2 -seatunnel.transform.LLM = seatunnel-transforms-v2 \ No newline at end of file +seatunnel.transform.LLM = seatunnel-transforms-v2 diff --git a/seatunnel-connectors-v2/connector-sls/pom.xml b/seatunnel-connectors-v2/connector-sls/pom.xml new file mode 100644 index 0000000000..dd47dd0864 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/pom.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connectors-v2</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-sls</artifactId> + <name>SeaTunnel : Connectors V2 : Sls</name> + + <properties> + <aliyun-log.version>0.6.109</aliyun-log.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.aliyun.openservices</groupId> + <artifactId>aliyun-log</artifactId> + <version>${aliyun-log.version}</version> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-format-text</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java new file mode 100644 index 0000000000..46917b8b84 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import com.aliyun.openservices.log.common.Consts;
+
+/**
+ * Option definitions for the Sls connector.
+ *
+ * <p>Each constant binds a configuration key to its value type, default (if any) and a short
+ * description. Options declared with {@code noDefaultValue()} carry no default here.
+ */
+public class Config {
+    /** Identifier string of this connector. */
+    public static final String CONNECTOR_IDENTITY = "Sls";
+
+    /** Key {@code endpoint}: string, no default. */
+    public static final Option<String> ENDPOINT =
+            Options.key("endpoint")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Aliyun Access endpoint");
+    /** Key {@code project}: string, no default. */
+    public static final Option<String> PROJECT =
+            Options.key("project")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Aliyun sls project");
+    /** Key {@code logstore}: string, no default. */
+    public static final Option<String> LOGSTORE =
+            Options.key("logstore")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Aliyun sls logstore");
+    /** Key {@code access_key_id}: string, no default. */
+    public static final Option<String> ACCESS_KEY_ID =
+            Options.key("access_key_id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Aliyun accessKey id");
+    /** Key {@code access_key_secret}: string, no default. */
+    public static final Option<String> ACCESS_KEY_SECRET =
+            Options.key("access_key_secret")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Aliyun accessKey secret");
+    /** Key {@code consumer_group}: string, defaults to {@code SeaTunnel-Consumer-Group}. */
+    public static final Option<String> CONSUMER_GROUP =
+            Options.key("consumer_group")
+                    .stringType()
+                    .defaultValue("SeaTunnel-Consumer-Group")
+                    .withDescription("Aliyun sls consumer group");
+    /** Key {@code batch_size}: int, defaults to 1000. */
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("The amount of data pulled from sls each time");
+
+    /** Key {@code start_mode}: a {@link StartMode}, defaults to {@link StartMode#GROUP_CURSOR}. */
+    public static final Option<StartMode> START_MODE =
+            Options.key("start_mode")
+                    .objectType(StartMode.class)
+                    .defaultValue(StartMode.GROUP_CURSOR)
+                    .withDescription("initial consumption pattern of consumers");
+
+    /** Key {@code auto_cursor_reset}: an SDK {@code Consts.CursorMode}, defaults to {@code END}. */
+    public static final Option<Consts.CursorMode> AUTO_CURSOR_RESET =
+            Options.key("auto_cursor_reset")
+                    .objectType(Consts.CursorMode.class)
+                    .defaultValue(Consts.CursorMode.END)
+                    .withDescription("init consumer cursor");
+
+    /**
+     * Key {@code partition-discovery.interval-millis}: long, defaults to -1.
+     *
+     * <p>NOTE(review): presumably a non-positive value disables periodic discovery - confirm
+     * against the split enumerator. The description speaks of "topics and partitions"; verify
+     * whether that wording is intended for SLS.
+     */
+    public static final Option<Long> KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
+            Options.key("partition-discovery.interval-millis")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The interval for dynamically discovering topics and partitions.");
+}
diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/StartMode.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/StartMode.java
new file mode 100644
index 0000000000..442d3970e5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/StartMode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.config;
+
+/**
+ * Start position choices for a consumer.
+ *
+ * <p>Every constant carries a lowercase name, which is also what {@link #toString()} returns.
+ */
+public enum StartMode {
+    EARLIEST("earliest"),
+
+    GROUP_CURSOR("group_cursor"),
+
+    LATEST("latest");
+
+    /** Lowercase name of the constant; final because enum constants are shared singletons. */
+    private final String mode;
+
+    StartMode(String mode) {
+        this.mode = mode;
+    }
+
+    /**
+     * Returns the lowercase name of this mode.
+     *
+     * @return the name passed to the constant's constructor, e.g. {@code "group_cursor"}
+     */
+    public String getMode() {
+        return mode;
+    }
+
+    /** Returns the same lowercase name as {@link #getMode()}. */
+    @Override
+    public String toString() {
+        return mode;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserialization.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserialization.java
new file mode 100644
index 0000000000..1ae2ce3f18
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserialization.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.serialization;
+
+import org.apache.seatunnel.api.source.Collector;
+
+import com.aliyun.openservices.log.common.LogGroupData;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable contract for turning a batch of SLS log groups into records of type {@code T}.
+ *
+ * @param <T> record type handed to the collector
+ */
+public interface FastLogDeserialization<T> extends Serializable {
+
+    /**
+     * Deserializes the given log groups and emits the results through {@code out}.
+     *
+     * <p>The default implementation is empty: it emits nothing and silently ignores its input,
+     * so an implementation that should produce records must override it.
+     *
+     * @param logGroupDatas log groups to deserialize
+     * @param out collector receiving the produced records
+     * @throws IOException declared for implementations; the default body never throws
+     */
+    default void deserialize(List<LogGroupData> logGroupDatas, Collector<T> out)
+            throws IOException {}
+}
diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationContent.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationContent.java
new file mode 100644
index 0000000000..27bd35bff2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationContent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.serialization;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.aliyun.openservices.log.common.FastLog;
+import com.aliyun.openservices.log.common.FastLogGroup;
+import com.aliyun.openservices.log.common.LogGroupData;
+
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Deserializer that turns every SLS log into a single-field row whose only value is a JSON object
+ * string built from the log's key/value contents.
+ */
+public class FastLogDeserializationContent
+        implements DeserializationSchema<SeaTunnelRow>, FastLogDeserialization<SeaTunnelRow> {
+
+    /** Formatter for {@code HH:mm:ss} with an optional fraction of up to nine digits. */
+    public static final DateTimeFormatter TIME_FORMAT;
+
+    private final CatalogTable catalogTable;
+
+    static {
+        TIME_FORMAT =
+                (new DateTimeFormatterBuilder())
+                        .appendPattern("HH:mm:ss")
+                        .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+                        .toFormatter();
+    }
+
+    /**
+     * @param catalogTable table whose row type and table name are applied to produced rows
+     */
+    public FastLogDeserializationContent(CatalogTable catalogTable) {
+        this.catalogTable = catalogTable;
+    }
+
+    /**
+     * Byte-array entry point of {@link DeserializationSchema}; this implementation always returns
+     * {@code null}. Use {@link #deserialize(List, Collector)} instead.
+     */
+    @Override
+    public SeaTunnelRow deserialize(byte[] bytes) throws IOException {
+        return null;
+    }
+
+    /**
+     * Always returns {@code null}.
+     *
+     * <p>NOTE(review): callers presumably obtain the row type from the catalog table instead -
+     * confirm before relying on this method.
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return null;
+    }
+
+    /**
+     * Emits one row per log contained in the given log groups.
+     *
+     * @param logGroupDatas log groups pulled from SLS
+     * @param out collector receiving one row per log
+     * @throws IOException declared by the interface; not thrown by this method's own code
+     */
+    @Override
+    public void deserialize(List<LogGroupData> logGroupDatas, Collector<SeaTunnelRow> out)
+            throws IOException {
+        for (LogGroupData logGroupData : logGroupDatas) {
+            FastLogGroup logs = logGroupData.GetFastLogGroup();
+            for (FastLog log : logs.getLogs()) {
+                SeaTunnelRow seaTunnelRow = convertFastLogContent(log);
+                out.collect(seaTunnelRow);
+            }
+        }
+    }
+
+    /**
+     * Builds the single-field row for one log.
+     *
+     * <p>Keys and values are written as JSON strings with proper escaping, so contents holding
+     * quotes, backslashes or control characters still yield valid JSON. A log without contents
+     * yields {@code {}}.
+     *
+     * @param log the log to convert
+     * @return an INSERT row tagged with the catalog table's name, holding the JSON string
+     */
+    private SeaTunnelRow convertFastLogContent(FastLog log) {
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        List<Object> transformedRow = new ArrayList<>(rowType.getTotalFields());
+        // json format
+        StringBuilder jsonStringBuilder = new StringBuilder();
+        jsonStringBuilder.append("{");
+        log.getContents()
+                .forEach(
+                        (content) -> {
+                            appendJsonString(jsonStringBuilder, content.getKey());
+                            jsonStringBuilder.append(":");
+                            appendJsonString(jsonStringBuilder, content.getValue());
+                            jsonStringBuilder.append(",");
+                        });
+        // Remove the trailing comma, but only if at least one entry was written; otherwise the
+        // last character is the opening brace and must stay.
+        int lastIndex = jsonStringBuilder.length() - 1;
+        if (jsonStringBuilder.charAt(lastIndex) == ',') {
+            jsonStringBuilder.deleteCharAt(lastIndex);
+        }
+        jsonStringBuilder.append("}");
+        // content field
+        transformedRow.add(jsonStringBuilder.toString());
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(transformedRow.toArray());
+        seaTunnelRow.setRowKind(RowKind.INSERT);
+        seaTunnelRow.setTableId(catalogTable.getTableId().getTableName());
+        return seaTunnelRow;
+    }
+
+    /**
+     * Appends {@code text} to {@code target} as a quoted JSON string, escaping quotes,
+     * backslashes and control characters. A {@code null} text is written as the string
+     * {@code "null"}, matching what plain concatenation produced before.
+     */
+    private static void appendJsonString(StringBuilder target, String text) {
+        String value = String.valueOf(text);
+        target.append('"');
+        for (int i = 0; i < value.length(); i++) {
+            char c = value.charAt(i);
+            switch (c) {
+                case '"':
+                    target.append("\\\"");
+                    break;
+                case '\\':
+                    target.append("\\\\");
+                    break;
+                case '\n':
+                    target.append("\\n");
+                    break;
+                case '\r':
+                    target.append("\\r");
+                    break;
+                case '\t':
+                    target.append("\\t");
+                    break;
+                case '\b':
+                    target.append("\\b");
+                    break;
+                case '\f':
+                    target.append("\\f");
+                    break;
+                default:
+                    if (c < 0x20) {
+                        target.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        target.append(c);
+                    }
+            }
+        }
+        target.append('"');
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationSchema.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationSchema.java
new file mode 100644
index 0000000000..b00f81eb83
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/FastLogDeserializationSchema.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.serialization;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
+
+import com.aliyun.openservices.log.common.FastLog;
+import com.aliyun.openservices.log.common.FastLogContent;
+import com.aliyun.openservices.log.common.FastLogGroup;
+import com.aliyun.openservices.log.common.LogGroupData;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Deserializer that maps the key/value contents of every SLS log onto the columns of the
+ * configured row type, converting each string value to the column's SQL type.
+ */
+public class FastLogDeserializationSchema
+        implements DeserializationSchema<SeaTunnelRow>, FastLogDeserialization<SeaTunnelRow> {
+
+    /** Formatter for {@code HH:mm:ss} with an optional fraction of up to nine digits. */
+    public static final DateTimeFormatter TIME_FORMAT;
+
+    private final CatalogTable catalogTable;
+
+    static {
+        TIME_FORMAT =
+                (new DateTimeFormatterBuilder())
+                        .appendPattern("HH:mm:ss")
+                        .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+                        .toFormatter();
+    }
+
+    /**
+     * @param catalogTable table whose row type drives the column mapping and whose name is set on
+     *     produced rows
+     */
+    public FastLogDeserializationSchema(CatalogTable catalogTable) {
+
+        this.catalogTable = catalogTable;
+    }
+
+    /**
+     * Byte-array entry point of {@link DeserializationSchema}; this implementation always returns
+     * {@code null}. Use {@link #deserialize(List, Collector)} instead.
+     */
+    @Override
+    public SeaTunnelRow deserialize(byte[] bytes) throws IOException {
+        return null;
+    }
+
+    /**
+     * Always returns {@code null}.
+     *
+     * <p>NOTE(review): callers presumably obtain the row type from the catalog table instead -
+     * confirm before relying on this method.
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return null;
+    }
+
+    /**
+     * Emits one row per log contained in the given log groups.
+     *
+     * @param logGroupDatas log groups pulled from SLS
+     * @param out collector receiving one row per log
+     * @throws IOException declared by the interface; not thrown by this method's own code
+     */
+    @Override
+    public void deserialize(List<LogGroupData> logGroupDatas, Collector<SeaTunnelRow> out)
+            throws IOException {
+        for (LogGroupData logGroupData : logGroupDatas) {
+            FastLogGroup logs = logGroupData.GetFastLogGroup();
+            for (FastLog log : logs.getLogs()) {
+                SeaTunnelRow seaTunnelRow = convertFastLogSchema(log);
+                out.collect(seaTunnelRow);
+            }
+        }
+    }
+
+    /**
+     * Converts one log into a row with exactly as many fields as the row type declares.
+     *
+     * <p>Each content entry whose key matches a column is converted and stored at that column's
+     * index. Columns without a matching key stay {@code null}, and keys that match no column are
+     * ignored. Writing by index into a pre-sized array keeps values in their columns regardless
+     * of the order in which SLS returns the keys.
+     *
+     * @param log the log to convert
+     * @return an INSERT row tagged with the catalog table's name
+     */
+    private SeaTunnelRow convertFastLogSchema(FastLog log) {
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        Object[] fields = new Object[rowType.getTotalFields()];
+        List<FastLogContent> logContents = log.getContents();
+        for (FastLogContent flc : logContents) {
+            int keyIndex = rowType.indexOf(flc.getKey(), false);
+            if (keyIndex > -1) {
+                fields[keyIndex] = convert(rowType.getFieldType(keyIndex), flc.getValue());
+            }
+        }
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+        seaTunnelRow.setRowKind(RowKind.INSERT);
+        seaTunnelRow.setTableId(catalogTable.getTableId().getTableName());
+        return seaTunnelRow;
+    }
+
+    /**
+     * Converts a string value to the Java representation of the given field type.
+     *
+     * @param fieldType target column type
+     * @param field raw string value from the log
+     * @return the converted value; {@code null} for the NULL type
+     * @throws SeaTunnelTextFormatException if the SQL type is not handled by this method
+     * @throws NumberFormatException if a numeric type is requested and {@code field} does not
+     *     parse
+     */
+    private Object convert(SeaTunnelDataType<?> fieldType, String field)
+            throws SeaTunnelTextFormatException {
+        switch (fieldType.getSqlType()) {
+            case STRING:
+                return field;
+            case BOOLEAN:
+                return Boolean.parseBoolean(field);
+            case TINYINT:
+                return Byte.parseByte(field);
+            case SMALLINT:
+                return Short.parseShort(field);
+            case INT:
+                return Integer.parseInt(field);
+            case BIGINT:
+                return Long.parseLong(field);
+            case FLOAT:
+                return Float.parseFloat(field);
+            case DOUBLE:
+                return Double.parseDouble(field);
+            case DECIMAL:
+                return new BigDecimal(field);
+            case NULL:
+                return null;
+            case BYTES:
+                return field.getBytes(StandardCharsets.UTF_8);
+            default:
+                throw new SeaTunnelTextFormatException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "SeaTunnel not support this data type [%s]",
+                                fieldType.getSqlType()));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/ConsumerMetaData.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/ConsumerMetaData.java
new file mode 100644
index 0000000000..356075aaea
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/ConsumerMetaData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.source;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.sls.config.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.sls.serialization.FastLogDeserialization;
+
+import com.aliyun.openservices.log.common.Consts;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the settings of one SLS consumer.
+ *
+ * <p>Lombok's {@code @Data} generates the getters, setters, {@code equals}, {@code hashCode} and
+ * {@code toString}; the class itself contains no logic.
+ */
+@Data
+public class ConsumerMetaData implements Serializable {
+    // Name of the SLS project.
+    private String project;
+    // Name of the logstore.
+    private String logstore;
+    // Name of the consumer group.
+    private String consumerGroup;
+    // Start position choice for the consumer.
+    private StartMode startMode;
+    // SDK cursor mode; presumably used when the group has no stored cursor - TODO confirm.
+    private Consts.CursorMode autoCursorReset;
+    // Presumably the number of records requested per pull - TODO confirm against the reader.
+    private int fetchSize;
+    // Deserializer that turns pulled log groups into rows.
+    private FastLogDeserialization<SeaTunnelRow> deserializationSchema;
+    // Table metadata for the produced rows.
+    private CatalogTable catalogTable;
+}
diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java
new file mode 100644
index 0000000000..7a2b9f65ba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.source; + +import com.aliyun.openservices.log.Client; +import lombok.Getter; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class SlsConsumerThread implements Runnable { + + private final Client client; + + @Getter private final LinkedBlockingQueue<Consumer<Client>> tasks; + + public SlsConsumerThread(SlsSourceConfig slsSourceConfig) { + this.client = this.initClient(slsSourceConfig); + this.tasks = new LinkedBlockingQueue<>(); + } + + public LinkedBlockingQueue<Consumer<Client>> getTasks() { + return tasks; + } + + @Override + public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + try { + Consumer<Client> task = tasks.poll(1, TimeUnit.SECONDS); + if (task != null) { + task.accept(client); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } finally { + try { + if (client != null) { + /** now do nothine, do not need close */ + } + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + } + + private Client initClient(SlsSourceConfig slsSourceConfig) { + return new Client( + slsSourceConfig.getEndpoint(), + slsSourceConfig.getAccessKeyId(), + slsSourceConfig.getAccessKeySecret()); + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSource.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSource.java new 
file mode 100644 index 0000000000..fdd5af871d --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSource.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.sls.source; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSourceState; + +import com.google.common.collect.Lists; + +import java.util.List; + +public class SlsSource + implements SeaTunnelSource<SeaTunnelRow, SlsSourceSplit, SlsSourceState>, + SupportParallelism { + + private JobContext jobContext; + + private final SlsSourceConfig slsSourceConfig; + + public SlsSource(ReadonlyConfig readonlyConfig) { + this.slsSourceConfig = new SlsSourceConfig(readonlyConfig); + } + + @Override + public void setJobContext(JobContext jobContext) { + this.jobContext = jobContext; + } + + @Override + public Boundedness getBoundedness() { + return JobMode.BATCH.equals(jobContext.getJobMode()) + ? 
Boundedness.BOUNDED + : Boundedness.UNBOUNDED; + } + + @Override + public SourceReader<SeaTunnelRow, SlsSourceSplit> createReader(SourceReader.Context readContext) + throws Exception { + return new SlsSourceReader(slsSourceConfig, readContext); + } + + @Override + public SourceSplitEnumerator<SlsSourceSplit, SlsSourceState> createEnumerator( + SourceSplitEnumerator.Context<SlsSourceSplit> enumeratorContext) throws Exception { + return new SlsSourceSplitEnumerator(slsSourceConfig, enumeratorContext); + } + + @Override + public SourceSplitEnumerator<SlsSourceSplit, SlsSourceState> restoreEnumerator( + SourceSplitEnumerator.Context<SlsSourceSplit> enumeratorContext, + SlsSourceState checkpointState) + throws Exception { + return new SlsSourceSplitEnumerator(slsSourceConfig, enumeratorContext, checkpointState); + } + + @Override + public List<CatalogTable> getProducedCatalogTables() { + return Lists.newArrayList(slsSourceConfig.getCatalogTable()); + } + + @Override + public String getPluginName() { + return org.apache.seatunnel.connectors.seatunnel.sls.config.Config.CONNECTOR_IDENTITY; + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceConfig.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceConfig.java new file mode 100644 index 0000000000..90d709491b --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceConfig.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.source; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.sls.serialization.FastLogDeserialization; +import org.apache.seatunnel.connectors.seatunnel.sls.serialization.FastLogDeserializationContent; +import org.apache.seatunnel.connectors.seatunnel.sls.serialization.FastLogDeserializationSchema; +import org.apache.seatunnel.format.text.TextDeserializationSchema; +import org.apache.seatunnel.format.text.constant.TextFormatConstant; + +import lombok.Getter; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ACCESS_KEY_ID; +import static 
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ACCESS_KEY_SECRET; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.AUTO_CURSOR_RESET; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.CONSUMER_GROUP; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ENDPOINT; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.LOGSTORE; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.PROJECT; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.START_MODE; + +public class SlsSourceConfig implements Serializable { + private static final long serialVersionUID = 1L; + + @Getter private final String endpoint; + @Getter private final String accessKeyId; + @Getter private final String accessKeySecret; + @Getter private final Long discoveryIntervalMillis; + @Getter private final CatalogTable catalogTable; + @Getter private final ConsumerMetaData consumerMetaData; + + public SlsSourceConfig(ReadonlyConfig readonlyConfig) { + this.endpoint = readonlyConfig.get(ENDPOINT); + this.accessKeyId = readonlyConfig.get(ACCESS_KEY_ID); + this.accessKeySecret = readonlyConfig.get(ACCESS_KEY_SECRET); + this.discoveryIntervalMillis = readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS); + this.catalogTable = createCatalogTable(readonlyConfig); + this.consumerMetaData = createMetaData(readonlyConfig); + } + + /** only single endpoint logstore */ + public ConsumerMetaData createMetaData(ReadonlyConfig readonlyConfig) { + ConsumerMetaData consumerMetaData = new ConsumerMetaData(); + consumerMetaData.setProject(readonlyConfig.get(PROJECT)); + consumerMetaData.setLogstore(readonlyConfig.get(LOGSTORE)); + 
consumerMetaData.setConsumerGroup(readonlyConfig.get(CONSUMER_GROUP)); + consumerMetaData.setStartMode(readonlyConfig.get(START_MODE)); + consumerMetaData.setFetchSize(readonlyConfig.get(BATCH_SIZE)); + consumerMetaData.setAutoCursorReset(readonlyConfig.get(AUTO_CURSOR_RESET)); + consumerMetaData.setDeserializationSchema(createDeserializationSchema(readonlyConfig)); + consumerMetaData.setCatalogTable(catalogTable); + return consumerMetaData; + } + + private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) { + Optional<Map<String, Object>> schemaOptions = + readonlyConfig.getOptional(TableSchemaOptions.SCHEMA); + TablePath tablePath = TablePath.of(readonlyConfig.get(LOGSTORE)); + TableSchema tableSchema; + if (schemaOptions.isPresent()) { + tableSchema = new ReadonlyConfigParser().parse(readonlyConfig); + } else { + // no scheam, all value in content filed + tableSchema = + TableSchema.builder() + .column( + PhysicalColumn.of( + "content", BasicType.STRING_TYPE, 0, false, "{}", null)) + .build(); + } + return CatalogTable.of( + TableIdentifier.of("", tablePath), + tableSchema, + Collections.emptyMap(), + Collections.emptyList(), + null); + } + + private FastLogDeserialization<SeaTunnelRow> createDeserializationSchema( + ReadonlyConfig readonlyConfig) { + Optional<Map<String, Object>> schemaOptions = + readonlyConfig.getOptional(TableSchemaOptions.SCHEMA); + FastLogDeserialization fastLogDeserialization; + if (schemaOptions.isPresent()) { + fastLogDeserialization = new FastLogDeserializationSchema(catalogTable); + + } else { + fastLogDeserialization = new FastLogDeserializationContent(catalogTable); + } + return fastLogDeserialization; + } + + private DeserializationSchema<SeaTunnelRow> createDeserializationSchema( + CatalogTable catalogTable) { + SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType(); + return TextDeserializationSchema.builder() + .seaTunnelRowType(seaTunnelRowType) + .delimiter(TextFormatConstant.PLACEHOLDER) + 
.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceFactory.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceFactory.java new file mode 100644 index 0000000000..42cda335fd --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.sls.source; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.sls.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.Serializable; + +@AutoService(Factory.class) +public class SlsSourceFactory implements TableSourceFactory { + @Override + public Class<? extends SeaTunnelSource> getSourceClass() { + return (Class<? extends SeaTunnelSource>) SlsSource.class; + } + + @Override + public String factoryIdentifier() { + return Config.CONNECTOR_IDENTITY; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required( + Config.ENDPOINT, + Config.PROJECT, + Config.LOGSTORE, + Config.ACCESS_KEY_ID, + Config.ACCESS_KEY_SECRET) + .optional( + Config.BATCH_SIZE, + Config.START_MODE, + Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, + Config.AUTO_CURSOR_RESET, + Config.CONSUMER_GROUP) + .build(); + } + + @Override + public <T, SplitT extends SourceSplit, StateT extends Serializable> + TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) { + return () -> (SeaTunnelSource<T, SplitT, StateT>) new SlsSource(context.getOptions()); + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java new file mode 100644 index 0000000000..43cb75328a --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.sls.serialization.FastLogDeserialization; + +import com.aliyun.openservices.log.common.LogGroupData; +import com.aliyun.openservices.log.exception.LogException; +import com.aliyun.openservices.log.request.PullLogsRequest; +import com.aliyun.openservices.log.response.PullLogsResponse; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +@Slf4j +public class SlsSourceReader implements SourceReader<SeaTunnelRow, SlsSourceSplit> { + private static final long THREAD_WAIT_TIME = 500L; + private final SourceReader.Context context; + private volatile boolean running = false; + private final LinkedBlockingQueue<SlsSourceSplit> pendingShardsQueue; + private final Set<SlsSourceSplit> sourceSplits; + private final Map<String, SlsConsumerThread> consumerThreadMap; + private final SlsSourceConfig slsSourceConfig; + private final ExecutorService executorService; + + private final Map<Long, Map<String, SlsSourceSplit>> checkpointOffsetMap; + + SlsSourceReader(SlsSourceConfig slsSourceConfig, Context context) { + this.pendingShardsQueue = new LinkedBlockingQueue(); + this.sourceSplits = new HashSet<>(); + this.consumerThreadMap = new ConcurrentHashMap<>(); + this.slsSourceConfig = slsSourceConfig; + this.context = context; + this.executorService = + Executors.newCachedThreadPool(r -> new Thread(r, "Sls Source Data Consumer")); + this.checkpointOffsetMap = new ConcurrentHashMap<>(); + } + + @Override + public void open() throws Exception {} + + @Override + public void close() throws IOException { + if (executorService != null) { + executorService.shutdownNow(); + } + } + + @Override + public void pollNext(Collector<SeaTunnelRow> collector) throws Exception { + if (!running) { + Thread.sleep(THREAD_WAIT_TIME); + return; + } + + while (!pendingShardsQueue.isEmpty()) { + sourceSplits.add(pendingShardsQueue.poll()); + } + /** thread for Client */ + sourceSplits.forEach( + sourceSplit -> + consumerThreadMap.computeIfAbsent( + sourceSplit.splitId(), + s -> { + SlsConsumerThread thread = + new SlsConsumerThread(slsSourceConfig); + executorService.submit(thread); + return thread; + })); + List<SlsSourceSplit> finishedSplits = new CopyOnWriteArrayList<>(); + FastLogDeserialization fastLogDeserialization 
= + slsSourceConfig.getConsumerMetaData().getDeserializationSchema(); + sourceSplits.forEach( + sourceSplit -> { + CompletableFuture<Boolean> completableFuture = new CompletableFuture<>(); + try { + consumerThreadMap + .get(sourceSplit.splitId()) + .getTasks() + .put( + consumer -> { + try { + PullLogsRequest request = + new PullLogsRequest( + sourceSplit.getProject(), + sourceSplit.getLogStore(), + sourceSplit.getShardId(), + sourceSplit.getFetchSize(), + sourceSplit.getStartCursor()); + PullLogsResponse response = + consumer.pullLogs(request); + List<LogGroupData> logGroupDatas = + response.getLogGroups(); + fastLogDeserialization.deserialize( + logGroupDatas, collector); + sourceSplit.setStartCursor( + response.getNextCursor()); + completableFuture.complete(true); + } catch (LogException e) { + e.printStackTrace(); + completableFuture.completeExceptionally(e); + throw new RuntimeException(e); + } catch (IOException e) { + e.printStackTrace(); + completableFuture.completeExceptionally(e); + throw new RuntimeException(e); + } + completableFuture.complete(false); + }); + if (completableFuture.get()) { + finishedSplits.add(sourceSplit); + } + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + + // batch mode only for explore data, so do not update cursor + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + for (SlsSourceSplit split : finishedSplits) { + split.setFinish(true); + } + if (sourceSplits.stream().allMatch(SlsSourceSplit::isFinish)) { + log.info("sls batch mode finished"); + context.signalNoMoreElement(); + } + } + } + + @Override + public List<SlsSourceSplit> snapshotState(long checkpointId) throws Exception { + checkpointOffsetMap.put( + checkpointId, + sourceSplits.stream() + .collect(Collectors.toMap(SlsSourceSplit::splitId, SlsSourceSplit::copy))); + return 
sourceSplits.stream().map(SlsSourceSplit::copy).collect(Collectors.toList()); + } + + // 接受 + @Override + public void addSplits(List<SlsSourceSplit> splits) { + running = true; + splits.forEach( + s -> { + try { + pendingShardsQueue.put(s); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void handleNoMoreSplits() { + log.info("receive no more splits message, this reader will not add new split."); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (!checkpointOffsetMap.containsKey(checkpointId)) { + log.warn("checkpoint {} do not exist or have already been committed.", checkpointId); + } else { + checkpointOffsetMap + .remove(checkpointId) + .forEach( + (sharId, slsSourceSplit) -> { + try { + consumerThreadMap + .get(sharId) + .getTasks() + .put( + client -> { + // now only default onCheckpointCommit + try { + client.UpdateCheckPoint( + slsSourceSplit.getProject(), + slsSourceSplit.getLogStore(), + slsSourceSplit.getConsumer(), + slsSourceSplit.getShardId(), + slsSourceSplit + .getStartCursor()); + } catch (LogException e) { + e.printStackTrace(); + log.error( + "LogException: commit cursor to sls failed", + e); + throw new RuntimeException(e); + } + }); + } catch (InterruptedException e) { + log.error( + "InterruptedException: commit cursor to sls failed", e); + } + }); + } + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplit.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplit.java new file mode 100644 index 0000000000..d5099b9cd1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplit.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.source; + +import org.apache.seatunnel.api.source.SourceSplit; + +import lombok.Getter; +import lombok.Setter; + +public class SlsSourceSplit implements SourceSplit { + + @Getter private String project; + @Getter private String logStore; + @Getter private String consumer; + @Getter private Integer shardId; + @Getter private String startCursor; + @Getter private Integer fetchSize; + @Setter @Getter private transient volatile boolean finish = false; + + SlsSourceSplit( + String project, + String logStore, + String consumer, + Integer shardId, + String startCursor, + Integer fetchSize) { + this.project = project; + this.logStore = logStore; + this.consumer = consumer; + this.shardId = shardId; + this.startCursor = startCursor; + this.fetchSize = fetchSize; + } + + @Override + public String splitId() { + return String.valueOf(shardId); + } + + public void setStartCursor(String cursor) { + this.startCursor = cursor; + } + + public SlsSourceSplit copy() { + return new SlsSourceSplit( + this.project, + this.logStore, + this.consumer, + this.shardId, + this.startCursor, + this.fetchSize); + } +} diff --git 
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java new file mode 100644 index 0000000000..f178d441a3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.sls.source; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.connectors.seatunnel.sls.config.StartMode; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSourceState; + +import com.aliyun.openservices.log.Client; +import com.aliyun.openservices.log.common.Consts; +import com.aliyun.openservices.log.common.ConsumerGroup; +import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint; +import com.aliyun.openservices.log.exception.LogException; +import com.aliyun.openservices.log.response.ConsumerGroupCheckPointResponse; +import com.aliyun.openservices.log.response.ListConsumerGroupResponse; +import com.aliyun.openservices.log.response.ListShardResponse; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class SlsSourceSplitEnumerator + implements SourceSplitEnumerator<SlsSourceSplit, SlsSourceState> { + + private final Client slsCleint; + private final ConsumerMetaData consumerMetaData; + + private final long discoveryIntervalMillis; + + private final Context<SlsSourceSplit> context; + private final Map<Integer, SlsSourceSplit> pendingSplit; + private final Map<Integer, SlsSourceSplit> assignedSplit; + + private SlsSourceState slsSourceState; + + private ScheduledExecutorService executor; + private ScheduledFuture<?> scheduledFuture; + + public SlsSourceSplitEnumerator( + SlsSourceConfig slsSourceConfig, Context<SlsSourceSplit> context) { + this.context = context; + this.slsCleint = + new Client( + slsSourceConfig.getEndpoint(), + 
slsSourceConfig.getAccessKeyId(), + slsSourceConfig.getAccessKeySecret()); + this.assignedSplit = new HashMap<>(); + this.pendingSplit = new HashMap<>(); + this.consumerMetaData = slsSourceConfig.getConsumerMetaData(); + this.discoveryIntervalMillis = slsSourceConfig.getDiscoveryIntervalMillis(); + } + + public SlsSourceSplitEnumerator( + SlsSourceConfig slsSourceConfig, + Context<SlsSourceSplit> context, + SlsSourceState slsSourceState) { + this.context = context; + this.slsCleint = + new Client( + slsSourceConfig.getEndpoint(), + slsSourceConfig.getAccessKeyId(), + slsSourceConfig.getAccessKeySecret()); + this.assignedSplit = new HashMap<>(); + this.pendingSplit = new HashMap<>(); + this.consumerMetaData = slsSourceConfig.getConsumerMetaData(); + this.discoveryIntervalMillis = slsSourceConfig.getDiscoveryIntervalMillis(); + + /** now only from sls cursor for restore */ + this.slsSourceState = slsSourceState; + if (slsSourceState != null) {} + } + + @Override + public void open() { + if (discoveryIntervalMillis > 0) { + this.executor = + Executors.newScheduledThreadPool( + 1, + runnable -> { + Thread thread = new Thread(runnable); + thread.setDaemon(true); + thread.setName("sls-shard-dynamic-discovery"); + return thread; + }); + this.scheduledFuture = + executor.scheduleWithFixedDelay( + () -> { + try { + discoverySplits(); + } catch (Exception e) { + log.error("Dynamic discovery failure:", e); + } + }, + discoveryIntervalMillis, + discoveryIntervalMillis, + TimeUnit.MILLISECONDS); + } + } + + @Override + public void run() throws Exception { + fetchPendingShardSplit(); + assignSplit(); + } + + @Override + public void close() throws IOException {} + + @Override + public void addSplitsBack(List<SlsSourceSplit> splits, int subtaskId) { + if (!splits.isEmpty()) { + splits.forEach(split -> pendingSplit.put(split.getShardId(), split)); + } + } + + @Override + public int currentUnassignedSplitSize() { + return 0; + } + + @Override + public void handleSplitRequest(int 
subtaskId) {} + + @Override + public void registerReader(int subtaskId) { + if (!pendingSplit.isEmpty()) { + assignSplit(); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} + + private void discoverySplits() throws LogException { + fetchPendingShardSplit(); + assignSplit(); + } + + private void fetchPendingShardSplit() throws LogException { + String project = this.consumerMetaData.getProject(); + String logStore = this.consumerMetaData.getLogstore(); + String consumer = this.consumerMetaData.getConsumerGroup(); + StartMode startMode = this.consumerMetaData.getStartMode(); + int fetachSize = this.consumerMetaData.getFetchSize(); + Consts.CursorMode autoCursorReset = this.consumerMetaData.getAutoCursorReset(); + ListShardResponse shards = this.slsCleint.ListShard(project, logStore); + shards.GetShards() + .forEach( + shard -> { + if (!assignedSplit.containsKey(shard.getShardId())) { + if (!pendingSplit.containsKey(shard.getShardId())) { + String cursor = ""; + try { + cursor = + initShardCursor( + project, + logStore, + consumer, + shard.getShardId(), + startMode, + autoCursorReset); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (cursor.equals("")) { + throw new RuntimeException("shard cursor error"); + } + SlsSourceSplit split = + new SlsSourceSplit( + project, + logStore, + consumer, + shard.getShardId(), + cursor, + fetachSize); + pendingSplit.put(shard.getShardId(), split); + } + } + }); + } + + private String initShardCursor( + String project, + String logStore, + String consumer, + int shardIdKey, + StartMode cursorMode, + Consts.CursorMode autoCursorReset) + throws Exception { + switch (cursorMode) { + case EARLIEST: + try { + return this.slsCleint + .GetCursor(project, logStore, shardIdKey, Consts.CursorMode.BEGIN) + .GetCursor(); + } catch (LogException e) { + throw new RuntimeException(e); + } + case LATEST: + try { + return this.slsCleint + .GetCursor(project, logStore, shardIdKey, 
Consts.CursorMode.END) + .GetCursor(); + } catch (LogException e) { + throw new RuntimeException(e); + } + case GROUP_CURSOR: + try { + boolean groupExists = checkConsumerGroupExists(project, logStore, consumer); + if (!groupExists) { + createConsumerGroup(project, logStore, consumer); + } + ConsumerGroupCheckPointResponse response = + this.slsCleint.GetCheckPoint(project, logStore, consumer, shardIdKey); + List<ConsumerGroupShardCheckPoint> checkpoints = response.getCheckPoints(); + if (checkpoints.size() == 1) { + ConsumerGroupShardCheckPoint checkpoint = checkpoints.get(0); + if (!checkpoint.getCheckPoint().equals("")) { + return checkpoint.getCheckPoint(); + } + } + return this.slsCleint + .GetCursor(project, logStore, shardIdKey, autoCursorReset) + .GetCursor(); + } catch (LogException e) { + if (e.GetErrorCode().equals("ConsumerGroupNotExist")) { + return this.slsCleint + .GetCursor(project, logStore, shardIdKey, autoCursorReset) + .GetCursor(); + } + throw new RuntimeException(e); + } + } + throw new RuntimeException( + project + ":" + logStore + ":" + consumer + ":" + cursorMode + ":" + "fail"); + } + + private synchronized void assignSplit() { + Map<Integer, List<SlsSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE); + // init task from Parallelism + for (int taskID = 0; taskID < context.currentParallelism(); taskID++) { + readySplit.computeIfAbsent(taskID, id -> new ArrayList<>()); + } + // Determine if split has been assigned + pendingSplit.forEach( + (key, value) -> { + if (!assignedSplit.containsKey(key)) { + readySplit + .get( + getSplitOwner( + value.getShardId(), context.currentParallelism())) + .add(value); + } + }); + // assigned split + readySplit.forEach( + (id, split) -> { + context.assignSplit(id, split); + if (discoveryIntervalMillis <= 0) { + context.signalNoMoreSplits(id); + } + }); + // record assigned split + assignedSplit.putAll(pendingSplit); + pendingSplit.clear(); + } + + private static int getSplitOwner(int shardId, int 
numReaders) { + return shardId % numReaders; + } + + @Override + public SlsSourceState snapshotState(long checkpointId) throws Exception { + return new SlsSourceState(new HashSet<>(assignedSplit.values())); + } + + public boolean checkConsumerGroupExists(String project, String logstore, String consumerGroup) + throws Exception { + ListConsumerGroupResponse response = this.slsCleint.ListConsumerGroup(project, logstore); + if (response != null) { + for (ConsumerGroup item : response.GetConsumerGroups()) { + if (item.getConsumerGroupName().equals(consumerGroup)) { + return true; + } + } + } + return false; + } + + public void createConsumerGroup( + final String project, final String logstore, final String consumerGroupName) + throws LogException { + ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 100, false); + try { + this.slsCleint.CreateConsumerGroup(project, logstore, consumerGroup); + } catch (LogException ex) { + if ("ConsumerGroupAlreadyExist".equals(ex.GetErrorCode())) {} + + throw ex; + } + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSourceState.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSourceState.java new file mode 100644 index 0000000000..bce5ecb154 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSourceState.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.state; + +import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceSplit; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Set; + +@Data +public class SlsSourceState implements Serializable { + + private Set<SlsSourceSplit> assignedSplit; + + public SlsSourceState(Set<SlsSourceSplit> assignedSplit) { + this.assignedSplit = assignedSplit; + } + + public Set<SlsSourceSplit> getAssignedSplit() { + return this.assignedSplit; + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java b/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java new file mode 100644 index 0000000000..1d7c2ab2da --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls; + +import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SlsFactoryTest { + + @Test + void optionRule() { + Assertions.assertNotNull((new SlsSourceFactory()).optionRule()); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index e0564a5572..6bd4065d08 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -79,6 +79,7 @@ <module>connector-web3j</module> <module>connector-milvus</module> <module>connector-activemq</module> + <module>connector-sls</module> </modules> <dependencyManagement> diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index a16d86cad5..e919293aab 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -919,6 +919,13 @@ <classifier>optional</classifier> <scope>provided</scope> </dependency> + + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-sls</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> </dependencies> <build> <finalName>apache-seatunnel-${project.version}</finalName> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/pom.xml new file mode 100644 index 0000000000..fdaeffda91 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/pom.xml @@ -0,0 +1,36 @@ +<?xml 
version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connector-v2-e2e</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-sls-e2e</artifactId> + <name>SeaTunnel : E2E : Connector V2 : Sls</name> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-sls</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java new file mode 100644 index 0000000000..07d368ec8c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java @@ -0,0 
+1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.seatunnel.e2e.connector.sls;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

/**
 * End-to-end test that runs the SLS source into the console sink, once with a declared schema and
 * once without. The class is disabled because it needs a personal SLS account.
 */
@Slf4j
@Disabled("Disabled because it needs user's personal sls account to run this test")
public class SlsIT extends TestSuiteBase implements TestResource {

    /** No external resource is started for this test. */
    @BeforeEach
    @Override
    public void startUp() throws Exception {}

    /** Nothing to release. */
    @AfterEach
    @Override
    public void tearDown() throws Exception {}

    /**
     * Executes both job configs in order and expects exit code 0 from each; the job's stderr is
     * used as the assertion message.
     */
    @TestTemplate
    public void testSlsStreamingSource(TestContainer container)
            throws IOException, InterruptedException {
        String[] jobConfigs = {
            "/sls_source_with_schema_to_console.conf",
            "/sls_source_without_schema_to_console.conf"
        };
        for (String jobConfig : jobConfigs) {
            Container.ExecResult result = container.executeJob(jobConfig);
            Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
        }
    }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_source_with_schema_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_source_with_schema_to_console.conf new file mode 100644 index 0000000000..0e9ca29d4a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_source_with_schema_to_console.conf @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 30000 +} + +source { + Sls { + endpoint = "xxxxxx" + project = "xxxxxx" + logstore = "xxxxxx" + access_key_id = "xxxxxx" + access_key_secret = "xxxxxxx" + schema = { + fields = { + id = "int" + name = "string" + description = "string" + weight = "string" + } + } + } +} + + + +sink { + Console { + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_source_without_schema_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_source_without_schema_to_console.conf new file mode 100644 index 0000000000..0d979b0157 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_source_without_schema_to_console.conf @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 30000 +} + +source { + Sls { + endpoint = "xxxxxx" + project = "xxxxxx" + logstore = "xxxxxx" + access_key_id = "xxxxxx" + access_key_secret = "xxxxxxx" + } +} + + + +sink { + Console { + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 2db67f8814..ed36310474 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -76,6 +76,7 @@ <module>connector-hudi-e2e</module> <module>connector-milvus-e2e</module> <module>connector-activemq-e2e</module> + <module>connector-sls-e2e</module> </modules> <dependencies>