This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bb2ad4048 [Feature][Connector-V2] Add oss sink (#2629)
bb2ad4048 is described below
commit bb2ad404870084599f5e499d3648a2642727965d
Author: TyrantLucifer <[email protected]>
AuthorDate: Mon Sep 5 18:11:44 2022 +0800
[Feature][Connector-V2] Add oss sink (#2629)
* [Feature][Connector-V2] Add oss sink
There are two problems that need to be resolved:
1. What is the actual difference between AggregatedCommitter and Committer?
2. How should the file separator be distinguished?
* [Feature][Connector-V2] Remove committer
* [Feature][Connector-V2] Refactor oss connector
* [Feature][Connector-V2] Update plugin-mapping.properties
* [Feature][Connector-V2] Update parameters
* [Feature][Connector-V2] Update doc
* [Feature][Connector-V2] Update doc
* [Feature][Connector-V2] Fix code style
* [Improve][Connector-V2] Update oss sink connector doc
---
docs/en/connector-v2/sink/OssFile.md | 217 +++++++++++++++++++++
docs/en/connector-v2/source/OssFile.md | 30 +--
plugin-mapping.properties | 1 +
.../seatunnel/file/sink/BaseFileSink.java | 7 -
.../file/oss/{source => }/config/OssConf.java | 18 +-
.../OssSourceConfig.java => config/OssConfig.java} | 8 +-
.../seatunnel/file/oss/sink/OssFileSink.java | 53 +++++
.../seatunnel/file/oss/source/OssFileSource.java | 27 +--
8 files changed, 317 insertions(+), 44 deletions(-)
diff --git a/docs/en/connector-v2/sink/OssFile.md
b/docs/en/connector-v2/sink/OssFile.md
new file mode 100644
index 000000000..c5a96aae1
--- /dev/null
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -0,0 +1,217 @@
+# OssFile
+
+> Oss file sink connector
+
+## Description
+
+Output data to oss file system.
+
+> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OSS, and this connector needs some Hadoop dependencies.
+> It only supports Hadoop version **2.9.X+**.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`.
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+ - [x] text
+ - [x] csv
+ - [x] parquet
+ - [x] orc
+ - [x] json
+
+## Options
+
+| name                             | type    | required | default value                                              |
+|----------------------------------|---------|----------|------------------------------------------------------------|
+| path                             | string  | yes      | -                                                          |
+| bucket                           | string  | yes      | -                                                          |
+| access_key                       | string  | yes      | -                                                          |
+| access_secret                    | string  | yes      | -                                                          |
+| endpoint                         | string  | yes      | -                                                          |
+| file_name_expression             | string  | no       | "${transactionId}"                                         |
+| file_format                      | string  | no       | "text"                                                     |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                               |
+| field_delimiter                  | string  | no       | '\001'                                                     |
+| row_delimiter                    | string  | no       | "\n"                                                       |
+| partition_by                     | array   | no       | -                                                          |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                 |
+| is_partition_field_write_in_file | boolean | no       | false                                                      |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns  |
+| is_enable_transaction            | boolean | no       | true                                                       |
+| save_mode                        | string  | no       | "error"                                                    |
+
+### path [string]
+
+The target directory path.
+
+### bucket [string]
+
+The bucket address of oss file system, for example: `oss://tyrantlucifer-image-bed`
+
+### access_key [string]
+
+The access key of oss file system.
+
+### access_secret [string]
+
+The access secret of oss file system.
+
+### endpoint [string]
+
+The endpoint of oss file system.
+
+### file_name_expression [string]
+
+`file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`.
+`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.
+
+Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` to the beginning of the file name.
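+
+A minimal sketch of how these options combine (illustrative only; the option values are assumptions, not taken from this commit):
+
+```hocon
+# With is_enable_transaction = true, the final name gains a ${transactionId}_ prefix
+file_name_expression = "test_${uuid}_${now}"
+filename_time_format = "yyyy.MM.dd"
+```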
+
+### file_format [string]
+
+We support the following file types:
+
+`text` `csv` `parquet` `orc` `json`
+
+Please note that the final file name will end with the file format's suffix; for example, the suffix of a text file is `txt`.
+
+### filename_time_format [string]
+
+When the format in the `file_name_expression` parameter is `xxxx-${now}`, `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd`. The commonly used time formats are listed as follows:
+
+| Symbol | Description |
+| ------ | ------------------ |
+| y | Year |
+| M | Month |
+| d | Day of month |
+| H | Hour in day (0-23) |
+| m | Minute in hour |
+| s | Second in minute |
+
+See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
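+
+For example, a file name expression that embeds the date (an illustrative sketch; the produced name assumes the `text` file format and is not taken from this commit):
+
+```hocon
+file_name_expression = "output_${now}"
+filename_time_format = "yyyy.MM.dd"
+# could produce a file name such as output_2022.09.05.txt
+```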
+
+### field_delimiter [string]
+
+The separator between columns in a row of data. Only needed by the `text` and `csv` file formats.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by the `text` and `csv` file formats.
+
+### partition_by [array]
+
+Partition data based on selected fields.
+
+### partition_dir_expression [string]
+
+If `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.
+
+Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
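+
+For example, with two partition fields (an illustrative sketch; the field names and values are assumptions, not taken from this commit):
+
+```hocon
+partition_by = ["age", "name"]
+# the default partition_dir_expression then produces directories such as:
+# age=20/name=seatunnel/
+```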
+
+### is_partition_field_write_in_file [boolean]
+
+If `is_partition_field_write_in_file` is `true`, the partition field and its value will be written into the data file.
+
+For example, if you want to write a Hive data file, its value should be `false`.
+
+### sink_columns [array]
+
+Which columns need to be written to the file; the default value is all of the columns obtained from the `Transform` or `Source`.
+The order of the fields determines the order in which the file is actually written.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is `true`, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+
+Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` to the beginning of the file name.
+
+Only `true` is supported now.
+
+### save_mode [string]
+
+Storage mode, currently supports `overwrite`. This means we will delete the old file when a new file has the same name as it.
+
+If `is_enable_transaction` is `true`, we basically won't encounter the same file name, because we add the transaction id to the file name.
+
+For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes).
+
+## Example
+
+For text file format
+
+```hocon
+
+ OssFile {
+ path="/seatunnel/sink"
+ bucket = "oss://tyrantlucifer-image-bed"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxx"
+ endpoint = "oss-cn-beijing.aliyuncs.com"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+ }
+
+```
+
+For parquet file format
+
+```hocon
+
+ OssFile {
+ path="/seatunnel/sink"
+ bucket = "oss://tyrantlucifer-image-bed"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxx"
+ endpoint = "oss-cn-beijing.aliyuncs.com"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="parquet"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+ }
+
+```
+
+For orc file format
+
+```hocon
+
+ OssFile {
+ path="/seatunnel/sink"
+ bucket = "oss://tyrantlucifer-image-bed"
+ access_key = "xxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxx"
+ endpoint = "oss-cn-beijing.aliyuncs.com"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="orc"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+ }
+
+```
diff --git a/docs/en/connector-v2/source/OssFile.md
b/docs/en/connector-v2/source/OssFile.md
index b5eda6dd2..7e296bed0 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -27,15 +27,15 @@ Read data from aliyun oss file system.
## Options
-| name | type | required | default value |
-|--------------|--------|----------|---------------|
-| path | string | yes | - |
-| type | string | yes | - |
-| bucket | string | yes | - |
-| accessKey | string | yes | - |
-| accessSecret | string | yes | - |
-| endpoint | string | yes | - |
-| schema | config | no | - |
+| name | type | required | default value |
+|---------------|--------|----------|---------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| bucket | string | yes | - |
+| access_key | string | yes | - |
+| access_secret | string | yes | - |
+| endpoint | string | yes | - |
+| schema | config | no | - |
### path [string]
@@ -93,11 +93,11 @@ Now connector will treat the upstream data as the following:
The bucket address of oss file system, for example:
`oss://tyrantlucifer-image-bed`
-### accessKey [string]
+### access_key [string]
The access key of oss file system.
-### accessSecret [string]
+### access_secret [string]
The access secret of oss file system.
@@ -116,8 +116,8 @@ The schema of upstream data.
OssFile {
path = "/seatunnel/orc"
bucket = "oss://tyrantlucifer-image-bed"
- accessKey = "xxxxxxxxxxxxxxxxx"
- accessSecret = "xxxxxxxxxxxxxxxxxxxxxx"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
type = "orc"
}
@@ -129,8 +129,8 @@ The schema of upstream data.
OssFile {
path = "/seatunnel/json"
bucket = "oss://tyrantlucifer-image-bed"
- accessKey = "xxxxxxxxxxxxxxxxx"
- accessSecret = "xxxxxxxxxxxxxxxxxxxxxx"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
type = "json"
schema {
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7eecae293..ada6b9e45 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -112,6 +112,7 @@ seatunnel.sink.HdfsFile = connector-file-hadoop
seatunnel.source.LocalFile = connector-file-local
seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.OssFile = connector-file-oss
+seatunnel.sink.OssFile = connector-file-oss
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 0faa8d77e..e5bc67ffa 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -23,7 +23,6 @@ import
org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -32,7 +31,6 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo2;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter2;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkCommitter2;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
@@ -77,11 +75,6 @@ public abstract class BaseFileSink implements
SeaTunnelSink<SeaTunnelRow, FileSi
return new BaseFileSinkWriter(writeStrategy, hadoopConf, context,
jobId, states);
}
- @Override
- public Optional<SinkCommitter<FileCommitInfo2>> createCommitter() throws
IOException {
- return Optional.of(new FileSinkCommitter2());
- }
-
@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo2,
FileAggregatedCommitInfo2>> createAggregatedCommitter() throws IOException {
return Optional.of(new FileSinkAggregatedCommitter2());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
similarity index 60%
rename from
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssConf.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
index d197ed17f..96fa483f2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java
@@ -15,10 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.oss.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.oss.config;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.hadoop.fs.aliyun.oss.Constants;
+
+import java.util.HashMap;
+
public class OssConf extends HadoopConf {
private final String fsHdfsImpl =
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
@@ -30,4 +36,14 @@ public class OssConf extends HadoopConf {
public OssConf(String hdfsNameKey) {
super(hdfsNameKey);
}
+
+ public static HadoopConf buildWithConfig(Config config) {
+ HadoopConf hadoopConf = new
OssConf(config.getString(OssConfig.BUCKET));
+ HashMap<String, String> ossOptions = new HashMap<>();
+ ossOptions.put(Constants.ACCESS_KEY_ID,
config.getString(OssConfig.ACCESS_KEY));
+ ossOptions.put(Constants.ACCESS_KEY_SECRET,
config.getString(OssConfig.ACCESS_SECRET));
+ ossOptions.put(Constants.ENDPOINT_KEY,
config.getString(OssConfig.ENDPOINT));
+ hadoopConf.setExtraOptions(ossOptions);
+ return hadoopConf;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java
similarity index 79%
rename from
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssSourceConfig.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java
index fa6728757..7a928e579 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/config/OssSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.oss.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.oss.config;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-public class OssSourceConfig extends BaseSourceConfig {
- public static final String ACCESS_KEY = "accessKey";
- public static final String ACCESS_SECRET = "accessSecret";
+public class OssConfig extends BaseSourceConfig {
+ public static final String ACCESS_KEY = "access_key";
+ public static final String ACCESS_SECRET = "access_secret";
public static final String ENDPOINT = "endpoint";
public static final String BUCKET = "bucket";
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
new file mode 100644
index 000000000..ff2d16004
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class OssFileSink extends BaseFileSink {
+ @Override
+ public String getPluginName() {
+ return FileSystemType.OSS.getFileSystemPluginName();
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ super.prepare(pluginConfig);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+            OssConfig.FILE_PATH,
+            OssConfig.BUCKET, OssConfig.ACCESS_KEY,
+            OssConfig.ACCESS_SECRET, OssConfig.ENDPOINT);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ }
+ hadoopConf = OssConf.buildWithConfig(pluginConfig);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 1eb588c8e..2008dcc1c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -25,18 +25,16 @@ import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
-import
org.apache.seatunnel.connectors.seatunnel.file.oss.source.config.OssConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.oss.source.config.OssSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
-import org.apache.hadoop.fs.aliyun.oss.Constants;
import java.io.IOException;
-import java.util.HashMap;
@AutoService(SeaTunnelSource.class)
public class OssFileSource extends BaseFileSource {
@@ -48,28 +46,23 @@ public class OssFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
- OssSourceConfig.FILE_PATH, OssSourceConfig.FILE_TYPE,
- OssSourceConfig.BUCKET, OssSourceConfig.ACCESS_KEY,
- OssSourceConfig.ACCESS_SECRET, OssSourceConfig.BUCKET);
+ OssConfig.FILE_PATH, OssConfig.FILE_TYPE,
+ OssConfig.BUCKET, OssConfig.ACCESS_KEY,
+                OssConfig.ACCESS_SECRET, OssConfig.ENDPOINT);
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
- readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(OssSourceConfig.FILE_TYPE));
- String path = pluginConfig.getString(OssSourceConfig.FILE_PATH);
- hadoopConf = new
OssConf(pluginConfig.getString(OssSourceConfig.BUCKET));
- HashMap<String, String> ossOptions = new HashMap<>();
- ossOptions.put(Constants.ACCESS_KEY_ID,
pluginConfig.getString(OssSourceConfig.ACCESS_KEY));
- ossOptions.put(Constants.ACCESS_KEY_SECRET,
pluginConfig.getString(OssSourceConfig.ACCESS_SECRET));
- ossOptions.put(Constants.ENDPOINT_KEY,
pluginConfig.getString(OssSourceConfig.ENDPOINT));
- hadoopConf.setExtraOptions(ossOptions);
+ readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE));
+ String path = pluginConfig.getString(OssConfig.FILE_PATH);
+ hadoopConf = OssConf.buildWithConfig(pluginConfig);
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
}
// support user-defined schema
- if (pluginConfig.hasPath(OssSourceConfig.SCHEMA)) {
- Config schemaConfig =
pluginConfig.getConfig(OssSourceConfig.SCHEMA);
+ if (pluginConfig.hasPath(OssConfig.SCHEMA)) {
+ Config schemaConfig = pluginConfig.getConfig(OssConfig.SCHEMA);
rowType = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();