This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 96e1280f4b8 [branch-2.0](resource)fix check available fail when s3 
aws_token is set and reset ak, sk failed on be. #34655
96e1280f4b8 is described below

commit 96e1280f4b8223a425bb5fd412b38b2f5feaa885
Author: huanghg1994 <519500...@qq.com>
AuthorDate: Mon May 13 20:56:20 2024 +0800

    [branch-2.0](resource)fix check available fail when s3 aws_token is set and 
reset ak, sk failed on be. #34655
---
 be/src/agent/task_worker_pool.cpp                  |  3 ++-
 be/src/io/fs/s3_file_system.cpp                    | 23 ++++++++++++++++++++++
 be/src/io/fs/s3_file_system.h                      |  2 +-
 .../java/org/apache/doris/catalog/S3Resource.java  | 16 ++++++++++++---
 .../property/constants/S3Properties.java           |  1 +
 gensrc/thrift/AgentService.thrift                  |  1 +
 .../cold_heat_separation/policy/alter.groovy       | 11 +++++++++++
 7 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index bcf8aa93210..5d67a5bac08 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1196,6 +1196,7 @@ void 
TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
                 S3Conf s3_conf;
                 s3_conf.ak = std::move(resource.s3_storage_param.ak);
                 s3_conf.sk = std::move(resource.s3_storage_param.sk);
+                s3_conf.token = std::move(resource.s3_storage_param.token);
                 s3_conf.endpoint = 
std::move(resource.s3_storage_param.endpoint);
                 s3_conf.region = std::move(resource.s3_storage_param.region);
                 s3_conf.prefix = 
std::move(resource.s3_storage_param.root_path);
@@ -1211,7 +1212,7 @@ void 
TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
                     st = io::S3FileSystem::create(s3_conf, 
std::to_string(resource.id), &fs);
                 } else {
                     fs = 
std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
-                    fs->set_conf(s3_conf);
+                    st = fs->set_conf(s3_conf);
                 }
                 if (!st.ok()) {
                     LOG(WARNING) << "update s3 resource failed: " << st;
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 6423de4fa15..fd21c3f2395 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -96,6 +96,29 @@ namespace io {
     RETURN_IF_ERROR(get_key(path, &key));
 #endif
 
+// Replaces the credentials (ak / sk / token) of this file system and rebuilds
+// the S3 client with them. Every other field of `_s3_conf` is kept as is.
+//
+// Returns OK without touching anything when the three credential fields are
+// unchanged. Returns InternalError, leaving `_s3_conf` and `_client` as they
+// were, when a client cannot be created from the updated conf.
+//
+// Guarded by external lock.
+Status S3FileSystem::set_conf(S3Conf s3_conf) {
+    if (s3_conf.ak == _s3_conf.ak && s3_conf.sk == _s3_conf.sk &&
+        s3_conf.token == _s3_conf.token) {
+        return Status::OK(); // Same conf
+    }
+
+    // Start from the current conf and overwrite only the credentials.
+    // `s3_conf` is a by-value parameter, so its strings can be moved out.
+    auto reset_conf = _s3_conf;
+    reset_conf.ak = std::move(s3_conf.ak);
+    reset_conf.sk = std::move(s3_conf.sk);
+    reset_conf.token = std::move(s3_conf.token);
+
+    // Create the client from the exact conf that is stored below, so that
+    // `_client` and `_s3_conf` cannot end up describing different settings.
+    auto client = S3ClientFactory::instance().create(reset_conf);
+    if (!client) {
+        // Report the conf that actually failed, not the previous one.
+        return Status::InternalError("failed to init s3 client with {}", reset_conf.to_string());
+    }
+
+    {
+        std::lock_guard lock(_client_mu);
+        _client = std::move(client);
+    }
+    _s3_conf = std::move(reset_conf);
+    return Status::OK();
+}
+
 Status S3FileSystem::create(S3Conf s3_conf, std::string id, 
std::shared_ptr<S3FileSystem>* fs) {
     (*fs).reset(new S3FileSystem(std::move(s3_conf), std::move(id)));
     return (*fs)->connect();
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 301d97c73cd..0044288b3a5 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -59,7 +59,7 @@ public:
     static Status create(S3Conf s3_conf, std::string id, 
std::shared_ptr<S3FileSystem>* fs);
     ~S3FileSystem() override;
     // Guarded by external lock.
-    void set_conf(S3Conf s3_conf) { _s3_conf = std::move(s3_conf); }
+    Status set_conf(S3Conf s3_conf);
 
     std::shared_ptr<Aws::S3::S3Client> get_client() const {
         std::lock_guard lock(_client_mu);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
index b8ef318077f..1dd09cbf981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -104,7 +104,8 @@ public class S3Resource extends Resource {
         properties.putIfAbsent(S3Properties.REGION, region);
         String ak = properties.get(S3Properties.ACCESS_KEY);
         String sk = properties.get(S3Properties.SECRET_KEY);
-        CloudCredentialWithEndpoint credential = new 
CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk);
+        String token = properties.get(S3Properties.SESSION_TOKEN);
+        CloudCredentialWithEndpoint credential = new 
CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token);
 
         if (needCheck) {
             String bucketName = properties.get(S3Properties.BUCKET);
@@ -125,6 +126,7 @@ public class S3Resource extends Resource {
         Map<String, String> propertiesPing = new HashMap<>();
         propertiesPing.put(S3Properties.Env.ACCESS_KEY, 
credential.getAccessKey());
         propertiesPing.put(S3Properties.Env.SECRET_KEY, 
credential.getSecretKey());
+        propertiesPing.put(S3Properties.Env.TOKEN, 
credential.getSessionToken());
         propertiesPing.put(S3Properties.Env.ENDPOINT, 
credential.getEndpoint());
         propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
         propertiesPing.put(PropertyConverter.USE_PATH_STYLE,
@@ -189,6 +191,10 @@ public class S3Resource extends Resource {
         writeLock();
         for (Map.Entry<String, String> kv : properties.entrySet()) {
             replaceIfEffectiveValue(this.properties, kv.getKey(), 
kv.getValue());
+            if (kv.getKey().equals(S3Properties.Env.TOKEN)
+                    || kv.getKey().equals(S3Properties.SESSION_TOKEN)) {
+                this.properties.put(kv.getKey(), kv.getValue());
+            }
         }
         ++version;
         writeUnlock();
@@ -198,11 +204,13 @@ public class S3Resource extends Resource {
     private CloudCredentialWithEndpoint getS3PingCredentials(Map<String, 
String> properties) {
         String ak = properties.getOrDefault(S3Properties.ACCESS_KEY, 
this.properties.get(S3Properties.ACCESS_KEY));
         String sk = properties.getOrDefault(S3Properties.SECRET_KEY, 
this.properties.get(S3Properties.SECRET_KEY));
+        String token = properties.getOrDefault(S3Properties.SESSION_TOKEN,
+                this.properties.get(S3Properties.SESSION_TOKEN));
         String endpoint = properties.getOrDefault(S3Properties.ENDPOINT, 
this.properties.get(S3Properties.ENDPOINT));
         String pingEndpoint = "http://" + endpoint;
         String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
         properties.putIfAbsent(S3Properties.REGION, region);
-        return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk);
+        return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, 
token);
     }
 
     private boolean isNeedCheck(Map<String, String> newProperties) {
@@ -232,7 +240,9 @@ public class S3Resource extends Resource {
             // it's dangerous to show password in show odbc resource,
             // so we use empty string to replace the real password
             if (entry.getKey().equals(S3Properties.Env.SECRET_KEY)
-                    || entry.getKey().equals(S3Properties.SECRET_KEY)) {
+                    || entry.getKey().equals(S3Properties.SECRET_KEY)
+                    || entry.getKey().equals(S3Properties.Env.TOKEN)
+                    || entry.getKey().equals(S3Properties.SESSION_TOKEN)) {
                 result.addRow(Lists.newArrayList(name, lowerCaseType, 
entry.getKey(), "******"));
             } else {
                 result.addRow(Lists.newArrayList(name, lowerCaseType, 
entry.getKey(), entry.getValue()));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index c4d3cce9c2c..51bca2c66b7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -253,6 +253,7 @@ public class S3Properties extends BaseProperties {
         s3Info.setRegion(properties.get(S3Properties.REGION));
         s3Info.setAk(properties.get(S3Properties.ACCESS_KEY));
         s3Info.setSk(properties.get(S3Properties.SECRET_KEY));
+        s3Info.setToken(properties.get(S3Properties.SESSION_TOKEN));
 
         s3Info.setRootPath(properties.get(S3Properties.ROOT_PATH));
         s3Info.setBucket(properties.get(S3Properties.BUCKET));
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index 589c9b97b6b..83939252b09 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -71,6 +71,7 @@ struct TS3StorageParam {
     8: optional string root_path
     9: optional string bucket
     10: optional bool use_path_style = false
+    11: optional string token
 }
 
 struct TStoragePolicy {
diff --git a/regression-test/suites/cold_heat_separation/policy/alter.groovy 
b/regression-test/suites/cold_heat_separation/policy/alter.groovy
index 3cb46cde103..83ead8e0176 100644
--- a/regression-test/suites/cold_heat_separation/policy/alter.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/alter.groovy
@@ -39,6 +39,7 @@ suite("alter_policy") {
             "AWS_ROOT_PATH" = "path/to/rootaaaa",
             "AWS_ACCESS_KEY" = "bbba",
             "AWS_SECRET_KEY" = "aaaa",
+            "AWS_TOKEN" = "session_token",
             "AWS_MAX_CONNECTIONS" = "50",
             "AWS_REQUEST_TIMEOUT_MS" = "3000",
             "AWS_CONNECTION_TIMEOUT_MS" = "1000",
@@ -70,6 +71,10 @@ suite("alter_policy") {
             ALTER RESOURCE "${resource_name}" 
PROPERTIES("AWS_REQUEST_TIMEOUT_MS" = "7777");
         """
 
+        def alter_result_succ_8 = try_sql """
+            ALTER RESOURCE "${resource_name}" PROPERTIES("AWS_TOKEN" = 
"new_session_token");
+        """
+
         // errCode = 2, detailMessage = current not support modify property : 
AWS_REGION
         def alter_result_fail_1 = try_sql """
             ALTER RESOURCE "${resource_name}" PROPERTIES("AWS_REGION" = 
"8888");
@@ -112,6 +117,7 @@ suite("alter_policy") {
         // [has_resouce_policy_alter, s3, AWS_REQUEST_TIMEOUT_MS, 7777],
         // [has_resouce_policy_alter, s3, AWS_ROOT_PATH, path/to/rootaaaa],
         // [has_resouce_policy_alter, s3, AWS_SECRET_KEY, ******],
+        // [has_resouce_policy_alter, s3, AWS_TOKEN, ******],
         // [has_resouce_policy_alter, s3, id, {id}],
         // [has_resouce_policy_alter, s3, type, s3]
         // [has_resouce_policy_alter, s3, version, {version}]]
@@ -133,6 +139,8 @@ suite("alter_policy") {
         assertEquals(show_alter_result[8][3], "10101010")
         // AWS_SECRET_KEY
         assertEquals(show_alter_result[9][3], "******")
+        // AWS_TOKEN
+        assertEquals(show_alter_result[10][3], "******")
     }
 
     def check_alter_resource_result_with_policy = { resource_name ->
@@ -151,6 +159,7 @@ suite("alter_policy") {
         // [has_resouce_policy_alter, s3, AWS_REQUEST_TIMEOUT_MS, 7777],
         // [has_resouce_policy_alter, s3, AWS_ROOT_PATH, path/to/rootaaaa],
         // [has_resouce_policy_alter, s3, AWS_SECRET_KEY, ******],
+        // [has_resouce_policy_alter, s3, AWS_TOKEN, ******],
         // [has_resouce_policy_alter, s3, id, {id}],
         // [has_resouce_policy_alter, s3, type, s3]
         // [has_resouce_policy_alter, s3, version, {version}]]
@@ -172,6 +181,8 @@ suite("alter_policy") {
         assertEquals(show_alter_result[8][3], "path/to/rootaaaa")
         // AWS_SECRET_KEY
         assertEquals(show_alter_result[9][3], "******")
+        // AWS_TOKEN
+        assertEquals(show_alter_result[10][3], "******")
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to