This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2eca51f3ba [enhancement](broker) broker load support tencent cos
(#12801)
2eca51f3ba is described below
commit 2eca51f3ba1fb0ef60c965e10aae35eb6da4caea
Author: xueweizhang <[email protected]>
AuthorDate: Tue Nov 22 21:51:15 2022 +0800
[enhancement](broker) broker load support tencent cos (#12801)
---
.../Load/BROKER-LOAD.md | 86 +++++++++++++---------
.../Manipulation/EXPORT.md | 15 ++++
.../Data-Manipulation-Statements/OUTFILE.md | 22 +++++-
.../Load/BROKER-LOAD.md | 16 ++++
.../Manipulation/EXPORT.md | 15 ++++
.../Data-Manipulation-Statements/OUTFILE.md | 22 +++++-
.../java/org/apache/doris/analysis/ExportStmt.java | 5 +-
fs_brokers/apache_hdfs_broker/pom.xml | 6 ++
.../doris/broker/hdfs/FileSystemManager.java | 57 ++++++++++++++
9 files changed, 206 insertions(+), 38 deletions(-)
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 6cf381b74f..d2506675d9 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -366,25 +366,25 @@ WITH BROKER broker_name
 8. Import a batch of data from HDFS, specifying the timeout and filter ratio. Use the plaintext broker my_hdfs_broker with simple authentication. Rows in the imported data whose v2 column is greater than 100 mark the matching rows for deletion; other rows are imported normally.
- ```sql
- LOAD LABEL example_db.label8
- (
- MERGE DATA INFILE("HDFS://test:802/input/file")
- INTO TABLE `my_table`
- (k1, k2, k3, v2, v1)
- DELETE ON v2 > 100
- )
- WITH HDFS
- (
- "hadoop.username"="user",
- "password"="pass"
- )
- PROPERTIES
- (
- "timeout" = "3600",
- "max_filter_ratio" = "0.1"
- );
- ````
+ ```sql
+ LOAD LABEL example_db.label8
+ (
+ MERGE DATA INFILE("HDFS://test:802/input/file")
+ INTO TABLE `my_table`
+ (k1, k2, k3, v2, v1)
+ DELETE ON v2 > 100
+ )
+ WITH HDFS
+ (
+ "hadoop.username"="user",
+ "password"="pass"
+ )
+ PROPERTIES
+ (
+ "timeout" = "3600",
+ "max_filter_ratio" = "0.1"
+ );
+    ```
Import using the MERGE method. `my_table` must be a table with Unique Key.
When the value of the v2 column in the imported data is greater than 100, the
row is considered a delete row.
@@ -392,21 +392,21 @@ WITH BROKER broker_name
9. Specify the source_sequence column when importing to ensure the replacement
order in the UNIQUE_KEYS table:
- ```sql
- LOAD LABEL example_db.label9
- (
- DATA INFILE("HDFS://test:802/input/file")
- INTO TABLE `my_table`
- COLUMNS TERMINATED BY ","
- (k1,k2,source_sequence,v1,v2)
- ORDER BY source_sequence
- )
- WITH HDFS
- (
- "hadoop.username"="user",
- "password"="pass"
- )
- ````
+ ```sql
+ LOAD LABEL example_db.label9
+ (
+ DATA INFILE("HDFS://test:802/input/file")
+ INTO TABLE `my_table`
+ COLUMNS TERMINATED BY ","
+ (k1,k2,source_sequence,v1,v2)
+ ORDER BY source_sequence
+ )
+ WITH HDFS
+ (
+ "hadoop.username"="user",
+ "password"="pass"
+ )
+    ```
 `my_table` must be a Unique Key model table with Sequence Col specified. The data will be ordered according to the value of the `source_sequence` column in the source data.
@@ -459,6 +459,24 @@ WITH BROKER broker_name
"max_filter_ratio"="0.1"
);
```
+
+11. Load data in CSV format from COS (Tencent Cloud Object Storage).
+
+ ```SQL
+ LOAD LABEL example_db.label10
+ (
+ DATA INFILE("cosn://my_bucket/input/file.csv")
+ INTO TABLE `my_table`
+ (k1, k2, k3)
+ )
+ WITH BROKER "broker_name"
+ (
+ "fs.cosn.userinfo.secretId" = "xxx",
+ "fs.cosn.userinfo.secretKey" = "xxxx",
+ "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+ )
+ ```
+
### Keywords
BROKER, LOAD
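The new example 11 above passes COS credentials as `WITH BROKER` properties; the broker copies them into a Hadoop configuration for the CosN filesystem (see the `FileSystemManager` change later in this diff). A minimal plain-Java sketch of that property translation, with placeholder credentials (`CosProps` is an illustrative name, not a Doris class):

```java
import java.util.HashMap;
import java.util.Map;

public class CosProps {
    // Copies the user-supplied broker properties into the configuration map
    // the Hadoop CosN filesystem expects, filling in broker-side defaults.
    static Map<String, String> toHadoopConf(Map<String, String> props) {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.cosn.userinfo.secretId",
                props.getOrDefault("fs.cosn.userinfo.secretId", ""));
        conf.put("fs.cosn.userinfo.secretKey",
                props.getOrDefault("fs.cosn.userinfo.secretKey", ""));
        conf.put("fs.cosn.bucket.endpoint_suffix",
                props.getOrDefault("fs.cosn.bucket.endpoint_suffix", ""));
        // The implementation class and cache flag are set by the broker itself.
        conf.put("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
        conf.put("fs.cosn.impl.disable.cache",
                props.getOrDefault("fs.cosn.impl.disable.cache", "true"));
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("fs.cosn.userinfo.secretId", "xxx");
        System.out.println(toHadoopConf(props).get("fs.cosn.impl"));
    }
}
```

Only the three `fs.cosn.*` properties shown in the SQL example come from the user; everything else is defaulted by the broker.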
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
index c5b92abe23..f337296d0a 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
@@ -189,6 +189,21 @@ PROPERTIES (
)
```
+9. Export all data in the testTbl table to COS (Tencent Cloud Object Storage).
+
+```sql
+EXPORT TABLE testTbl TO "cosn://my_bucket/export/a/b/c"
+PROPERTIES (
+ "column_separator"=",",
+ "line_delimiter" = "\n"
+) WITH BROKER "broker_name"
+(
+ "fs.cosn.userinfo.secretId" = "xxx",
+ "fs.cosn.userinfo.secretKey" = "xxxx",
+ "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+)
+```
+
### Keywords
EXPORT
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index 4556f683e6..b094fcdd3f 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -187,7 +187,7 @@ illustrate:
);
````
-5. Export the query result of the select statement to the file `cos://${bucket_name}/path/result.txt`. Specify the export format as csv.
+5. Export the query result of the select statement to the file `s3a://${bucket_name}/path/result.txt`. Specify the export format as csv.
    After the export is complete, an identity file is generated.
```sql
@@ -285,6 +285,26 @@ illustrate:
If the final generated file is not larger than 100MB, it will be:
`result_0.csv`.
If larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
+9. Export the query result of the select statement to the file `cosn://${bucket_name}/path/result.txt` on Tencent Cloud Object Storage (COS). Specify the export format as csv.
+   After the export is complete, an identity file is generated.
+
+ ```sql
+ select k1,k2,v1 from tbl1 limit 100000
+ into outfile "cosn://my_bucket/export/my_file_"
+ FORMAT AS CSV
+ PROPERTIES
+ (
+ "broker.name" = "broker_name",
+ "broker.fs.cosn.userinfo.secretId" = "xxx",
+ "broker.fs.cosn.userinfo.secretKey" = "xxxx",
+    "broker.fs.cosn.bucket.endpoint_suffix" = "https://cos.xxxxxx.myqcloud.com/",
+ "column_separator" = ",",
+ "line_delimiter" = "\n",
+ "max_file_size" = "1024MB",
+ "success_file_name" = "SUCCESS"
+ )
+    ```
+
### keywords
OUTFILE
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 51bab9d6a1..aed99bff02 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -457,6 +457,22 @@ WITH BROKER broker_name
"timeout"="1200",
"max_filter_ratio"="0.1"
);
+
+11. Load data in CSV format from Tencent Cloud COS.
+
+ ```SQL
+ LOAD LABEL example_db.label10
+ (
+ DATA INFILE("cosn://my_bucket/input/file.csv")
+ INTO TABLE `my_table`
+ (k1, k2, k3)
+ )
+ WITH BROKER "broker_name"
+ (
+ "fs.cosn.userinfo.secretId" = "xxx",
+ "fs.cosn.userinfo.secretKey" = "xxxx",
+ "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+ )
```
### Keywords
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
index 7b62a13dac..67495c4571 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
@@ -188,6 +188,21 @@ PROPERTIES (
)
```
+9. Export all data in the testTbl table to COS (Tencent Cloud).
+
+```sql
+EXPORT TABLE testTbl TO "cosn://my_bucket/export/a/b/c"
+PROPERTIES (
+ "column_separator"=",",
+ "line_delimiter" = "\n"
+) WITH BROKER "broker_name"
+(
+ "fs.cosn.userinfo.secretId" = "xxx",
+ "fs.cosn.userinfo.secretKey" = "xxxx",
+ "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+)
+```
+
### Keywords
EXPORT
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index b3db0a349c..bcf9bf635b 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -188,7 +188,7 @@ INTO OUTFILE "file_path"
);
```
-5. Export the query result of the select statement to the file `cos://${bucket_name}/path/result.txt`. Specify the export format as csv.
+5. Export the query result of the select statement to the file `s3a://${bucket_name}/path/result.txt`. Specify the export format as csv.
    After the export is complete, an identity file is generated.
```sql
@@ -287,6 +287,26 @@ INTO OUTFILE "file_path"
 If the final generated file is not larger than 100MB, it will be: `result_0.csv`.
 If larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
+9. Export the query result of the select statement to the file `cosn://${bucket_name}/path/result.txt` on Tencent Cloud COS. Specify the export format as csv.
+   After the export is complete, an identity file is generated.
+
+ ```sql
+ select k1,k2,v1 from tbl1 limit 100000
+ into outfile "cosn://my_bucket/export/my_file_"
+ FORMAT AS CSV
+ PROPERTIES
+ (
+ "broker.name" = "broker_name",
+ "broker.fs.cosn.userinfo.secretId" = "xxx",
+ "broker.fs.cosn.userinfo.secretKey" = "xxxx",
+    "broker.fs.cosn.bucket.endpoint_suffix" = "https://cos.xxxxxx.myqcloud.com/",
+ "column_separator" = ",",
+ "line_delimiter" = "\n",
+ "max_file_size" = "1024MB",
+ "success_file_name" = "SUCCESS"
+ )
+ ```
+
### keywords
SELECT, INTO, OUTFILE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index e36416fc03..077729c81e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -240,9 +240,10 @@ public class ExportStmt extends StatementBase {
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs")
&& !schema.equalsIgnoreCase("oss")
- && !schema.equalsIgnoreCase("s3a"))) {
+ && !schema.equalsIgnoreCase("s3a")
+ && !schema.equalsIgnoreCase("cosn"))) {
                 throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'ofs://', 'obs://',"
-                        + "'oss://'," + " or 's3a://' path.");
+                        + "'oss://', 's3a://' or 'cosn://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index 92361ab6cd..6d1147b8c1 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -292,6 +292,12 @@ under the License.
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/com.qcloud.cos/hadoop-cos -->
+ <dependency>
+ <groupId>com.qcloud.cos</groupId>
+ <artifactId>hadoop-cos</artifactId>
+ <version>2.8.5-8.1.8</version>
+ </dependency>
</dependencies>
<build>
<finalName>apache_hdfs_broker</finalName>
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 5b1ef33491..42ff97123b 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -70,6 +70,7 @@ public class FileSystemManager {
private static final String CHDFS_SCHEME = "ofs";
private static final String OBS_SCHEME = "obs";
private static final String OSS_SCHEME = "oss";
+ private static final String COS_SCHEME = "cosn";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -124,6 +125,13 @@ public class FileSystemManager {
     private static final String FS_OSS_IMPL_DISABLE_CACHE = "fs.oss.impl.disable.cache";
     private static final String FS_OSS_IMPL = "fs.oss.impl";
+    // arguments for cos
+    private static final String FS_COS_ACCESS_KEY = "fs.cosn.userinfo.secretId";
+    private static final String FS_COS_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+    private static final String FS_COS_ENDPOINT = "fs.cosn.bucket.endpoint_suffix";
+    private static final String FS_COS_IMPL = "fs.cosn.impl";
+    private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";
+
     private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
     private int readBufferSize = 128 << 10; // 128k
@@ -187,6 +195,8 @@ public class FileSystemManager {
             brokerFileSystem = getOBSFileSystem(path, properties);
         } else if (scheme.equals(OSS_SCHEME)) {
             brokerFileSystem = getOSSFileSystem(path, properties);
+        } else if (scheme.equals(COS_SCHEME)) {
+            brokerFileSystem = getCOSFileSystem(path, properties);
         } else {
             throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                     "invalid path. scheme is not supported");
@@ -681,6 +691,53 @@ public class FileSystemManager {
}
}
+    /**
+     * visible for test
+     * <p>
+     * file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
+     *
+     * @param path
+     * @param properties
+     * @return
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    public BrokerFileSystem getCOSFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String accessKey = properties.getOrDefault(FS_COS_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_COS_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_COS_ENDPOINT, "");
+        String disableCache = properties.getOrDefault(FS_COS_IMPL_DISABLE_CACHE, "true");
+        // endpoint is the server host, pathUri.getUri().getHost() is the bucket
+        // we should use these two params as the host identity, because FileSystem will cache both.
+        String host = COS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
+        String cosUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, cosUgi);
+        BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                // create a new filesystem
+                Configuration conf = new Configuration();
+                conf.set(FS_COS_ACCESS_KEY, accessKey);
+                conf.set(FS_COS_SECRET_KEY, secretKey);
+                conf.set(FS_COS_ENDPOINT, endpoint);
+                conf.set(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
+                conf.set(FS_COS_IMPL_DISABLE_CACHE, disableCache);
+                FileSystem cosFileSystem = FileSystem.get(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(cosFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
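As the comments in `getCOSFileSystem` note, the cache identity combines the endpoint and bucket (the URI host) with the credential pair, so the same bucket reached through different endpoints or keys gets a separate cached handle. A toy illustration of that key construction (`CosCacheKey` and the `#` join are illustrative; the broker's actual `FileSystemIdentity` takes host and ugi as separate fields):

```java
public class CosCacheKey {
    // Mirrors how getCOSFileSystem builds its cache identity:
    //   host = cosn://<endpoint>/<bucket>, ugi = <secretId>,<secretKey>
    static String identity(String endpoint, String bucket,
                           String secretId, String secretKey) {
        String host = "cosn://" + endpoint + "/" + bucket;
        String ugi = secretId + "," + secretKey;
        return host + "#" + ugi; // joined only for display in this sketch
    }

    public static void main(String[] args) {
        // Same bucket, different endpoints: distinct cache entries.
        System.out.println(identity("cos.ap-beijing.myqcloud.com", "my_bucket", "id", "key"));
        System.out.println(identity("cos.ap-shanghai.myqcloud.com", "my_bucket", "id", "key"));
    }
}
```

Caching on both endpoint and bucket matters because, as the in-code comment says, Hadoop's `FileSystem` caches on both as well; a key that ignored either field could hand back a filesystem bound to the wrong region or credentials.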
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]