morningman commented on code in PR #50849:
URL: https://github.com/apache/doris/pull/50849#discussion_r2095092188
##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java:
##########
@@ -75,42 +77,61 @@ public BrokerDesc(String name, Map<String, String>
properties) {
if (properties != null) {
this.properties.putAll(properties);
}
+ // Assume the storage type is BROKER by default
+ // If it's a multi-load broker, override the storage type to LOCAL
if (isMultiLoadBroker()) {
this.storageType = StorageBackend.StorageType.LOCAL;
} else {
this.storageType = StorageBackend.StorageType.BROKER;
}
-
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
- this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties,
this.storageType);
- if (this.convertedToS3) {
- this.storageType = StorageBackend.StorageType.S3;
+
+ // Try to determine the actual storage type from properties if
available
+ if (MapUtils.isNotEmpty(properties)) {
+ try {
+ // Create primary storage properties from the given
configuration
+ this.storageProperties =
StorageProperties.createPrimary(properties);
+ // Override the storage type based on property configuration
+ this.storageType =
StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
+ } catch (RuntimeException e) {
+ // Currently ignored: these properties might be
broker-specific.
+ // Support for broker properties will be added in the future.
+ LOG.info("Failed to create storage properties for broker: {},
properties: {}", name, properties, e);
+ }
+ }
+ if (StringUtils.isBlank(this.name)) {
Review Comment:
This logic is strange. Just above, you set `storageType`.
So if `this.name` is not blank, isn't it possible that `this.storageType().name()`
is not equal to `this.name`?
##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java:
##########
@@ -75,42 +77,61 @@ public BrokerDesc(String name, Map<String, String>
properties) {
if (properties != null) {
this.properties.putAll(properties);
}
+ // Assume the storage type is BROKER by default
+ // If it's a multi-load broker, override the storage type to LOCAL
if (isMultiLoadBroker()) {
this.storageType = StorageBackend.StorageType.LOCAL;
} else {
this.storageType = StorageBackend.StorageType.BROKER;
}
-
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
- this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties,
this.storageType);
- if (this.convertedToS3) {
- this.storageType = StorageBackend.StorageType.S3;
+
+ // Try to determine the actual storage type from properties if
available
+ if (MapUtils.isNotEmpty(properties)) {
+ try {
+ // Create primary storage properties from the given
configuration
+ this.storageProperties =
StorageProperties.createPrimary(properties);
+ // Override the storage type based on property configuration
+ this.storageType =
StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
+ } catch (RuntimeException e) {
+ // Currently ignored: these properties might be
broker-specific.
+ // Support for broker properties will be added in the future.
+ LOG.info("Failed to create storage properties for broker: {},
properties: {}", name, properties, e);
+ }
+ }
+ if (StringUtils.isBlank(this.name)) {
+ this.name = this.storageType().name();
}
}
public BrokerDesc(String name, StorageBackend.StorageType storageType,
Map<String, String> properties) {
this.name = name;
this.properties = Maps.newHashMap();
+ this.storageType = storageType;
if (properties != null) {
this.properties.putAll(properties);
}
- this.storageType = storageType;
-
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
- this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties,
this.storageType);
- if (this.convertedToS3) {
- this.storageType = StorageBackend.StorageType.S3;
+ if (MapUtils.isNotEmpty(properties)) {
Review Comment:
Should we be checking `properties` (the constructor argument) or `this.properties`?
##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java:
##########
@@ -75,42 +77,61 @@ public BrokerDesc(String name, Map<String, String>
properties) {
if (properties != null) {
this.properties.putAll(properties);
}
+ // Assume the storage type is BROKER by default
+ // If it's a multi-load broker, override the storage type to LOCAL
if (isMultiLoadBroker()) {
this.storageType = StorageBackend.StorageType.LOCAL;
} else {
this.storageType = StorageBackend.StorageType.BROKER;
}
-
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
- this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties,
this.storageType);
- if (this.convertedToS3) {
- this.storageType = StorageBackend.StorageType.S3;
+
+ // Try to determine the actual storage type from properties if
available
+ if (MapUtils.isNotEmpty(properties)) {
+ try {
+ // Create primary storage properties from the given
configuration
+ this.storageProperties =
StorageProperties.createPrimary(properties);
+ // Override the storage type based on property configuration
+ this.storageType =
StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
+ } catch (RuntimeException e) {
+ // Currently ignored: these properties might be
broker-specific.
+ // Support for broker properties will be added in the future.
+ LOG.info("Failed to create storage properties for broker: {},
properties: {}", name, properties, e);
Review Comment:
Two issues:
1. It would be better to define a dedicated exception, e.g.
`StoragePropertyException extends RuntimeException`, and catch that here.
2. If an exception is thrown, `this.storageProperties` will remain null. Is that
safe? Why not assign a `BrokerProperties` instance to it instead?
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import com.google.common.collect.ImmutableSet;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class MinioProperties extends AbstractS3CompatibleProperties {
Review Comment:
Please add unit tests for `MinioProperties`.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java:
##########
@@ -38,12 +39,12 @@ public class OBSProperties extends
AbstractS3CompatibleProperties {
protected String endpoint = "";
@Getter
- @ConnectorProperty(names = {"obs.access_key", "AWS_ACCESS_KEY",
"ACCESS_KEY", "access_key"},
+ @ConnectorProperty(names = {"obs.access_key", "AWS_ACCESS_KEY",
"ACCESS_KEY", "access_key", "s3.access_key"},
description = "The access key of OBS.")
protected String accessKey = "";
@Getter
- @ConnectorProperty(names = {"obs.secret_key", "secret_key",
"s3.secret_key"},
+ @ConnectorProperty(names = {"obs.secret_key", "s3.secret_key",
"AWS_SECRET_KEY", "secret_key", "SECRET_KEY"},
Review Comment:
It would be better to use the same ordering of name types in every `names` list,
e.g. `obs.` prefix, then `s3.` prefix, then no prefix, then `AWS_xxx`.
##########
fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java:
##########
@@ -203,17 +203,43 @@ public static Repository read(DataInput in) throws
IOException {
}
}
+ public Status alterRepositoryS3Properties(Map<String, String> properties) {
+ if (this.fileSystem instanceof S3FileSystem) {
+ Map<String, String> oldProperties = new
HashMap<>(this.getRemoteFileSystem().getProperties());
+ oldProperties.remove(S3Properties.ACCESS_KEY);
+ oldProperties.remove(S3Properties.SECRET_KEY);
+ oldProperties.remove(S3Properties.SESSION_TOKEN);
+ oldProperties.remove(S3Properties.Env.ACCESS_KEY);
+ oldProperties.remove(S3Properties.Env.SECRET_KEY);
+ oldProperties.remove(S3Properties.Env.TOKEN);
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
+ || Objects.equals(entry.getKey(),
S3Properties.Env.ACCESS_KEY)) {
+ oldProperties.putIfAbsent(S3Properties.ACCESS_KEY,
entry.getValue());
+ }
+ if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
+ || Objects.equals(entry.getKey(),
S3Properties.Env.SECRET_KEY)) {
+ oldProperties.putIfAbsent(S3Properties.SECRET_KEY,
entry.getValue());
+ }
+ if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
+ || Objects.equals(entry.getKey(),
S3Properties.Env.TOKEN)) {
+ oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN,
entry.getValue());
+ }
+ }
+ properties.clear();
+ properties.putAll(oldProperties);
+ return Status.OK;
+ } else {
+ return new Status(ErrCode.COMMON_ERROR, "Only support alter s3
repository");
+ }
+ }
+
@Override
public void gsonPostProcess() {
- StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
- if
(this.fileSystem.properties.containsKey(PersistentFileSystem.STORAGE_TYPE)) {
- type = StorageBackend.StorageType.valueOf(
-
this.fileSystem.properties.get(PersistentFileSystem.STORAGE_TYPE));
-
this.fileSystem.properties.remove(PersistentFileSystem.STORAGE_TYPE);
+ if (!BrokerFileSystem.class.equals(fileSystem.getClass())) {
Review Comment:
Why not use `instanceof`?
##########
fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java:
##########
@@ -101,60 +104,115 @@ public S3Client getClient() throws UserException {
CloudCredential credential = new CloudCredential();
credential.setAccessKey(s3Properties.getAccessKey());
credential.setSecretKey(s3Properties.getSecretKey());
-
- /* if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
-
credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
- }*/
+ if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+ credential.setSessionToken(s3Properties.getSessionToken());
+ }
client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
credential, isUsePathStyle);
}
return client;
}
-
-
public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
+ long roundCnt = 0;
+ long elementCnt = 0;
+ long matchCnt = 0;
+ long startTime = System.nanoTime();
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
- URI uri = new URI(remotePath);
- String bucketName = uri.getHost();
- String prefix = uri.getPath().substring(1);
- int wildcardIndex = prefix.indexOf('*');
- String searchPrefix = wildcardIndex > 0 ? prefix.substring(0,
wildcardIndex) : prefix;
- try (S3Client s3 = getClient()) {
- ListObjectsV2Request listRequest =
ListObjectsV2Request.builder()
- .bucket(bucketName)
- .prefix(searchPrefix)
- .build();
-
- ListObjectsV2Response listResponse =
s3.listObjectsV2(listRequest);
- String regex = prefix.replace(".", "\\.")
- .replace("*", ".*")
- .replace("?", ".");
- Pattern pattern = Pattern.compile(regex);
- List<RemoteFile> matchedFiles =
listResponse.contents().stream()
- .filter(obj -> pattern.matcher(obj.key()).matches())
- .map(obj -> {
- String fullKey = obj.key();
- String fullPath = "s3://" + bucketName + "/" +
fullKey;
- return new RemoteFile(
- fileNameOnly ?
fullPath.substring(fullPath.lastIndexOf('/') + 1) : fullPath,
- true,
- obj.size(),
- -1,
- obj.lastModified().toEpochMilli()
- );
- })
- .collect(Collectors.toList());
-
- result.addAll(matchedFiles);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String globPath = uri.getKey(); // eg: path/to/*.csv
+
+ LOG.info("globList globPath:{}, remotePath:{}", globPath,
remotePath);
Review Comment:
```suggestion
LOG.debug("globList globPath:{}, remotePath:{}", globPath,
remotePath);
```
##########
fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java:
##########
@@ -101,60 +104,115 @@ public S3Client getClient() throws UserException {
CloudCredential credential = new CloudCredential();
credential.setAccessKey(s3Properties.getAccessKey());
credential.setSecretKey(s3Properties.getSecretKey());
-
- /* if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
-
credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
- }*/
+ if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+ credential.setSessionToken(s3Properties.getSessionToken());
+ }
client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
credential, isUsePathStyle);
}
return client;
}
-
-
public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
+ long roundCnt = 0;
+ long elementCnt = 0;
+ long matchCnt = 0;
+ long startTime = System.nanoTime();
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
- URI uri = new URI(remotePath);
- String bucketName = uri.getHost();
- String prefix = uri.getPath().substring(1);
- int wildcardIndex = prefix.indexOf('*');
- String searchPrefix = wildcardIndex > 0 ? prefix.substring(0,
wildcardIndex) : prefix;
- try (S3Client s3 = getClient()) {
- ListObjectsV2Request listRequest =
ListObjectsV2Request.builder()
- .bucket(bucketName)
- .prefix(searchPrefix)
- .build();
-
- ListObjectsV2Response listResponse =
s3.listObjectsV2(listRequest);
- String regex = prefix.replace(".", "\\.")
- .replace("*", ".*")
- .replace("?", ".");
- Pattern pattern = Pattern.compile(regex);
- List<RemoteFile> matchedFiles =
listResponse.contents().stream()
- .filter(obj -> pattern.matcher(obj.key()).matches())
- .map(obj -> {
- String fullKey = obj.key();
- String fullPath = "s3://" + bucketName + "/" +
fullKey;
- return new RemoteFile(
- fileNameOnly ?
fullPath.substring(fullPath.lastIndexOf('/') + 1) : fullPath,
- true,
- obj.size(),
- -1,
- obj.lastModified().toEpochMilli()
- );
- })
- .collect(Collectors.toList());
-
- result.addAll(matchedFiles);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String globPath = uri.getKey(); // eg: path/to/*.csv
+
+ LOG.info("globList globPath:{}, remotePath:{}", globPath,
remotePath);
+
+ java.nio.file.Path pathPattern = Paths.get(globPath);
+ PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+ HashSet<String> directorySet = new HashSet<>();
+
+ String listPrefix = getLongestPrefix(globPath); // similar to Azure
+ LOG.info("globList listPrefix: {}", listPrefix);
+
+ ListObjectsV2Request request = ListObjectsV2Request.builder()
+ .bucket(bucket)
+ .prefix(listPrefix)
+ .build();
+
+ boolean isTruncated = false;
+ do {
+ roundCnt++;
+ ListObjectsV2Response response =
getClient().listObjectsV2(request);
+ for (S3Object obj : response.contents()) {
+ elementCnt++;
+ java.nio.file.Path objPath = Paths.get(obj.key());
+
+ boolean isPrefix = false;
+ while (objPath != null &&
objPath.normalize().toString().startsWith(listPrefix)) {
+ if (!matcher.matches(objPath)) {
+ isPrefix = true;
+ objPath = objPath.getParent();
+ continue;
+ }
+ if
(directorySet.contains(objPath.normalize().toString())) {
+ break;
+ }
+ if (isPrefix) {
+ directorySet.add(objPath.normalize().toString());
+ }
+
+ matchCnt++;
+ RemoteFile remoteFile = new RemoteFile(
+ fileNameOnly ?
objPath.getFileName().toString() :
+ "s3://" + bucket + "/" + objPath,
+ !isPrefix,
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? 0 :
obj.lastModified().toEpochMilli()
+ );
+ result.add(remoteFile);
+ objPath = objPath.getParent();
+ isPrefix = true;
+ }
+ }
+
+ isTruncated = response.isTruncated();
+ if (isTruncated) {
+ request = request.toBuilder()
+
.continuationToken(response.nextContinuationToken())
+ .build();
+ }
+ } while (isTruncated);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remotePath:{}, result:{}", remotePath, result);
}
return Status.OK;
} catch (Exception e) {
LOG.warn("Errors while getting file status", e);
return new Status(Status.ErrCode.COMMON_ERROR, "Errors while
getting file status " + e.getMessage());
+ } finally {
+ long endTime = System.nanoTime();
+ long duration = endTime - startTime;
+ LOG.info("process {} elements under prefix {} for {} round, match
{} elements, take {} ms",
+ elementCnt, remotePath, roundCnt, matchCnt,
+ duration / 1000);
}
}
+ public static String getLongestPrefix(String globPattern) {
Review Comment:
```suggestion
private static String getLongestPrefix(String globPattern) {
```
##########
fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java:
##########
@@ -581,6 +581,24 @@ public class GsonUtils {
.registerSubtype(S3FileSystem.class,
S3FileSystem.class.getSimpleName())
.registerSubtype(AzureFileSystem.class,
AzureFileSystem.class.getSimpleName());
+ private static
RuntimeTypeAdapterFactory<org.apache.doris.fsv2.PersistentFileSystem>
Review Comment:
I think we should not add this if we are not using fsv2.
##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java:
##########
@@ -150,16 +171,18 @@ public void readFields(DataInput in) throws IOException {
final String val = Text.readString(in);
properties.put(key, val);
}
- StorageBackend.StorageType st = StorageBackend.StorageType.BROKER;
- String typeStr = properties.remove(PersistentFileSystem.STORAGE_TYPE);
- if (typeStr != null) {
+ if (MapUtils.isNotEmpty(properties)) {
try {
- st = StorageBackend.StorageType.valueOf(typeStr);
- } catch (IllegalArgumentException e) {
- LOG.warn("set to BROKER, because of exception", e);
+ this.storageProperties =
StorageProperties.createPrimary(properties);
+ this.storageType =
StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
+ } catch (RuntimeException e) {
+ // Currently ignored: these properties might be
broker-specific.
+ // Support for broker properties will be added in the future.
+ LOG.warn("Failed to create storage properties for broker: {},
properties: {}", name, properties, e);
+ this.storageType = StorageBackend.StorageType.BROKER;
Review Comment:
In some places we just ignore the exception, while in others (like here) we also
reset `storageType`. Why the inconsistency?
##########
fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java:
##########
@@ -185,6 +148,24 @@ public TStorageBackendType toThrift() {
return TStorageBackendType.BROKER;
}
}
+
+ public static final Set<StorageType> REFACTOR_STORAGE_TYPES =
Review Comment:
Add a comment for this field.
##########
fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java:
##########
@@ -231,9 +237,41 @@ public void createRepository(CreateRepositoryStmt stmt)
throws DdlException {
}
public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
Review Comment:
Merge `alterRepository` and `alterRepositoryInternal`.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java:
##########
@@ -52,13 +53,69 @@ public static String validateAndGetUri(Map<String, String>
props) throws UserExc
if (props.isEmpty()) {
throw new UserException("props is empty");
}
- if (!props.containsKey(URI_KEY)) {
+ String uriStr = getUri(props);
+ if (StringUtils.isBlank(uriStr)) {
throw new UserException("props must contain uri");
}
- String uriStr = props.get(URI_KEY);
return validateAndNormalizeUri(uriStr);
}
+ public static String extractDefaultFsFromPath(String filePath) {
+ if (StringUtils.isBlank(filePath)) {
+ return null;
+ }
+ try {
+ URI uri = URI.create(filePath);
+ return uri.getScheme() + "://" + uri.getAuthority();
+ } catch (AnalysisException e) {
+ throw new IllegalArgumentException("Invalid file path: " +
filePath, e);
+ }
+ }
+
+ public static String extractDefaultFsFromUri(Map<String, String> props) {
Review Comment:
Merge the common logic with `extractDefaultFsFromPath`.
##########
fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java:
##########
@@ -101,60 +104,115 @@ public S3Client getClient() throws UserException {
CloudCredential credential = new CloudCredential();
credential.setAccessKey(s3Properties.getAccessKey());
credential.setSecretKey(s3Properties.getSecretKey());
-
- /* if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
-
credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
- }*/
+ if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+ credential.setSessionToken(s3Properties.getSessionToken());
+ }
client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
credential, isUsePathStyle);
}
return client;
}
-
-
public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
+ long roundCnt = 0;
+ long elementCnt = 0;
+ long matchCnt = 0;
+ long startTime = System.nanoTime();
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
- URI uri = new URI(remotePath);
- String bucketName = uri.getHost();
- String prefix = uri.getPath().substring(1);
- int wildcardIndex = prefix.indexOf('*');
- String searchPrefix = wildcardIndex > 0 ? prefix.substring(0,
wildcardIndex) : prefix;
- try (S3Client s3 = getClient()) {
- ListObjectsV2Request listRequest =
ListObjectsV2Request.builder()
- .bucket(bucketName)
- .prefix(searchPrefix)
- .build();
-
- ListObjectsV2Response listResponse =
s3.listObjectsV2(listRequest);
- String regex = prefix.replace(".", "\\.")
- .replace("*", ".*")
- .replace("?", ".");
- Pattern pattern = Pattern.compile(regex);
- List<RemoteFile> matchedFiles =
listResponse.contents().stream()
- .filter(obj -> pattern.matcher(obj.key()).matches())
- .map(obj -> {
- String fullKey = obj.key();
- String fullPath = "s3://" + bucketName + "/" +
fullKey;
- return new RemoteFile(
- fileNameOnly ?
fullPath.substring(fullPath.lastIndexOf('/') + 1) : fullPath,
- true,
- obj.size(),
- -1,
- obj.lastModified().toEpochMilli()
- );
- })
- .collect(Collectors.toList());
-
- result.addAll(matchedFiles);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String globPath = uri.getKey(); // eg: path/to/*.csv
+
+ LOG.info("globList globPath:{}, remotePath:{}", globPath,
remotePath);
+
+ java.nio.file.Path pathPattern = Paths.get(globPath);
+ PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+ HashSet<String> directorySet = new HashSet<>();
+
+ String listPrefix = getLongestPrefix(globPath); // similar to Azure
+ LOG.info("globList listPrefix: {}", listPrefix);
+
+ ListObjectsV2Request request = ListObjectsV2Request.builder()
+ .bucket(bucket)
+ .prefix(listPrefix)
+ .build();
+
+ boolean isTruncated = false;
+ do {
+ roundCnt++;
+ ListObjectsV2Response response =
getClient().listObjectsV2(request);
+ for (S3Object obj : response.contents()) {
+ elementCnt++;
+ java.nio.file.Path objPath = Paths.get(obj.key());
+
+ boolean isPrefix = false;
+ while (objPath != null &&
objPath.normalize().toString().startsWith(listPrefix)) {
+ if (!matcher.matches(objPath)) {
+ isPrefix = true;
+ objPath = objPath.getParent();
+ continue;
+ }
+ if
(directorySet.contains(objPath.normalize().toString())) {
+ break;
+ }
+ if (isPrefix) {
+ directorySet.add(objPath.normalize().toString());
+ }
+
+ matchCnt++;
+ RemoteFile remoteFile = new RemoteFile(
+ fileNameOnly ?
objPath.getFileName().toString() :
+ "s3://" + bucket + "/" + objPath,
+ !isPrefix,
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? 0 :
obj.lastModified().toEpochMilli()
+ );
+ result.add(remoteFile);
+ objPath = objPath.getParent();
+ isPrefix = true;
+ }
+ }
+
+ isTruncated = response.isTruncated();
+ if (isTruncated) {
+ request = request.toBuilder()
+
.continuationToken(response.nextContinuationToken())
+ .build();
+ }
+ } while (isTruncated);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remotePath:{}, result:{}", remotePath, result);
}
return Status.OK;
} catch (Exception e) {
LOG.warn("Errors while getting file status", e);
return new Status(Status.ErrCode.COMMON_ERROR, "Errors while
getting file status " + e.getMessage());
+ } finally {
+ long endTime = System.nanoTime();
+ long duration = endTime - startTime;
+ LOG.info("process {} elements under prefix {} for {} round, match
{} elements, take {} ms",
+ elementCnt, remotePath, roundCnt, matchCnt,
+ duration / 1000);
}
}
+ public static String getLongestPrefix(String globPattern) {
Review Comment:
This is the same as `S3Utils.getLongestPrefix()`; please merge them.
##########
fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java:
##########
@@ -101,60 +104,115 @@ public S3Client getClient() throws UserException {
CloudCredential credential = new CloudCredential();
credential.setAccessKey(s3Properties.getAccessKey());
credential.setSecretKey(s3Properties.getSecretKey());
-
- /* if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
-
credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
- }*/
+ if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+ credential.setSessionToken(s3Properties.getSessionToken());
+ }
client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
credential, isUsePathStyle);
}
return client;
}
-
-
public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
+ long roundCnt = 0;
+ long elementCnt = 0;
+ long matchCnt = 0;
+ long startTime = System.nanoTime();
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
- URI uri = new URI(remotePath);
- String bucketName = uri.getHost();
- String prefix = uri.getPath().substring(1);
- int wildcardIndex = prefix.indexOf('*');
- String searchPrefix = wildcardIndex > 0 ? prefix.substring(0,
wildcardIndex) : prefix;
- try (S3Client s3 = getClient()) {
- ListObjectsV2Request listRequest =
ListObjectsV2Request.builder()
- .bucket(bucketName)
- .prefix(searchPrefix)
- .build();
-
- ListObjectsV2Response listResponse =
s3.listObjectsV2(listRequest);
- String regex = prefix.replace(".", "\\.")
- .replace("*", ".*")
- .replace("?", ".");
- Pattern pattern = Pattern.compile(regex);
- List<RemoteFile> matchedFiles =
listResponse.contents().stream()
- .filter(obj -> pattern.matcher(obj.key()).matches())
- .map(obj -> {
- String fullKey = obj.key();
- String fullPath = "s3://" + bucketName + "/" +
fullKey;
- return new RemoteFile(
- fileNameOnly ?
fullPath.substring(fullPath.lastIndexOf('/') + 1) : fullPath,
- true,
- obj.size(),
- -1,
- obj.lastModified().toEpochMilli()
- );
- })
- .collect(Collectors.toList());
-
- result.addAll(matchedFiles);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String globPath = uri.getKey(); // eg: path/to/*.csv
+
+ LOG.info("globList globPath:{}, remotePath:{}", globPath,
remotePath);
+
+ java.nio.file.Path pathPattern = Paths.get(globPath);
+ PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+ HashSet<String> directorySet = new HashSet<>();
+
+ String listPrefix = getLongestPrefix(globPath); // similar to Azure
+ LOG.info("globList listPrefix: {}", listPrefix);
Review Comment:
```suggestion
LOG.debug("globList listPrefix: {}", listPrefix);
```
##########
fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java:
##########
@@ -101,60 +104,115 @@ public S3Client getClient() throws UserException {
CloudCredential credential = new CloudCredential();
credential.setAccessKey(s3Properties.getAccessKey());
credential.setSecretKey(s3Properties.getSecretKey());
-
- /* if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
-
credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
- }*/
+ if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+ credential.setSessionToken(s3Properties.getSessionToken());
+ }
client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
credential, isUsePathStyle);
}
return client;
}
-
-
public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
+ long roundCnt = 0;
+ long elementCnt = 0;
+ long matchCnt = 0;
+ long startTime = System.nanoTime();
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
- URI uri = new URI(remotePath);
- String bucketName = uri.getHost();
- String prefix = uri.getPath().substring(1);
- int wildcardIndex = prefix.indexOf('*');
- String searchPrefix = wildcardIndex > 0 ? prefix.substring(0,
wildcardIndex) : prefix;
- try (S3Client s3 = getClient()) {
- ListObjectsV2Request listRequest =
ListObjectsV2Request.builder()
- .bucket(bucketName)
- .prefix(searchPrefix)
- .build();
-
- ListObjectsV2Response listResponse =
s3.listObjectsV2(listRequest);
- String regex = prefix.replace(".", "\\.")
- .replace("*", ".*")
- .replace("?", ".");
- Pattern pattern = Pattern.compile(regex);
- List<RemoteFile> matchedFiles =
listResponse.contents().stream()
- .filter(obj -> pattern.matcher(obj.key()).matches())
- .map(obj -> {
- String fullKey = obj.key();
- String fullPath = "s3://" + bucketName + "/" +
fullKey;
- return new RemoteFile(
- fileNameOnly ?
fullPath.substring(fullPath.lastIndexOf('/') + 1) : fullPath,
- true,
- obj.size(),
- -1,
- obj.lastModified().toEpochMilli()
- );
- })
- .collect(Collectors.toList());
-
- result.addAll(matchedFiles);
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String globPath = uri.getKey(); // eg: path/to/*.csv
+
+ LOG.info("globList globPath:{}, remotePath:{}", globPath,
remotePath);
+
+ java.nio.file.Path pathPattern = Paths.get(globPath);
+ PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+ HashSet<String> directorySet = new HashSet<>();
+
+ String listPrefix = getLongestPrefix(globPath); // similar to Azure
+ LOG.info("globList listPrefix: {}", listPrefix);
+
+ ListObjectsV2Request request = ListObjectsV2Request.builder()
+ .bucket(bucket)
+ .prefix(listPrefix)
+ .build();
+
+ boolean isTruncated = false;
+ do {
+ roundCnt++;
+ ListObjectsV2Response response =
getClient().listObjectsV2(request);
+ for (S3Object obj : response.contents()) {
+ elementCnt++;
+ java.nio.file.Path objPath = Paths.get(obj.key());
+
+ boolean isPrefix = false;
+ while (objPath != null &&
objPath.normalize().toString().startsWith(listPrefix)) {
+ if (!matcher.matches(objPath)) {
+ isPrefix = true;
+ objPath = objPath.getParent();
+ continue;
+ }
+ if
(directorySet.contains(objPath.normalize().toString())) {
+ break;
+ }
+ if (isPrefix) {
+ directorySet.add(objPath.normalize().toString());
+ }
+
+ matchCnt++;
+ RemoteFile remoteFile = new RemoteFile(
+ fileNameOnly ?
objPath.getFileName().toString() :
+ "s3://" + bucket + "/" + objPath,
+ !isPrefix,
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? 0 :
obj.lastModified().toEpochMilli()
+ );
+ result.add(remoteFile);
+ objPath = objPath.getParent();
+ isPrefix = true;
+ }
+ }
+
+ isTruncated = response.isTruncated();
+ if (isTruncated) {
+ request = request.toBuilder()
+
.continuationToken(response.nextContinuationToken())
+ .build();
+ }
+ } while (isTruncated);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remotePath:{}, result:{}", remotePath, result);
}
return Status.OK;
} catch (Exception e) {
LOG.warn("Errors while getting file status", e);
return new Status(Status.ErrCode.COMMON_ERROR, "Errors while
getting file status " + e.getMessage());
+ } finally {
+ long endTime = System.nanoTime();
+ long duration = endTime - startTime;
+ LOG.info("process {} elements under prefix {} for {} round, match
{} elements, take {} ms",
Review Comment:
```suggestion
LOG.debug("process {} elements under prefix {} for {} round,
match {} elements, take {} ms",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]