This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bb2ad4048 [Feature][Connector-V2] Add oss sink (#2629)
bb2ad4048 is described below

commit bb2ad404870084599f5e499d3648a2642727965d
Author: TyrantLucifer <[email protected]>
AuthorDate: Mon Sep 5 18:11:44 2022 +0800

    [Feature][Connector-V2] Add oss sink (#2629)
    
    * [Feature][Connector-V2] Add oss sink
    
    Two problems still need to be resolved:
    1. What is the actual difference between AggregatedCommitter and Committer?
    2. How should the file separator be distinguished?
    
    * [Feature][Connector-V2] Remove committer
    
    * [Feature][Connector-V2] Refactor oss connector
    
    * [Feature][Connector-V2] Update plugin-mapping.properties
    
    * [Feature][Connector-V2] Update parameters
    
    * [Feature][Connector-V2] Update doc
    
    * [Feature][Connector-V2] Update doc
    
    * [Feature][Connector-V2] Fix code style
    
    * [Improve][Connector-V2] Update oss sink connector doc
---
 docs/en/connector-v2/sink/OssFile.md               | 217 +++++++++++++++++++++
 docs/en/connector-v2/source/OssFile.md             |  30 +--
 plugin-mapping.properties                          |   1 +
 .../seatunnel/file/sink/BaseFileSink.java          |   7 -
 .../file/oss/{source => }/config/OssConf.java      |  18 +-
 .../OssSourceConfig.java => config/OssConfig.java} |   8 +-
 .../seatunnel/file/oss/sink/OssFileSink.java       |  53 +++++
 .../seatunnel/file/oss/source/OssFileSource.java   |  27 +--
 8 files changed, 317 insertions(+), 44 deletions(-)

diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
new file mode 100644
index 000000000..c5a96aae1
--- /dev/null
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -0,0 +1,217 @@
+# OssFile
+
+> Oss file sink connector
+
+## Description
+
+Output data to the OSS file system.
+
+> Tips: To support more file types, we made a trade-off and use the HDFS protocol for internal access to OSS, so this connector needs some Hadoop dependencies.
+> It only supports Hadoop version **2.9.X+**.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use a two-phase commit (2PC) to ensure `exactly-once`.
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+    - [x] text
+    - [x] csv
+    - [x] parquet
+    - [x] orc
+    - [x] json
+
+## Options
+
+| name                             | type    | required | default value |
+|----------------------------------|---------|----------|---------------|
+| path                             | string  | yes      | - |
+| bucket                           | string  | yes      | - |
+| access_key                       | string  | yes      | - |
+| access_secret                    | string  | yes      | - |
+| endpoint                         | string  | yes      | - |
+| file_name_expression             | string  | no       | "${transactionId}" |
+| file_format                      | string  | no       | "text" |
+| filename_time_format             | string  | no       | "yyyy.MM.dd" |
+| field_delimiter                  | string  | no       | '\001' |
+| row_delimiter                    | string  | no       | "\n" |
+| partition_by                     | array   | no       | - |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
+| is_partition_field_write_in_file | boolean | no       | false |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean | no       | true |
+| save_mode                        | string  | no       | "error" |
+
+### path [string]
+
+The target directory path. This option is required.
+
+### bucket [string]
+
+The bucket address of oss file system, for example: `oss://tyrantlucifer-image-bed`
+
+### access_key [string]
+
+The access key of oss file system.
+
+### access_secret [string]
+
+The access secret of oss file system.
+
+### endpoint [string]
+
+The endpoint of oss file system.
+
+### file_name_expression [string]
+
+`file_name_expression` describes the name expression of the files created under `path`. The variables `${now}` and `${uuid}` can be used in `file_name_expression`, for example `test_${uuid}_${now}`.
+`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` is automatically added to the head of the file name.
+
+### file_format [string]
+
+The following file types are supported:
+
+`text` `csv` `parquet` `orc` `json`
+
+Please note that the final file name ends with the suffix of the file format; the suffix of the text file is `txt`.
+
+### filename_time_format [string]
+
+When the format in the `file_name_expression` parameter is `xxxx-${now}`, `filename_time_format` specifies the time format used for `${now}`. The default value is `yyyy.MM.dd`. The commonly used time formats are listed as follows:
+
+| Symbol | Description        |
+| ------ | ------------------ |
+| y      | Year               |
+| M      | Month              |
+| d      | Day of month       |
+| H      | Hour in day (0-23) |
+| m      | Minute in hour     |
+| s      | Second in minute   |
+
+See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
+
+### field_delimiter [string]
+
+The separator between columns in a row of data. Only needed by the `text` and `csv` file formats.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by the `text` and `csv` file formats.
+
+### partition_by [array]
+
+Partition data based on the selected fields.
+
+### partition_dir_expression [string]
+
+If `partition_by` is specified, the corresponding partition directory is generated from the partition information, and the final file is placed in that partition directory.
+
+The default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
+
+### is_partition_field_write_in_file [boolean]
+
+If `is_partition_field_write_in_file` is `true`, the partition fields and their values are written into the data file.
+
+For example, if you want to write a Hive data file, its value should be `false`.
+
+### sink_columns [array]
+
+The columns to write to the file. The default is all columns received from `Transform` or `Source`.
+The order of the fields determines the order in which they are actually written to the file.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is `true`, data is neither lost nor duplicated when it is written to the target directory.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` is automatically added to the head of the file name.
+
+Only `true` is supported now.
+
+### save_mode [string]
+
+Storage mode, currently supports `overwrite`. This means the old file is deleted when a new file has the same name.
+
+If `is_enable_transaction` is `true`, the same file name is basically never encountered, because the transaction id is added to the file name.
+
+For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
+
+## Example
+
+For text file format
+
+```hocon
+
+  OssFile {
+    path="/seatunnel/sink"
+    bucket = "oss://tyrantlucifer-image-bed"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "oss-cn-beijing.aliyuncs.com"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+  }
+
+```
+
+For parquet file format
+
+```hocon
+
+  OssFile {
+    path="/seatunnel/sink"
+    bucket = "oss://tyrantlucifer-image-bed"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxx"
+    endpoint = "oss-cn-beijing.aliyuncs.com"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="parquet"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+  }
+
+```
+
+For orc file format
+
+```hocon
+
+  OssFile {
+    path="/seatunnel/sink"
+    bucket = "oss://tyrantlucifer-image-bed"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "oss-cn-beijing.aliyuncs.com"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="orc"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+  }
+
+```
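
As an illustration of the `file_name_expression` and `partition_dir_expression` options documented in the new sink doc, the following is a minimal sketch of how such expressions could be expanded. It is not the connector's implementation: the class `OssSinkPathSketch`, its method names, and the plain string replacement are assumptions made for this example. Only the variable names (`${now}`, `${uuid}`, `${transactionId}`, `${k0}`, `${v0}`) and the `yyyy.MM.dd` default come from the doc.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper, not part of SeaTunnel: shows one way the documented
// variables could be substituted into file names and partition directories.
public class OssSinkPathSketch {

    // Replaces ${now}, ${uuid} and ${transactionId} in a file name expression.
    // ${now} is formatted with the pattern given by filename_time_format.
    public static String expandFileName(String expression, String timeFormat,
                                        Date now, String uuid, String transactionId) {
        String formattedNow = new SimpleDateFormat(timeFormat).format(now);
        return expression
                .replace("${now}", formattedNow)
                .replace("${uuid}", uuid)
                .replace("${transactionId}", transactionId);
    }

    // Replaces ${k0}, ${v0}, ${k1}, ${v1}, ... with the partition field names
    // and values, following the iteration order of the map.
    public static String expandPartitionDir(String expression, Map<String, String> partitions) {
        String result = expression;
        int i = 0;
        for (Map.Entry<String, String> entry : partitions.entrySet()) {
            result = result
                    .replace("${k" + i + "}", entry.getKey())
                    .replace("${v" + i + "}", entry.getValue());
            i++;
        }
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the partition fields in insertion order.
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("age", "20");
        System.out.println(expandPartitionDir("${k0}=${v0}", partitions));
        System.out.println(expandFileName("${transactionId}_${now}", "yyyy.MM.dd",
                new Date(), UUID.randomUUID().toString(), "T_1"));
    }
}
```

With `partition_by=["age"]` and the expression `${k0}=${v0}`, a row whose `age` is `20` would land in a directory named `age=20` under `path`, which matches the layout the examples in the doc configure.
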
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index b5eda6dd2..7e296bed0 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -27,15 +27,15 @@ Read data from aliyun oss file system.
 
 ## Options
 
-| name         | type   | required | default value |
-|--------------|--------|----------|---------------|
-| path         | string | yes      | -             |
-| type         | string | yes      | -             |
-| bucket       | string | yes      | -             |
-| accessKey    | string | yes      | -             |
-| accessSecret | string | yes      | -             |
-| endpoint     | string | yes      | -             |
-| schema       | config | no       | -             |
+| name          | type   | required | default value |
+|---------------|--------|----------|---------------|
+| path          | string | yes      | -             |
+| type          | string | yes      | -             |
+| bucket        | string | yes      | -             |
+| access_key    | string | yes      | -             |
+| access_secret | string | yes      | -             |
+| endpoint      | string | yes      | -             |
+| schema        | config | no       | -             |
 
 ### path [string]
 
@@ -93,11 +93,11 @@ Now connector will treat the upstream data as the following:
 
 The bucket address of oss file system, for example: `oss://tyrantlucifer-image-bed`
 
-### accessKey [string]
+### access_key [string]
 
 The access key of oss file system.
 
-### accessSecret [string]
+### access_secret [string]
 
 The access secret of oss file system.
 
@@ -116,8 +116,8 @@ The schema of upstream data.
   OssFile {
     path = "/seatunnel/orc"
     bucket = "oss://tyrantlucifer-image-bed"
-    accessKey = "xxxxxxxxxxxxxxxxx"
-    accessSecret = "xxxxxxxxxxxxxxxxxxxxxx"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
     type = "orc"
   }
@@ -129,8 +129,8 @@ The schema of upstream data.
   OssFile {
     path = "/seatunnel/json"
     bucket = "oss://tyrantlucifer-image-bed"
-    accessKey = "xxxxxxxxxxxxxxxxx"
-    accessSecret = "xxxxxxxxxxxxxxxxxxxxxx"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
     type = "json"
     schema {
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7eecae293..ada6b9e45 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -112,6 +112,7 @@ seatunnel.sink.HdfsFile = connector-file-hadoop
 seatunnel.source.LocalFile = connector-file-local
 seatunnel.sink.LocalFile = connector-file-local
 seatunnel.source.OssFile = connector-file-oss
+seatunnel.sink.OssFile = connector-file-oss
 seatunnel.source.Pulsar = connector-pulsar
 seatunnel.source.Hudi = connector-hudi
 seatunnel.sink.DingTalk = connector-dingtalk
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 0faa8d77e..e5bc67ffa 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -32,7 +31,6 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo2;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter2;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkCommitter2;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
@@ -77,11 +75,6 @@ public abstract class BaseFileSink implements SeaTunnelSink<SeaTunnelRow, FileSi
         return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
     }
 
-    @Override
-    public Optional<SinkCommitter<FileCommitInfo2>> createCommitter() throws IOException {
-        return Optional.of(new FileSinkCommitter2());
-    }
-
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo2, FileAggregatedCommitInfo2>> createAggregatedCommitter() throws IOException {
         return Optional.of(new FileSinkAggregatedCommitter2());
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssConf.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
similarity index 60%
rename from seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssConf.java
rename to seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
index d197ed17f..96fa483f2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
@@ -15,10 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.oss.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.oss.config;
 
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.hadoop.fs.aliyun.oss.Constants;
+
+import java.util.HashMap;
+
 public class OssConf extends HadoopConf {
     private final String fsHdfsImpl = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
 
@@ -30,4 +36,14 @@ public class OssConf extends HadoopConf {
     public OssConf(String hdfsNameKey) {
         super(hdfsNameKey);
     }
+
+    public static HadoopConf buildWithConfig(Config config) {
+        HadoopConf hadoopConf = new OssConf(config.getString(OssConfig.BUCKET));
+        HashMap<String, String> ossOptions = new HashMap<>();
+        ossOptions.put(Constants.ACCESS_KEY_ID, config.getString(OssConfig.ACCESS_KEY));
+        ossOptions.put(Constants.ACCESS_KEY_SECRET, config.getString(OssConfig.ACCESS_SECRET));
+        ossOptions.put(Constants.ENDPOINT_KEY, config.getString(OssConfig.ENDPOINT));
+        hadoopConf.setExtraOptions(ossOptions);
+        return hadoopConf;
+    }
 }
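
The new `buildWithConfig` method centralizes how connector options become Hadoop options. The sketch below mirrors that mapping with plain maps so that it runs without Hadoop or SeaTunnel on the classpath. The literal keys `fs.oss.accessKeyId`, `fs.oss.accessKeySecret`, and `fs.oss.endpoint` are what the `hadoop-aliyun` `Constants` fields are understood to resolve to; treat them as an assumption and check them against the Hadoop version in use. The class `OssOptionsSketch` and its method name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for OssConf.buildWithConfig, using plain maps
// instead of the Typesafe Config and HadoopConf types.
public class OssOptionsSketch {
    // Assumed string values of the hadoop-aliyun Constants fields
    // ACCESS_KEY_ID, ACCESS_KEY_SECRET and ENDPOINT_KEY.
    public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
    public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
    public static final String ENDPOINT_KEY = "fs.oss.endpoint";

    // Copies the three connector options into Hadoop-style option keys.
    public static Map<String, String> toHadoopOptions(Map<String, String> pluginConfig) {
        Map<String, String> ossOptions = new HashMap<>();
        ossOptions.put(ACCESS_KEY_ID, pluginConfig.get("access_key"));
        ossOptions.put(ACCESS_KEY_SECRET, pluginConfig.get("access_secret"));
        ossOptions.put(ENDPOINT_KEY, pluginConfig.get("endpoint"));
        return ossOptions;
    }

    public static void main(String[] args) {
        Map<String, String> pluginConfig = new HashMap<>();
        pluginConfig.put("access_key", "xxxxxxxxxxx");
        pluginConfig.put("access_secret", "xxxxxxxxxxx");
        pluginConfig.put("endpoint", "oss-cn-beijing.aliyuncs.com");
        System.out.println(toHadoopOptions(pluginConfig));
    }
}
```

Keeping this mapping in one static method is what lets the source and the sink share it, as the diff does by calling `OssConf.buildWithConfig(pluginConfig)` from both.
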
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java
similarity index 79%
rename from seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssSourceConfig.java
rename to seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java
index fa6728757..7a928e579 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.oss.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.oss.config;
 
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 
-public class OssSourceConfig extends BaseSourceConfig {
-    public static final String ACCESS_KEY = "accessKey";
-    public static final String ACCESS_SECRET = "accessSecret";
+public class OssConfig extends BaseSourceConfig {
+    public static final String ACCESS_KEY = "access_key";
+    public static final String ACCESS_SECRET = "access_secret";
     public static final String ENDPOINT = "endpoint";
     public static final String BUCKET = "bucket";
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
new file mode 100644
index 000000000..ff2d16004
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class OssFileSink extends BaseFileSink {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.OSS.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        super.prepare(pluginConfig);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                OssConfig.FILE_PATH,
+                OssConfig.BUCKET, OssConfig.ACCESS_KEY,
+                OssConfig.ACCESS_SECRET, OssConfig.ENDPOINT);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        hadoopConf = OssConf.buildWithConfig(pluginConfig);
+    }
+}
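
The sink's `prepare` method validates that the required options are present before building the Hadoop configuration. As a rough, self-contained sketch of that kind of check, the example below lists each of the five options the sink doc marks as required (`path`, `bucket`, `access_key`, `access_secret`, `endpoint`) exactly once and reports every missing key. The class `RequiredOptionsSketch` and the use of a plain `Map` in place of the Typesafe `Config` are assumptions for illustration; this is not how `CheckConfigUtil` is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical validation helper, not part of SeaTunnel.
public class RequiredOptionsSketch {
    // Option keys that the OSS sink doc marks as required.
    public static final List<String> REQUIRED =
            Arrays.asList("path", "bucket", "access_key", "access_secret", "endpoint");

    // Returns the required keys that are absent from the config, in declaration order.
    public static List<String> missingOptions(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("path", "/seatunnel/sink");
        config.put("bucket", "oss://tyrantlucifer-image-bed");
        config.put("access_key", "xxxxxxxxxxx");
        config.put("access_secret", "xxxxxxxxxxx");
        // "endpoint" is deliberately left out to show how it is reported.
        System.out.println("Missing options: " + missingOptions(config));
    }
}
```

Declaring the required keys in a single list makes it harder to repeat one key and omit another when the check is written out by hand.
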
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 1eb588c8e..2008dcc1c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -25,18 +25,16 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
-import org.apache.seatunnel.connectors.seatunnel.file.oss.source.config.OssConf;
-import org.apache.seatunnel.connectors.seatunnel.file.oss.source.config.OssSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
-import org.apache.hadoop.fs.aliyun.oss.Constants;
 
 import java.io.IOException;
-import java.util.HashMap;
 
 @AutoService(SeaTunnelSource.class)
 public class OssFileSource extends BaseFileSource {
@@ -48,28 +46,23 @@ public class OssFileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
-                OssSourceConfig.FILE_PATH, OssSourceConfig.FILE_TYPE,
-                OssSourceConfig.BUCKET, OssSourceConfig.ACCESS_KEY,
-                OssSourceConfig.ACCESS_SECRET, OssSourceConfig.BUCKET);
+                OssConfig.FILE_PATH, OssConfig.FILE_TYPE,
+                OssConfig.BUCKET, OssConfig.ACCESS_KEY,
+                OssConfig.ACCESS_SECRET, OssConfig.ENDPOINT);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssSourceConfig.FILE_TYPE));
-        String path = pluginConfig.getString(OssSourceConfig.FILE_PATH);
-        hadoopConf = new OssConf(pluginConfig.getString(OssSourceConfig.BUCKET));
-        HashMap<String, String> ossOptions = new HashMap<>();
-        ossOptions.put(Constants.ACCESS_KEY_ID, pluginConfig.getString(OssSourceConfig.ACCESS_KEY));
-        ossOptions.put(Constants.ACCESS_KEY_SECRET, pluginConfig.getString(OssSourceConfig.ACCESS_SECRET));
-        ossOptions.put(Constants.ENDPOINT_KEY, pluginConfig.getString(OssSourceConfig.ENDPOINT));
-        hadoopConf.setExtraOptions(ossOptions);
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE));
+        String path = pluginConfig.getString(OssConfig.FILE_PATH);
+        hadoopConf = OssConf.buildWithConfig(pluginConfig);
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
         }
         // support user-defined schema
-        if (pluginConfig.hasPath(OssSourceConfig.SCHEMA)) {
-            Config schemaConfig = pluginConfig.getConfig(OssSourceConfig.SCHEMA);
+        if (pluginConfig.hasPath(OssConfig.SCHEMA)) {
+            Config schemaConfig = pluginConfig.getConfig(OssConfig.SCHEMA);
             rowType = SeaTunnelSchema
                     .buildWithConfig(schemaConfig)
                     .getSeaTunnelRowType();
