This is an automated email from the ASF dual-hosted git repository. lide pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 96e1280f4b8 [branch-2.0](resource)fix check available fail when s3 aws_token is set and reset ak, sk failed on be. #34655 96e1280f4b8 is described below commit 96e1280f4b8223a425bb5fd412b38b2f5feaa885 Author: huanghg1994 <519500...@qq.com> AuthorDate: Mon May 13 20:56:20 2024 +0800 [branch-2.0](resource)fix check available fail when s3 aws_token is set and reset ak, sk failed on be. #34655 --- be/src/agent/task_worker_pool.cpp | 3 ++- be/src/io/fs/s3_file_system.cpp | 23 ++++++++++++++++++++++ be/src/io/fs/s3_file_system.h | 2 +- .../java/org/apache/doris/catalog/S3Resource.java | 16 ++++++++++++--- .../property/constants/S3Properties.java | 1 + gensrc/thrift/AgentService.thrift | 1 + .../cold_heat_separation/policy/alter.groovy | 11 +++++++++++ 7 files changed, 52 insertions(+), 5 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index bcf8aa93210..5d67a5bac08 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1196,6 +1196,7 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() { S3Conf s3_conf; s3_conf.ak = std::move(resource.s3_storage_param.ak); s3_conf.sk = std::move(resource.s3_storage_param.sk); + s3_conf.token = std::move(resource.s3_storage_param.token); s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint); s3_conf.region = std::move(resource.s3_storage_param.region); s3_conf.prefix = std::move(resource.s3_storage_param.root_path); @@ -1211,7 +1212,7 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() { st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), &fs); } else { fs = std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs); - fs->set_conf(s3_conf); + st = fs->set_conf(s3_conf); } if (!st.ok()) { LOG(WARNING) << "update s3 resource failed: " << st; diff --git a/be/src/io/fs/s3_file_system.cpp 
b/be/src/io/fs/s3_file_system.cpp index 6423de4fa15..fd21c3f2395 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -96,6 +96,29 @@ namespace io { RETURN_IF_ERROR(get_key(path, &key)); #endif +// Guarded by external lock. +Status S3FileSystem::set_conf(S3Conf s3_conf) { + if (s3_conf.ak == _s3_conf.ak && s3_conf.sk == _s3_conf.sk && s3_conf.token == _s3_conf.token) { + return Status::OK(); // Same conf + } + + auto reset_conf = _s3_conf; + reset_conf.ak = s3_conf.ak; + reset_conf.sk = s3_conf.sk; + reset_conf.token = s3_conf.token; + auto client = S3ClientFactory::instance().create(s3_conf); + if (!client) { + return Status::InternalError("failed to init s3 client with {}", _s3_conf.to_string()); + } + + { + std::lock_guard lock(_client_mu); + _client = std::move(client); + } + _s3_conf = std::move(reset_conf); + return Status::OK(); +} + Status S3FileSystem::create(S3Conf s3_conf, std::string id, std::shared_ptr<S3FileSystem>* fs) { (*fs).reset(new S3FileSystem(std::move(s3_conf), std::move(id))); return (*fs)->connect(); diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 301d97c73cd..0044288b3a5 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -59,7 +59,7 @@ public: static Status create(S3Conf s3_conf, std::string id, std::shared_ptr<S3FileSystem>* fs); ~S3FileSystem() override; // Guarded by external lock. 
- void set_conf(S3Conf s3_conf) { _s3_conf = std::move(s3_conf); } + Status set_conf(S3Conf s3_conf); std::shared_ptr<Aws::S3::S3Client> get_client() const { std::lock_guard lock(_client_mu); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java index b8ef318077f..1dd09cbf981 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java @@ -104,7 +104,8 @@ public class S3Resource extends Resource { properties.putIfAbsent(S3Properties.REGION, region); String ak = properties.get(S3Properties.ACCESS_KEY); String sk = properties.get(S3Properties.SECRET_KEY); - CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk); + String token = properties.get(S3Properties.SESSION_TOKEN); + CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token); if (needCheck) { String bucketName = properties.get(S3Properties.BUCKET); @@ -125,6 +126,7 @@ public class S3Resource extends Resource { Map<String, String> propertiesPing = new HashMap<>(); propertiesPing.put(S3Properties.Env.ACCESS_KEY, credential.getAccessKey()); propertiesPing.put(S3Properties.Env.SECRET_KEY, credential.getSecretKey()); + propertiesPing.put(S3Properties.Env.TOKEN, credential.getSessionToken()); propertiesPing.put(S3Properties.Env.ENDPOINT, credential.getEndpoint()); propertiesPing.put(S3Properties.Env.REGION, credential.getRegion()); propertiesPing.put(PropertyConverter.USE_PATH_STYLE, @@ -189,6 +191,10 @@ public class S3Resource extends Resource { writeLock(); for (Map.Entry<String, String> kv : properties.entrySet()) { replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue()); + if (kv.getKey().equals(S3Properties.Env.TOKEN) + || kv.getKey().equals(S3Properties.SESSION_TOKEN)) { + this.properties.put(kv.getKey(), kv.getValue()); 
+ } } ++version; writeUnlock(); @@ -198,11 +204,13 @@ public class S3Resource extends Resource { private CloudCredentialWithEndpoint getS3PingCredentials(Map<String, String> properties) { String ak = properties.getOrDefault(S3Properties.ACCESS_KEY, this.properties.get(S3Properties.ACCESS_KEY)); String sk = properties.getOrDefault(S3Properties.SECRET_KEY, this.properties.get(S3Properties.SECRET_KEY)); + String token = properties.getOrDefault(S3Properties.SESSION_TOKEN, + this.properties.get(S3Properties.SESSION_TOKEN)); String endpoint = properties.getOrDefault(S3Properties.ENDPOINT, this.properties.get(S3Properties.ENDPOINT)); String pingEndpoint = "http://" + endpoint; String region = S3Properties.getRegionOfEndpoint(pingEndpoint); properties.putIfAbsent(S3Properties.REGION, region); - return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk); + return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token); } private boolean isNeedCheck(Map<String, String> newProperties) { @@ -232,7 +240,9 @@ public class S3Resource extends Resource { // it's dangerous to show password in show odbc resource, // so we use empty string to replace the real password if (entry.getKey().equals(S3Properties.Env.SECRET_KEY) - || entry.getKey().equals(S3Properties.SECRET_KEY)) { + || entry.getKey().equals(S3Properties.SECRET_KEY) + || entry.getKey().equals(S3Properties.Env.TOKEN) + || entry.getKey().equals(S3Properties.SESSION_TOKEN)) { result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), "******")); } else { result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java index c4d3cce9c2c..51bca2c66b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java @@ -253,6 +253,7 @@ public class S3Properties extends BaseProperties { s3Info.setRegion(properties.get(S3Properties.REGION)); s3Info.setAk(properties.get(S3Properties.ACCESS_KEY)); s3Info.setSk(properties.get(S3Properties.SECRET_KEY)); + s3Info.setToken(properties.get(S3Properties.SESSION_TOKEN)); s3Info.setRootPath(properties.get(S3Properties.ROOT_PATH)); s3Info.setBucket(properties.get(S3Properties.BUCKET)); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 589c9b97b6b..83939252b09 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -71,6 +71,7 @@ struct TS3StorageParam { 8: optional string root_path 9: optional string bucket 10: optional bool use_path_style = false + 11: optional string token } struct TStoragePolicy { diff --git a/regression-test/suites/cold_heat_separation/policy/alter.groovy b/regression-test/suites/cold_heat_separation/policy/alter.groovy index 3cb46cde103..83ead8e0176 100644 --- a/regression-test/suites/cold_heat_separation/policy/alter.groovy +++ b/regression-test/suites/cold_heat_separation/policy/alter.groovy @@ -39,6 +39,7 @@ suite("alter_policy") { "AWS_ROOT_PATH" = "path/to/rootaaaa", "AWS_ACCESS_KEY" = "bbba", "AWS_SECRET_KEY" = "aaaa", + "AWS_TOKEN" = "session_token", "AWS_MAX_CONNECTIONS" = "50", "AWS_REQUEST_TIMEOUT_MS" = "3000", "AWS_CONNECTION_TIMEOUT_MS" = "1000", @@ -70,6 +71,10 @@ suite("alter_policy") { ALTER RESOURCE "${resource_name}" PROPERTIES("AWS_REQUEST_TIMEOUT_MS" = "7777"); """ + def alter_result_succ_8 = try_sql """ + ALTER RESOURCE "${resource_name}" PROPERTIES("AWS_TOKEN" = "new_session_token"); + """ + // errCode = 2, detailMessage = current not support modify property : AWS_REGION def alter_result_fail_1 = try_sql """ ALTER RESOURCE "${resource_name}" PROPERTIES("AWS_REGION" = "8888"); @@ -112,6 +117,7 @@ suite("alter_policy") { // 
[has_resouce_policy_alter, s3, AWS_REQUEST_TIMEOUT_MS, 7777], // [has_resouce_policy_alter, s3, AWS_ROOT_PATH, path/to/rootaaaa], // [has_resouce_policy_alter, s3, AWS_SECRET_KEY, ******], + // [has_resouce_policy_alter, s3, AWS_TOKEN, ******], // [has_resouce_policy_alter, s3, id, {id}], // [has_resouce_policy_alter, s3, type, s3] // [has_resouce_policy_alter, s3, version, {version}]] @@ -133,6 +139,8 @@ suite("alter_policy") { assertEquals(show_alter_result[8][3], "10101010") // AWS_SECRET_KEY assertEquals(show_alter_result[9][3], "******") + // AWS_TOKEN + assertEquals(show_alter_result[10][3], "******") } def check_alter_resource_result_with_policy = { resource_name -> @@ -151,6 +159,7 @@ suite("alter_policy") { // [has_resouce_policy_alter, s3, AWS_REQUEST_TIMEOUT_MS, 7777], // [has_resouce_policy_alter, s3, AWS_ROOT_PATH, path/to/rootaaaa], // [has_resouce_policy_alter, s3, AWS_SECRET_KEY, ******], + // [has_resouce_policy_alter, s3, AWS_TOKEN, ******], // [has_resouce_policy_alter, s3, id, {id}], // [has_resouce_policy_alter, s3, type, s3] // [has_resouce_policy_alter, s3, version, {version}]] @@ -172,6 +181,8 @@ suite("alter_policy") { assertEquals(show_alter_result[8][3], "path/to/rootaaaa") // AWS_SECRET_KEY assertEquals(show_alter_result[9][3], "******") + // AWS_TOKEN + assertEquals(show_alter_result[10][3], "******") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org