This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 5bec4132ffbec0d038bfc40b47803d07b554107b
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Mon Nov 1 10:48:10 2021 +0800

    [S3] Support path style endpoint (#6962)
    
    Add a use_path_style property for S3
    Upgrade hadoop-common and hadoop-aws to 2.8.0 to support path style property
    Fix some S3 URI bugs
    Add some logs for tracing load process.
---
 be/src/exec/tablet_sink.cpp                        |  6 +-
 be/src/olap/delta_writer.cpp                       |  4 +-
 be/src/runtime/load_channel.cpp                    |  3 +-
 be/src/runtime/tablets_channel.cpp                 |  5 +-
 be/src/runtime/tablets_channel.h                   |  2 +-
 be/src/service/internal_service.cpp                |  3 +-
 be/src/util/s3_util.cpp                            | 11 ++-
 be/test/util/s3_storage_backend_test.cpp           | 17 +++--
 .../load-data/s3-load-manual.md                    | 21 +++++-
 .../load-data/s3-load-manual.md                    | 21 +++++-
 fe/fe-core/pom.xml                                 |  2 +-
 .../java/org/apache/doris/backup/BlobStorage.java  |  1 -
 .../java/org/apache/doris/backup/Repository.java   | 14 ++--
 .../java/org/apache/doris/backup/S3Storage.java    | 59 +++++++++++-----
 .../org/apache/doris/common/util/BrokerUtil.java   | 15 +++--
 .../java/org/apache/doris/common/util/S3URI.java   | 68 +++++++++++++++----
 .../doris/httpv2/rest/manager/NodeAction.java      | 10 +--
 .../org/apache/doris/backup/S3StorageTest.java     | 10 +--
 .../org/apache/doris/common/util/S3URITest.java    | 38 ++++++-----
 .../doris/planner/StreamLoadScanNodeTest.java      | 78 ++++++++++++++++++++--
 fe/pom.xml                                         |  2 +-
 gensrc/proto/internal_service.proto                |  2 +
 22 files changed, 288 insertions(+), 104 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index c5c02d2..b818918 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -86,6 +86,7 @@ Status NodeChannel::init(RuntimeState* state) {
     _cur_add_batch_request.set_allocated_id(&_parent->_load_id);
     _cur_add_batch_request.set_index_id(_index_id);
     _cur_add_batch_request.set_sender_id(_parent->_sender_id);
+    _cur_add_batch_request.set_backend_id(_node_id);
     _cur_add_batch_request.set_eos(false);
 
     _rpc_timeout_ms = state->query_options().query_timeout * 1000;
@@ -93,7 +94,7 @@ Status NodeChannel::init(RuntimeState* state) {
 
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
-    _name = "NodeChannel[" + std::to_string(_index_id) + "-" + 
std::to_string(_node_id) + "]";
+    _name = fmt::format("NodeChannel[{}-{}]", _index_id, _node_id);
     return Status::OK();
 }
 
@@ -282,7 +283,8 @@ Status NodeChannel::close_wait(RuntimeState* state) {
         SleepFor(MonoDelta::FromMilliseconds(1));
     }
     timer.stop();
-    VLOG_CRITICAL << name() << " close_wait cost: " << timer.elapsed_time() / 
1000000 << " ms";
+    VLOG_CRITICAL << name() << " close_wait cost: " << timer.elapsed_time() / 
1000000 << " ms"
+                  << ", " << _load_info;
 
     if (_add_batches_finished) {
         {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3868e88..afa014b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -321,7 +321,9 @@ OLAPStatus 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     _delta_written_success = true;
 
     const FlushStatistic& stat = _flush_token->get_stats();
-    VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id() 
<< ", stats: " << stat;
+    VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id() 
+                  << ", load id: " << print_id(_req.load_id)
+                  << ", stats: " << stat;
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 43f8a88..01e8d66 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -94,7 +94,8 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBatchRequest& request,
     Status st;
     if (request.has_eos() && request.eos()) {
         bool finished = false;
-        RETURN_IF_ERROR(channel->close(request.sender_id(), &finished, 
request.partition_ids(),
+        RETURN_IF_ERROR(channel->close(request.sender_id(), 
request.backend_id(), 
+                                       &finished, request.partition_ids(),
                                        tablet_vec));
         if (finished) {
             std::lock_guard<std::mutex> l(_lock);
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 531a3bd..db5e23b 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -126,7 +126,7 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBatchRequest& params) {
     return Status::OK();
 }
 
-Status TabletsChannel::close(int sender_id, bool* finished,
+Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
                              const google::protobuf::RepeatedField<int64_t>& 
partition_ids,
                              google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec) {
     std::lock_guard<std::mutex> l(_lock);
@@ -138,7 +138,8 @@ Status TabletsChannel::close(int sender_id, bool* finished,
         *finished = (_num_remaining_senders == 0);
         return _close_status;
     }
-    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << 
sender_id;
+    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << 
sender_id
+              << ", backend id: " << backend_id;
     for (auto pid : partition_ids) {
         _partition_ids.emplace(pid);
     }
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 0f01512..b0402bd 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -67,7 +67,7 @@ public:
     // If all senders are closed, close this channel, set '*finished' to true, 
update 'tablet_vec'
     // to include all tablets written in this channel.
     // no-op when this channel has been closed or cancelled
-    Status close(int sender_id, bool* finished,
+    Status close(int sender_id, int64_t backend_id, bool* finished,
                  const google::protobuf::RepeatedField<int64_t>& partition_ids,
                  google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
 
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 35e94a9..93dc6b1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -123,7 +123,8 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << 
st.get_error_msg()
                              << ", id=" << request->id() << ", index_id=" << 
request->index_id()
-                             << ", sender_id=" << request->sender_id();
+                             << ", sender_id=" << request->sender_id()
+                             << ", backend id=" << request->backend_id();
             }
             st.to_protobuf(response->mutable_status());
         }
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 364f580..160b5f4 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -30,6 +30,7 @@ const static std::string S3_AK = "AWS_ACCESS_KEY";
 const static std::string S3_SK = "AWS_SECRET_KEY";
 const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
 const static std::string S3_REGION = "AWS_REGION";
+const static std::string USE_PATH_STYLE = "use_path_style";
 
 ClientFactory::ClientFactory() {
     _aws_options = Aws::SDKOptions{};
@@ -67,7 +68,15 @@ std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
     Aws::Client::ClientConfiguration aws_config;
     aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
     aws_config.region = properties.find(S3_REGION)->second;
-    return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred), 
std::move(aws_config));
+
+    // See 
https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
+    bool use_virtual_addressing = true;
+    if (properties.find(USE_PATH_STYLE) != properties.end()) {
+        use_virtual_addressing = properties.find(USE_PATH_STYLE)->second == 
"true" ? false : true;
+    }
+    return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred), 
std::move(aws_config),
+            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+            use_virtual_addressing);
 }
 
 } // end namespace doris
diff --git a/be/test/util/s3_storage_backend_test.cpp 
b/be/test/util/s3_storage_backend_test.cpp
index a8ea878..3e8cc87 100644
--- a/be/test/util/s3_storage_backend_test.cpp
+++ b/be/test/util/s3_storage_backend_test.cpp
@@ -33,18 +33,20 @@
 #include "util/storage_backend.h"
 
 namespace doris {
-static const std::string AK = "AK";
-static const std::string SK = "SK";
+static const std::string AK = "";
+static const std::string SK = "";
 static const std::string ENDPOINT = "http://s3.bj.bcebos.com";;
+static const std::string USE_PATH_STYLE = "false";
 static const std::string REGION = "bj";
-static const std::string BUCKET = "s3://yang-repo/";
+static const std::string BUCKET = "s3://cmy-repo/";
 class S3StorageBackendTest : public testing::Test {
 public:
     S3StorageBackendTest()
             : _aws_properties({{"AWS_ACCESS_KEY", AK},
                                {"AWS_SECRET_KEY", SK},
                                {"AWS_ENDPOINT", ENDPOINT},
-                               {"AWS_REGION", "bj"}}) {
+                               {"USE_PATH_STYLE", USE_PATH_STYLE},
+                               {"AWS_REGION", REGION}}) {
         _s3.reset(new S3StorageBackend(_aws_properties));
         _s3_base_path = BUCKET + "s3/" + gen_uuid();
     }
@@ -189,10 +191,7 @@ TEST_F(S3StorageBackendTest, s3_mkdir) {
 int main(int argc, char** argv) {
     ::testing::InitGoogleTest(&argc, argv);
     int ret = 0;
-    Aws::SDKOptions options;
-    Aws::InitAPI(options);
-    // ak sk is secret
+    // set ak sk before running it.
     // ret = RUN_ALL_TESTS();
-    Aws::ShutdownAPI(options);
     return ret;
-}
\ No newline at end of file
+}
diff --git a/docs/en/administrator-guide/load-data/s3-load-manual.md 
b/docs/en/administrator-guide/load-data/s3-load-manual.md
index 7e3aa28..b9c2b2a 100644
--- a/docs/en/administrator-guide/load-data/s3-load-manual.md
+++ b/docs/en/administrator-guide/load-data/s3-load-manual.md
@@ -48,12 +48,12 @@ Other cloud storage systems can find relevant information 
compatible with S3 in
 Like Broker Load just replace `WITH BROKER broker_name ()` with
 ```
     WITH S3
- (
+    (
         "AWS_ENDPOINT" = "AWS_ENDPOINT",
         "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
         "AWS_SECRET_KEY"="AWS_SECRET_KEY",
         "AWS_REGION" = "AWS_REGION"
-  )
+    )
 ```
 
 example:
@@ -75,4 +75,19 @@ example:
     (
         "timeout" = "3600"
     );
-```
\ No newline at end of file
+```
+
+## FAQ
+
+S3 SDK uses virtual-hosted style by default. However, some object storage 
systems may not have enabled, or may not support, virtual-hosted style access. 
In that case, we can add the `use_path_style` parameter to force the use of path style:
+
+```
+   WITH S3
+   (
+         "AWS_ENDPOINT" = "AWS_ENDPOINT",
+         "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
+         "AWS_SECRET_KEY"="AWS_SECRET_KEY",
+         "AWS_REGION" = "AWS_REGION",
+         "use_path_style" = "true"
+   )
+```
diff --git a/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md 
b/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md
index 4bf52ce..3c9b6c5 100644
--- a/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/s3-load-manual.md
@@ -49,12 +49,12 @@ under the License.
 导入方式和Broker Load 基本相同,只需要将 `WITH BROKER broker_name ()` 语句替换成如下部分
 ```
     WITH S3
- (
+    (
         "AWS_ENDPOINT" = "AWS_ENDPOINT",
         "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
         "AWS_SECRET_KEY"="AWS_SECRET_KEY",
         "AWS_REGION" = "AWS_REGION"
-  )
+    )
 ```
 
 完整示例如下
@@ -76,4 +76,19 @@ under the License.
     (
         "timeout" = "3600"
     );
-```
\ No newline at end of file
+```
+
+## 常见问题
+
+S3 SDK 默认使用 virtual-hosted style 方式。但某些对象存储系统可能没开启或没支持 virtual-hosted style 
方式的访问,此时我们可以添加 `use_path_style` 参数来强制使用 path style 方式:
+
+```
+  WITH S3
+  (
+        "AWS_ENDPOINT" = "AWS_ENDPOINT",
+        "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
+        "AWS_SECRET_KEY"="AWS_SECRET_KEY",
+        "AWS_REGION" = "AWS_REGION",
+        "use_path_style" = "true"
+  )
+```
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 1ebbba5..e40f29a 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -514,7 +514,7 @@ under the License.
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
-            <version>2.7.3</version>
+            <version>2.8.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
index 10fb1d4..d5b98fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
@@ -134,5 +134,4 @@ public abstract class BlobStorage implements Writable {
             Text.writeString(out, entry.getValue());
         }
     }
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 3a2623d..df8230c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -29,16 +29,16 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.system.Backend;
 
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.json.JSONObject;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.json.JSONObject;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
@@ -315,7 +315,9 @@ public class Repository implements Writable {
     // Check if this repo is available.
     // If failed to connect this repo, set errMsg and return false.
     public boolean ping() {
-        String path = location + "/" + joinPrefix(PREFIX_REPO, name);
+        // for s3 sdk, the headObject() method does not support list "dir",
+        // so we check FILE_REPO_INFO instead.
+        String path = location + "/" + joinPrefix(PREFIX_REPO, name) + "/" + 
FILE_REPO_INFO;
         try {
             URI checkUri = new URI(path);
             Status st = 
storage.checkPathExist(checkUri.normalize().toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
index 2032229..9a2f29b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
@@ -71,9 +71,19 @@ public class S3Storage extends BlobStorage {
     public static final String S3_SK = "AWS_SECRET_KEY";
     public static final String S3_ENDPOINT = "AWS_ENDPOINT";
     public static final String S3_REGION = "AWS_REGION";
+    public static final String USE_PATH_STYLE = "use_path_style";
+
     private static final Logger LOG = LogManager.getLogger(S3Storage.class);
     private final CaseInsensitiveMap caseInsensitiveProperties;
     private S3Client client;
+    // false: the s3 client will automatically convert endpoint to 
virtual-hosted style, eg:
+    //          endpoint:           http://s3.us-east-2.amazonaws.com
+    //          bucket/path:        my_bucket/file.txt
+    //          auto convert:       
http://my_bucket.s3.us-east-2.amazonaws.com/file.txt
+    // true: the s3 client will NOT automatically convert endpoint to 
virtual-hosted style, we need to do some tricks:
+    //          endpoint:           http://cos.ap-beijing.myqcloud.com
+    //          bucket/path:        my_bucket/file.txt
+    //          convert manually:   See S3URI()
     private boolean forceHostedStyle = false;
 
     public S3Storage(Map<String, String> properties) {
@@ -89,21 +99,31 @@ public class S3Storage extends BlobStorage {
         super.setProperties(properties);
         caseInsensitiveProperties.putAll(properties);
         // Virtual hosted-sytle is recommended in the s3 protocol.
-        // The path-style has been abandoned, but for some unexplainable 
reasons.
-        // The s3 client will determine whether the endpiont starts with `s3`
+        // The path-style has been abandoned, but for some unexplainable 
reasons,
+        // the s3 client will determine whether the endpoint starts with `s3`
         // when generating a virtual hosted-sytle request.
         // If not, it will not be converted ( 
https://github.com/aws/aws-sdk-java-v2/pull/763),
         // but the endpoints of many cloud service providers for object 
storage do not start with s3,
         // so they cannot be converted to virtual hosted-sytle.
-        // Some of them, such as aliyun's oss, only support virtual 
hosted-sytle,
-        // so we need to do some additional conversion.
-
+        // Some of them, such as aliyun's oss, only support virtual 
hosted-style, and some of them (ceph) may only support
+        // path-style, so we need to do some additional conversion.
+        //
+        //          use_path_style          |     !use_path_style
+        //   S3     forceHostedStyle=false  |     forceHostedStyle=false
+        //  !S3     forceHostedStyle=false  |     forceHostedStyle=true
+        //
+        // That is, for S3 endpoints, ignore the `use_path_style` property, and 
the s3 client will automatically use
+        // virtual hosted-style.
+        // And for other endpoints, if `use_path_style` is true, use path 
style. Otherwise, use virtual hosted-style.
         if 
(!caseInsensitiveProperties.get(S3_ENDPOINT).toString().toLowerCase().startsWith("s3"))
 {
-            forceHostedStyle = true;
+            if (caseInsensitiveProperties.getOrDefault(USE_PATH_STYLE, 
"false").toString().equalsIgnoreCase("true")) {
+                forceHostedStyle = false;
+            } else {
+                forceHostedStyle = true;
+            }
         } else {
             forceHostedStyle = false;
         }
-
     }
 
     public static void checkS3(CaseInsensitiveMap caseInsensitiveProperties) 
throws UserException {
@@ -168,7 +188,6 @@ public class S3Storage extends BlobStorage {
     @Override
     public Status downloadWithFileSize(String remoteFilePath, String 
localFilePath, long fileSize) {
         long start = System.currentTimeMillis();
-        S3URI uri = new S3URI(remoteFilePath, forceHostedStyle);
         // Write the data to a local file
         File localFile = new File(localFilePath);
         if (localFile.exists()) {
@@ -183,6 +202,7 @@ public class S3Storage extends BlobStorage {
             }
         }
         try {
+            S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle);
             GetObjectResponse response = 
getClient(uri.getVirtualBucket()).getObject(GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
 localFile.toPath());
             if (localFile.length() == fileSize) {
                 LOG.info(
@@ -200,7 +220,7 @@ public class S3Storage extends BlobStorage {
                     Status.ErrCode.COMMON_ERROR,
                     "get file from s3 error: " + 
s3Exception.awsErrorDetails().errorMessage());
         } catch (UserException ue) {
-            LOG.error("connect to s3 failed: ", ue);
+            LOG.warn("connect to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 
failed: " + ue.getMessage());
         } catch (Exception e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.toString());
@@ -209,8 +229,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status directUpload(String content, String remoteFile) {
-        S3URI uri = new S3URI(remoteFile, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remoteFile, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -228,9 +248,9 @@ public class S3Storage extends BlobStorage {
     }
 
     public Status copy(String origFilePath, String destFilePath) {
-        S3URI origUri = new S3URI(origFilePath);
-        S3URI descUri = new S3URI(destFilePath, forceHostedStyle);
         try {
+            S3URI origUri = S3URI.create(origFilePath);
+            S3URI descUri = S3URI.create(destFilePath, forceHostedStyle);
             getClient(descUri.getVirtualBucket())
                     .copyObject(
                             CopyObjectRequest.builder()
@@ -250,8 +270,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status upload(String localPath, String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -280,8 +300,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status delete(String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             DeleteObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .deleteObject(
@@ -319,6 +339,9 @@ public class S3Storage extends BlobStorage {
             conf.set("fs.s3a.endpoint", s3Endpoint);
             conf.set("fs.s3a.impl.disable.cache", "true");
             conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+            // introducing in hadoop aws 2.8.0
+            conf.set("fs.s3a.path.style.access", forceHostedStyle ? "false" : 
"true");
+            conf.set("fs.s3a.attempts.maximum", "2");
             FileSystem s3AFileSystem = FileSystem.get(new URI(remotePath), 
conf);
             org.apache.hadoop.fs.Path pathPattern = new 
org.apache.hadoop.fs.Path(remotePath);
             FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
@@ -344,8 +367,8 @@ public class S3Storage extends BlobStorage {
         if (!remotePath.endsWith("/")) {
             remotePath += "/";
         }
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -364,8 +387,8 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status checkPathExist(String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             getClient(uri.getVirtualBucket())
                     
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
             return Status.OK;
@@ -373,11 +396,11 @@ public class S3Storage extends BlobStorage {
             if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
                 return new Status(Status.ErrCode.NOT_FOUND, "remote path does 
not exist: " + remotePath);
             } else {
-                LOG.error("headObject failed:", e);
+                LOG.warn("headObject failed:", e);
                 return new Status(Status.ErrCode.COMMON_ERROR, "headObject 
failed: " + e.getMessage());
             }
         } catch (UserException ue) {
-            LOG.error("connect to s3 failed: ", ue);
+            LOG.warn("connect to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 
failed: " + ue.getMessage());
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 5295f96..d823a21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -147,7 +147,7 @@ public class BrokerUtil {
                     fileStatuses.add(tBrokerFileStatus);
                 }
             } catch (TException e) {
-                LOG.warn("Broker list path exception, path={}, address={}, 
exception={}", path, address, e);
+                LOG.warn("Broker list path exception, path={}, address={}", 
path, address, e);
                 throw new UserException("Broker list path exception. path=" + 
path + ", broker=" + address);
             } finally {
                 returnClient(client, address, failed);
@@ -155,10 +155,15 @@ public class BrokerUtil {
         } else if (brokerDesc.getStorageType() == 
StorageBackend.StorageType.S3) {
             S3Storage s3 = new S3Storage(brokerDesc.getProperties());
             List<RemoteFile> rfiles = new ArrayList<>();
-            Status st = s3.list(path, rfiles, false);
-            if (!st.ok()) {
-                throw new UserException("S3 list path failed. path=" + path
-                    + ",msg=" + st.getErrMsg());
+            try {
+                Status st = s3.list(path, rfiles, false);
+                if (!st.ok()) {
+                    throw new UserException("S3 list path failed. path=" + path
+                            + ",msg=" + st.getErrMsg());
+                }
+            } catch (Exception e) {
+                LOG.warn("s3 list path exception, path={}", path, e);
+                throw new UserException("s3 list path exception. path=" + path 
+ ", err: " + e.getMessage());
             }
             for (RemoteFile r : rfiles) {
                 if (r.isFile()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
index 0aadfe4..1cf1b80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
@@ -17,11 +17,15 @@
 
 package org.apache.doris.common.util;
 
-import com.google.common.base.Preconditions;
+import org.apache.doris.common.UserException;
+
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.parquet.Strings;
 import org.apache.parquet.glob.GlobExpander;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Set;
 
@@ -44,7 +48,7 @@ public class S3URI {
     private final String virtualBucket;
     private final String bucket;
     private final String key;
-    private boolean forceHosted;
+    private boolean forceVirtualHosted;
 
     /**
      * Creates a new S3URI based on the bucket and key parsed from the 
location as defined in:
@@ -55,27 +59,60 @@ public class S3URI {
      *
      * @param location fully qualified URI
      */
-    public S3URI(String location) {
-        this(location, false);
+
+    public static S3URI create(String location) throws UserException {
+        return create(location, false);
+    }
+
+    public static S3URI create(String location, boolean forceVirtualHosted) 
throws UserException {
+        S3URI s3URI = new S3URI(location, forceVirtualHosted);
+        return s3URI;
     }
 
-    public S3URI(String location, boolean forceHosted) {
-        Preconditions.checkNotNull(location, "Location cannot be null.");
-        this.location = location;
-        this.forceHosted = forceHosted;
-        String[] schemeSplit = location.split(SCHEME_DELIM);
-        Preconditions.checkState(schemeSplit.length == 2, "Invalid S3 URI: 
%s", location);
+    private S3URI(String location, boolean forceVirtualHosted) throws 
UserException {
+        if (Strings.isNullOrEmpty(location)) {
+            throw new UserException("s3 location can not be null");
+        }
+
+        try {
+            // the location need to be normalized to eliminate double "/", or 
the hadoop aws api
+            // won't handle it correctly.
+            this.location = new URI(location).normalize().toString();
+        } catch (URISyntaxException e) {
+            throw new UserException("Invalid s3 uri: " + e.getMessage());
+        }
+
+        this.forceVirtualHosted = forceVirtualHosted;
+        String[] schemeSplit = this.location.split(SCHEME_DELIM);
+        if (schemeSplit.length != 2) {
+            throw new UserException("Invalid s3 uri: " + this.location);
+        }
 
         this.scheme = schemeSplit[0];
-        Preconditions.checkState(VALID_SCHEMES.contains(scheme.toLowerCase()), 
"Invalid scheme: %s", scheme);
+        if (!VALID_SCHEMES.contains(scheme.toLowerCase())) {
+            throw new UserException("Invalid scheme: " + this.location);
+        }
+
         String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
-        Preconditions.checkState(authoritySplit.length == 2, "Invalid S3 URI: 
%s", location);
-        Preconditions.checkState(!authoritySplit[1].trim().isEmpty(), "Invalid 
S3 key: %s", location);
+        if (authoritySplit.length != 2) {
+            throw new UserException("Invalid s3 uri: " + this.location);
+        }
+        if (authoritySplit[1].trim().isEmpty()) {
+            throw new UserException("Invalid s3 key: " + this.location);
+        }
+
         // Strip query and fragment if they exist
         String path = authoritySplit[1];
         path = path.split(QUERY_DELIM)[0];
         path = path.split(FRAGMENT_DELIM)[0];
-        if (forceHosted) {
+        if (this.forceVirtualHosted) {
+            // If forceVirtualHosted is true, the s3 client will NOT 
automatically convert to virtual-hosted style.
+            // So we do some convert manually. Eg:
+            //          endpoint:           http://cos.ap-beijing.myqcloud.com
+            //          bucket/path:        my_bucket/file.txt
+            // `virtualBucket` will be "my_bucket"
+            // `bucket` will be `file.txt`
+            // So that when assembling the real endpoint will be: 
http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
             this.virtualBucket = authoritySplit[0];
             String[] paths = path.split("/", 2);
             this.bucket = paths[0];
@@ -85,6 +122,9 @@ public class S3URI {
                 key = "";
             }
         } else {
+            // If forceVirtualHosted is false, let the s3 client determine how to convert the endpoint, eg:
+            // For s3 endpoint(start with "s3."), it will convert to 
virtual-hosted style.
+            // For others, keep as it is (maybe path-style, maybe 
virtual-hosted style.)
             this.virtualBucket = "";
             this.bucket = authoritySplit[0];
             key = path;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index fa05289..bd6ec87 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.httpv2.rest.manager;
 
-import lombok.Getter;
-import lombok.Setter;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -48,7 +46,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
 
-import org.apache.commons.httpclient.HttpException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -70,6 +67,9 @@ import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /*
  * Used to return all node information, configuration information and modify 
node config.
  */
@@ -540,10 +540,10 @@ public class NodeAction extends RestBaseController {
     }
 
     private void parseFeSetConfigResponse(String response, Pair<String, 
Integer> hostPort,
-                                          List<Map<String, String>> 
failedTotal) throws HttpException {
+                                          List<Map<String, String>> 
failedTotal) throws Exception {
         JsonObject jsonObject = 
JsonParser.parseString(response).getAsJsonObject();
         if (jsonObject.get("code").getAsInt() != 
HttpUtils.REQUEST_SUCCESS_CODE) {
-            throw new HttpException(jsonObject.get("msg").getAsString());
+            throw new Exception(jsonObject.get("msg").getAsString());
         }
         SetConfigAction.SetConfigEntity setConfigEntity = 
GsonUtils.GSON.fromJson(jsonObject.get("data").getAsJsonObject(),
                 SetConfigAction.SetConfigEntity.class);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
index f52d3ef..be74035 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
@@ -39,7 +39,7 @@ import java.util.UUID;
 @Ignore
 public class S3StorageTest {
     private static String basePath;
-    private final String bucket = "s3://yang-repo/";
+    private final String bucket = "s3://doris-test/";
     private Map<String, String> properties;
     private S3Storage storage;
     private String testFile;
@@ -56,6 +56,8 @@ public class S3StorageTest {
         properties.put("AWS_ACCESS_KEY", 
System.getenv().getOrDefault("AWS_AK", ""));
         properties.put("AWS_SECRET_KEY", 
System.getenv().getOrDefault("AWS_SK", ""));
+        properties.put("AWS_ENDPOINT", "http://s3.bj.bcebos.com");
+        properties.put(S3Storage.USE_PATH_STYLE, "false");
+
         properties.put("AWS_REGION", "bj");
         storage = new S3Storage(properties);
         testFile = bucket + basePath + "/Ode_to_the_West_Wind";
@@ -123,7 +125,6 @@ public class S3StorageTest {
         storage.rename(testFile + ".bak", testFile + ".bak1");
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, 
storage.checkPathExist(testFile + ".bak").getErrCode());
         Assert.assertEquals(Status.OK, storage.checkPathExist(testFile + 
".bak1"));
-
     }
 
     @Test
@@ -133,17 +134,16 @@ public class S3StorageTest {
         Assert.assertEquals(Status.OK, storage.delete(deleteFile));
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, 
storage.checkPathExist(deleteFile).getErrCode());
         Assert.assertEquals(Status.OK, storage.delete(deleteFile + "xxxx"));
-
     }
 
     @Test
     public void list() {
         List<RemoteFile> result = new ArrayList<>();
-        String listPath =  bucket + basePath + "_list" + 
"/Ode_to_the_West_Wind";
+        String listPath = bucket + basePath + "_list" + 
"/Ode_to_the_West_Wind";
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath 
+ ".1"));
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath 
+ ".2"));
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath 
+ ".3"));
-        Assert.assertEquals(Status.OK, storage.list(bucket + basePath  + 
"_list/*", result));
+        Assert.assertEquals(Status.OK, storage.list(bucket + basePath + 
"_list/*", result));
         Assert.assertEquals(3, result.size());
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
index f310c5b..52ab836 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
@@ -17,24 +17,26 @@
 
 package org.apache.doris.common.util;
 
-import org.junit.Test;
+import org.apache.doris.common.UserException;
 
 import org.junit.Assert;
+import org.junit.Test;
 
 public class S3URITest {
     @Test
-    public void testLocationParsing() {
+    public void testLocationParsing() throws UserException {
         String p1 = "s3://bucket/path/to/file";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/to/file", uri1.getKey());
         Assert.assertEquals(p1, uri1.toString());
     }
+
     @Test
-    public void testPathLocationParsing() {
+    public void testPathLocationParsing() throws UserException {
         String p1 = "s3://bucket/path/";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/", uri1.getKey());
@@ -42,34 +44,34 @@ public class S3URITest {
     }
 
     @Test
-    public void testEncodedString() {
+    public void testEncodedString() throws UserException {
         String p1 = "s3://bucket/path%20to%20file";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path%20to%20file", uri1.getKey());
         Assert.assertEquals(p1, uri1.toString());
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void missingKey() {
-        new S3URI("https://bucket/");
+    @Test(expected = UserException.class)
+    public void missingKey() throws UserException {
+        S3URI.create("https://bucket/");
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void relativePathing() {
-        new S3URI("/path/to/file");
+    @Test(expected = UserException.class)
+    public void relativePathing() throws UserException {
+        S3URI.create("/path/to/file");
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void invalidScheme() {
-        new S3URI("ftp://bucket/");
+    @Test(expected = UserException.class)
+    public void invalidScheme() throws UserException {
+        S3URI.create("ftp://bucket/");
     }
 
     @Test
-    public void testQueryAndFragment() {
+    public void testQueryAndFragment() throws UserException {
         String p1 = "s3://bucket/path/to/file?query=foo#bar";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);
 
         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/to/file", uri1.getKey());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index 3a21275..e5f2fc5 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -20,9 +20,7 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.FunctionName;
-import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.AggregateType;
@@ -35,13 +33,9 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarFunction;
 import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.Load;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TExplainLevel;
@@ -502,6 +496,30 @@ public class StreamLoadScanNodeTest {
             }
         }
 
+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k3");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
@@ -629,6 +647,30 @@ public class StreamLoadScanNodeTest {
             }
         }
 
+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k5 = 1");
@@ -659,6 +701,30 @@ public class StreamLoadScanNodeTest {
             }
         }
 
+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k1 + v2");
diff --git a/fe/pom.xml b/fe/pom.xml
index bf8b049..453a444 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -590,7 +590,7 @@ under the License.
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-common</artifactId>
-                <version>2.7.3</version>
+                <version>2.8.0</version>
                 <scope>provided</scope>
                 <exclusions>
                     <exclusion>
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 5ecbb69..2e07c46 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -94,6 +94,8 @@ message PTabletWriterAddBatchRequest {
     // only valid when eos is true
     // valid partition ids that would write in this writer
     repeated int64 partition_ids = 8;
+    // the backend which send this request
+    optional int64 backend_id = 9 [default = -1];
 };
 
 message PTabletWriterAddBatchResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to