This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2eca51f3ba [enhancement](broker) broker load support tencent cos 
(#12801)
2eca51f3ba is described below

commit 2eca51f3ba1fb0ef60c965e10aae35eb6da4caea
Author: xueweizhang <[email protected]>
AuthorDate: Tue Nov 22 21:51:15 2022 +0800

    [enhancement](broker) broker load support tencent cos (#12801)
---
 .../Load/BROKER-LOAD.md                            | 86 +++++++++++++---------
 .../Manipulation/EXPORT.md                         | 15 ++++
 .../Data-Manipulation-Statements/OUTFILE.md        | 22 +++++-
 .../Load/BROKER-LOAD.md                            | 16 ++++
 .../Manipulation/EXPORT.md                         | 15 ++++
 .../Data-Manipulation-Statements/OUTFILE.md        | 22 +++++-
 .../java/org/apache/doris/analysis/ExportStmt.java |  5 +-
 fs_brokers/apache_hdfs_broker/pom.xml              |  6 ++
 .../doris/broker/hdfs/FileSystemManager.java       | 57 ++++++++++++++
 9 files changed, 206 insertions(+), 38 deletions(-)

diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 6cf381b74f..d2506675d9 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -366,25 +366,25 @@ WITH BROKER broker_name
 
 8. Import a batch of data from HDFS, specify the timeout and filter ratio. 
Broker with clear text my_hdfs_broker. Simple authentication. And delete the 
columns in the original data that match the columns with v2 greater than 100 in 
the imported data, and other columns are imported normally
 
-   ```sql
-   LOAD LABEL example_db.label8
-   (
-       MERGE DATA INFILE("HDFS://test:802/input/file")
-       INTO TABLE `my_table`
-       (k1, k2, k3, v2, v1)
-       DELETE ON v2 > 100
-   )
-   WITH HDFS
-   (
-       "hadoop.username"="user",
-       "password"="pass"
-   )
-   PROPERTIES
-   (
-       "timeout" = "3600",
-       "max_filter_ratio" = "0.1"
-   );
-   ````
+    ```sql
+    LOAD LABEL example_db.label8
+    (
+        MERGE DATA INFILE("HDFS://test:802/input/file")
+        INTO TABLE `my_table`
+        (k1, k2, k3, v2, v1)
+        DELETE ON v2 > 100
+    )
+    WITH HDFS
+    (
+        "hadoop.username"="user",
+        "password"="pass"
+    )
+    PROPERTIES
+    (
+        "timeout" = "3600",
+        "max_filter_ratio" = "0.1"
+    );
+    ```
 
    Import using the MERGE method. `my_table` must be a table with Unique Key. 
When the value of the v2 column in the imported data is greater than 100, the 
row is considered a delete row.
 
@@ -392,21 +392,21 @@ WITH BROKER broker_name
 
 9. Specify the source_sequence column when importing to ensure the replacement 
order in the UNIQUE_KEYS table:
 
-   ```sql
-   LOAD LABEL example_db.label9
-   (
-       DATA INFILE("HDFS://test:802/input/file")
-       INTO TABLE `my_table`
-       COLUMNS TERMINATED BY ","
-       (k1,k2,source_sequence,v1,v2)
-       ORDER BY source_sequence
-   )
-   WITH HDFS
-   (
-       "hadoop.username"="user",
-       "password"="pass"
-   )
-   ````
+    ```sql
+    LOAD LABEL example_db.label9
+    (
+        DATA INFILE("HDFS://test:802/input/file")
+        INTO TABLE `my_table`
+        COLUMNS TERMINATED BY ","
+        (k1,k2,source_sequence,v1,v2)
+        ORDER BY source_sequence
+    )
+    WITH HDFS
+    (
+        "hadoop.username"="user",
+        "password"="pass"
+    )
+    ```
 
   `my_table` must be a Unique Key model table with Sequence Col specified. 
The data will be ordered according to the value of the `source_sequence` column 
in the source data.
 
@@ -459,6 +459,24 @@ WITH BROKER broker_name
     "max_filter_ratio"="0.1"
     );
     ```
+
+11. Load data in CSV format from COS (Tencent Cloud Object Storage).
+
+    ```SQL
+    LOAD LABEL example_db.label10
+    (
+    DATA INFILE("cosn://my_bucket/input/file.csv")
+    INTO TABLE `my_table`
+    (k1, k2, k3)
+    )
+    WITH BROKER "broker_name"
+    (
+        "fs.cosn.userinfo.secretId" = "xxx",
+        "fs.cosn.userinfo.secretKey" = "xxxx",
+        "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+    )
+    ```
+
 ### Keywords
 
     BROKER, LOAD
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
index c5b92abe23..f337296d0a 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
@@ -189,6 +189,21 @@ PROPERTIES (
 )
 ```
 
+9. Export all data in the testTbl table to COS (Tencent Cloud Object Storage).
+
+```sql
+EXPORT TABLE testTbl TO "cosn://my_bucket/export/a/b/c"
+PROPERTIES (
+  "column_separator"=",",
+  "line_delimiter" = "\n"
+) WITH BROKER "broker_name"
+(
+  "fs.cosn.userinfo.secretId" = "xxx",
+  "fs.cosn.userinfo.secretKey" = "xxxx",
+  "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+)
+```
+
 ### Keywords
 
     EXPORT
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index 4556f683e6..b094fcdd3f 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -187,7 +187,7 @@ illustrate:
    );
    ````
 
-5. Export the query result of the select statement to the file 
`cos://${bucket_name}/path/result.txt`. Specify the export format as csv.
+5. Export the query result of the select statement to the file 
`s3a://${bucket_name}/path/result.txt`. Specify the export format as csv.
    After the export is complete, an identity file is generated.
 
    ```sql
@@ -285,6 +285,26 @@ illustrate:
    If the final generated file is not larger than 100MB, it will be: 
`result_0.csv`.
    If larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
 
+9. Export the query result of the select statement to the file 
`cosn://${bucket_name}/path/result.txt` on Tencent Cloud Object Storage (COS). 
Specify the export format as csv.
+   After the export is complete, an identity file is generated.
+
+   ```sql
+   select k1,k2,v1 from tbl1 limit 100000
+   into outfile "cosn://my_bucket/export/my_file_"
+   FORMAT AS CSV
+   PROPERTIES
+   (
+       "broker.name" = "broker_name",
+       "broker.fs.cosn.userinfo.secretId" = "xxx",
+       "broker.fs.cosn.userinfo.secretKey" = "xxxx",
+       "broker.fs.cosn.bucket.endpoint_suffix" = "https://cos.xxxxxx.myqcloud.com/",
+       "column_separator" = ",",
+       "line_delimiter" = "\n",
+       "max_file_size" = "1024MB",
+       "success_file_name" = "SUCCESS"
+   )
+   ```
+
 ### keywords
 
 OUTFILE
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 51bab9d6a1..aed99bff02 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -457,6 +457,22 @@ WITH BROKER broker_name
     "timeout"="1200",
     "max_filter_ratio"="0.1"
     );
+
+11. 从腾讯云cos中以csv格式导入数据。
+
+    ```SQL
+    LOAD LABEL example_db.label10
+    (
+    DATA INFILE("cosn://my_bucket/input/file.csv")
+    INTO TABLE `my_table`
+    (k1, k2, k3)
+    )
+    WITH BROKER "broker_name"
+    (
+        "fs.cosn.userinfo.secretId" = "xxx",
+        "fs.cosn.userinfo.secretKey" = "xxxx",
+        "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+    )
     ```
 
 ### Keywords
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
index 7b62a13dac..67495c4571 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
@@ -188,6 +188,21 @@ PROPERTIES (
 )
 ```
 
+9. 将 testTbl 表中的所有数据导出到 cos(腾讯云) 上。
+
+```sql
+EXPORT TABLE testTbl TO "cosn://my_bucket/export/a/b/c"
+PROPERTIES (
+  "column_separator"=",",
+  "line_delimiter" = "\n"
+) WITH BROKER "broker_name"
+(
+  "fs.cosn.userinfo.secretId" = "xxx",
+  "fs.cosn.userinfo.secretKey" = "xxxx",
+  "fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
+)
+```
+
 ### Keywords
 
     EXPORT
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index b3db0a349c..bcf9bf635b 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -188,7 +188,7 @@ INTO OUTFILE "file_path"
     );
     ```
     
-5. 将 select 语句的查询结果导出到文件 `cos://${bucket_name}/path/result.txt`。指定导出格式为 csv。
+5. 将 select 语句的查询结果导出到文件 `s3a://${bucket_name}/path/result.txt`。指定导出格式为 csv。
     导出完成后,生成一个标识文件。
     
     ```sql
@@ -287,6 +287,26 @@ INTO OUTFILE "file_path"
     最终生成文件如如果不大于 100MB,则为:`result_0.csv`。
     如果大于 100MB,则可能为 `result_0.csv, result_1.csv, ...`。
 
+9. 将 select 语句的查询结果导出到腾讯云cos的文件 
`cosn://${bucket_name}/path/result.txt`。指定导出格式为 csv。
+    导出完成后,生成一个标识文件。
+
+    ```sql
+    select k1,k2,v1 from tbl1 limit 100000
+    into outfile "cosn://my_bucket/export/my_file_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
+        "broker.name" = "broker_name",
+        "broker.fs.cosn.userinfo.secretId" = "xxx",
+        "broker.fs.cosn.userinfo.secretKey" = "xxxx",
+        "broker.fs.cosn.bucket.endpoint_suffix" = "https://cos.xxxxxx.myqcloud.com/",
+        "column_separator" = ",",
+        "line_delimiter" = "\n",
+        "max_file_size" = "1024MB",
+        "success_file_name" = "SUCCESS"
+    )
+    ```
+
 ### keywords
     SELECT, INTO, OUTFILE
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index e36416fc03..077729c81e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -240,9 +240,10 @@ public class ExportStmt extends StatementBase {
                     && !schema.equalsIgnoreCase("ofs")
                     && !schema.equalsIgnoreCase("obs")
                     && !schema.equalsIgnoreCase("oss")
-                    && !schema.equalsIgnoreCase("s3a"))) {
+                    && !schema.equalsIgnoreCase("s3a")
+                    && !schema.equalsIgnoreCase("cosn"))) {
                 throw new AnalysisException("Invalid broker path. please use 
valid 'hdfs://', 'ofs://', 'obs://',"
-                    + "'oss://'," + " or 's3a://' path.");
+                    + "'oss://', 's3a://' or 'cosn://' path.");
             }
         } else if (type == StorageBackend.StorageType.S3) {
             if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml 
b/fs_brokers/apache_hdfs_broker/pom.xml
index 92361ab6cd..6d1147b8c1 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -292,6 +292,12 @@ under the License.
             <artifactId>hadoop-aliyun</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/com.qcloud.cos/hadoop-cos -->
+        <dependency>
+            <groupId>com.qcloud.cos</groupId>
+            <artifactId>hadoop-cos</artifactId>
+            <version>2.8.5-8.1.8</version>
+        </dependency>
     </dependencies>
     <build>
         <finalName>apache_hdfs_broker</finalName>
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 5b1ef33491..42ff97123b 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -70,6 +70,7 @@ public class FileSystemManager {
     private static final String CHDFS_SCHEME = "ofs";
     private static final String OBS_SCHEME = "obs";
     private static final String OSS_SCHEME = "oss";
+    private static final String COS_SCHEME = "cosn";
 
     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -124,6 +125,13 @@ public class FileSystemManager {
     private static final String FS_OSS_IMPL_DISABLE_CACHE = 
"fs.oss.impl.disable.cache";
     private static final String FS_OSS_IMPL = "fs.oss.impl";
 
+    // arguments for cos
+    private static final String FS_COS_ACCESS_KEY = 
"fs.cosn.userinfo.secretId";
+    private static final String FS_COS_SECRET_KEY = 
"fs.cosn.userinfo.secretKey";
+    private static final String FS_COS_ENDPOINT = 
"fs.cosn.bucket.endpoint_suffix";
+    private static final String FS_COS_IMPL = "fs.cosn.impl";
+    private static final String FS_COS_IMPL_DISABLE_CACHE = 
"fs.cosn.impl.disable.cache";
+
     private ScheduledExecutorService handleManagementPool = 
Executors.newScheduledThreadPool(2);
 
     private int readBufferSize = 128 << 10; // 128k
@@ -187,6 +195,8 @@ public class FileSystemManager {
             brokerFileSystem = getOBSFileSystem(path, properties);
         } else if (scheme.equals(OSS_SCHEME)) {
             brokerFileSystem = getOSSFileSystem(path, properties);
+        } else if (scheme.equals(COS_SCHEME)) {
+            brokerFileSystem = getCOSFileSystem(path, properties);
         } else {
             throw new 
BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                 "invalid path. scheme is not supported");
@@ -681,6 +691,53 @@ public class FileSystemManager {
         }
     }
 
+    /**
+     * visible for test
+     * <p>
+     * file system handle is cached, the identity is endpoint + bucket + 
accessKey_secretKey
+     *
+     * @param path
+     * @param properties
+     * @return
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    public BrokerFileSystem getCOSFileSystem(String path, Map<String, String> 
properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String accessKey = properties.getOrDefault(FS_COS_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_COS_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_COS_ENDPOINT, "");
+        String disableCache = 
properties.getOrDefault(FS_COS_IMPL_DISABLE_CACHE, "true");
+        // endpoint is the server host, pathUri.getUri().getHost() is the 
bucket
+        // we should use these two params as the host identity, because 
FileSystem will cache both.
+        String host = COS_SCHEME + "://" + endpoint + "/" + 
pathUri.getUri().getHost();
+        String cosUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
cosUgi);
+        BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " 
create a new one");
+                // create a new filesystem
+                Configuration conf = new Configuration();
+                conf.set(FS_COS_ACCESS_KEY, accessKey);
+                conf.set(FS_COS_SECRET_KEY, secretKey);
+                conf.set(FS_COS_ENDPOINT, endpoint);
+                conf.set(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
+                conf.set(FS_COS_IMPL_DISABLE_CACHE, disableCache);
+                FileSystem cosFileSystem = FileSystem.get(pathUri.getUri(), 
conf);
+                fileSystem.setFileSystem(cosFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, 
Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to