This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 5bec4132ffbec0d038bfc40b47803d07b554107b
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Mon Nov 1 10:48:10 2021 +0800

    [S3] Support path style endpoint (#6962)

    - Add a `use_path_style` property for S3
    - Upgrade hadoop-common and hadoop-aws to 2.8.0 to support the path style property
    - Fix some S3 URI bugs
    - Add some logs for tracing the load process
---
 be/src/exec/tablet_sink.cpp                        |  6 +-
 be/src/olap/delta_writer.cpp                       |  4 +-
 be/src/runtime/load_channel.cpp                    |  3 +-
 be/src/runtime/tablets_channel.cpp                 |  5 +-
 be/src/runtime/tablets_channel.h                   |  2 +-
 be/src/service/internal_service.cpp                |  3 +-
 be/src/util/s3_util.cpp                            | 11 ++-
 be/test/util/s3_storage_backend_test.cpp           | 17 +++--
 .../load-data/s3-load-manual.md                    | 21 +++++-
 .../load-data/s3-load-manual.md                    | 21 +++++-
 fe/fe-core/pom.xml                                 |  2 +-
 .../java/org/apache/doris/backup/BlobStorage.java  |  1 -
 .../java/org/apache/doris/backup/Repository.java   | 14 ++--
 .../java/org/apache/doris/backup/S3Storage.java    | 59 +++++++++++-----
 .../org/apache/doris/common/util/BrokerUtil.java   | 15 +++--
 .../java/org/apache/doris/common/util/S3URI.java   | 68 +++++++++++++++----
 .../doris/httpv2/rest/manager/NodeAction.java      | 10 +--
 .../org/apache/doris/backup/S3StorageTest.java     | 10 +--
 .../org/apache/doris/common/util/S3URITest.java    | 38 ++++++-----
 .../doris/planner/StreamLoadScanNodeTest.java      | 78 ++++++++++++++++++++--
 fe/pom.xml                                         |  2 +-
 gensrc/proto/internal_service.proto                |  2 +
 22 files changed, 288 insertions(+), 104 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index c5c02d2..b818918 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -86,6 +86,7 @@ Status NodeChannel::init(RuntimeState* state) {
     _cur_add_batch_request.set_allocated_id(&_parent->_load_id);
     _cur_add_batch_request.set_index_id(_index_id);
     _cur_add_batch_request.set_sender_id(_parent->_sender_id);
+    _cur_add_batch_request.set_backend_id(_node_id);
     _cur_add_batch_request.set_eos(false);

     _rpc_timeout_ms = state->query_options().query_timeout * 1000;
@@ -93,7 +94,7 @@ Status NodeChannel::init(RuntimeState* state) {
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
-    _name = "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]";
+    _name = fmt::format("NodeChannel[{}-{}]", _index_id, _node_id);
     return Status::OK();
 }

@@ -282,7 +283,8 @@ Status NodeChannel::close_wait(RuntimeState* state) {
         SleepFor(MonoDelta::FromMilliseconds(1));
     }
     timer.stop();
-    VLOG_CRITICAL << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms";
+    VLOG_CRITICAL << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms"
+                  << ", " << _load_info;

     if (_add_batches_finished) {
         {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3868e88..afa014b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -321,7 +321,9 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     _delta_written_success = true;

     const FlushStatistic& stat = _flush_token->get_stats();
-    VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id() << ", stats: " << stat;
+    VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id()
+                  << ", load id: " << print_id(_req.load_id)
+                  << ", stats: " << stat;
     return OLAP_SUCCESS;
 }
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 43f8a88..01e8d66 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -94,7 +94,8 @@ Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
     Status st;
     if (request.has_eos() && request.eos()) {
         bool finished = false;
-        RETURN_IF_ERROR(channel->close(request.sender_id(), &finished, request.partition_ids(),
+        RETURN_IF_ERROR(channel->close(request.sender_id(), request.backend_id(),
+                                       &finished, request.partition_ids(),
                                        tablet_vec));
         if (finished) {
             std::lock_guard<std::mutex> l(_lock);
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 531a3bd..db5e23b 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -126,7 +126,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
     return Status::OK();
 }

-Status TabletsChannel::close(int sender_id, bool* finished,
+Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
                              const google::protobuf::RepeatedField<int64_t>& partition_ids,
                              google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
     std::lock_guard<std::mutex> l(_lock);
@@ -138,7 +138,8 @@ Status TabletsChannel::close(int sender_id, bool* finished,
         *finished = (_num_remaining_senders == 0);
         return _close_status;
     }
-    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id;
+    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
+              << ", backend id: " << backend_id;
     for (auto pid : partition_ids) {
         _partition_ids.emplace(pid);
     }
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 0f01512..b0402bd 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -67,7 +67,7 @@ public:
     // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec'
     // to include all tablets written in this channel.
     // no-op when this channel has been closed or cancelled
-    Status close(int sender_id, bool* finished,
+    Status close(int sender_id, int64_t backend_id, bool* finished,
                  const google::protobuf::RepeatedField<int64_t>& partition_ids,
                  google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 35e94a9..93dc6b1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -123,7 +123,8 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
         if (!st.ok()) {
             LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
                          << ", id=" << request->id() << ", index_id=" << request->index_id()
-                         << ", sender_id=" << request->sender_id();
+                         << ", sender_id=" << request->sender_id()
+                         << ", backend id=" << request->backend_id();
         }
         st.to_protobuf(response->mutable_status());
     }
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 364f580..160b5f4 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -30,6 +30,7 @@ const static std::string S3_AK = "AWS_ACCESS_KEY";
 const static std::string S3_SK = "AWS_SECRET_KEY";
 const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
 const static std::string S3_REGION = "AWS_REGION";
+const static std::string USE_PATH_STYLE = "use_path_style";

 ClientFactory::ClientFactory() {
     _aws_options = Aws::SDKOptions{};
@@ -67,7 +68,15 @@ std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
     Aws::Client::ClientConfiguration aws_config;
     aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
     aws_config.region = properties.find(S3_REGION)->second;
-    return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred), std::move(aws_config));
+
+    // See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
+    bool use_virtual_addressing = true;
+    if (properties.find(USE_PATH_STYLE) != properties.end()) {
+        use_virtual_addressing = properties.find(USE_PATH_STYLE)->second == "true" ? false : true;
+    }
+    return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred), std::move(aws_config),
+                                               Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+                                               use_virtual_addressing);
 }

 } // end namespace doris
diff --git a/be/test/util/s3_storage_backend_test.cpp b/be/test/util/s3_storage_backend_test.cpp
index a8ea878..3e8cc87 100644
--- a/be/test/util/s3_storage_backend_test.cpp
+++ b/be/test/util/s3_storage_backend_test.cpp
@@ -33,18 +33,20 @@
 #include "util/storage_backend.h"

 namespace doris {
-static const std::string AK = "AK";
-static const std::string SK = "SK";
+static const std::string AK = "";
+static const std::string SK = "";
 static const std::string ENDPOINT = "http://s3.bj.bcebos.com";
+static const std::string USE_PATH_STYLE = "false";
 static const std::string REGION = "bj";
-static const std::string BUCKET = "s3://yang-repo/";
+static const std::string BUCKET = "s3://cmy-repo/";

 class S3StorageBackendTest : public testing::Test {
 public:
     S3StorageBackendTest()
             : _aws_properties({{"AWS_ACCESS_KEY", AK},
                                {"AWS_SECRET_KEY", SK},
                                {"AWS_ENDPOINT", ENDPOINT},
-                               {"AWS_REGION", "bj"}}) {
+                               {"USE_PATH_STYLE", USE_PATH_STYLE},
+                               {"AWS_REGION", REGION}}) {
         _s3.reset(new S3StorageBackend(_aws_properties));
         _s3_base_path = BUCKET + "s3/" + gen_uuid();
     }
@@ -189,10 +191,7 @@ TEST_F(S3StorageBackendTest, s3_mkdir) {
 int main(int argc, char** argv) {
     ::testing::InitGoogleTest(&argc, argv);
     int ret = 0;
-    Aws::SDKOptions options;
-    Aws::InitAPI(options);
-    // ak sk is secret
+    // set ak and sk before running it.
     // ret = RUN_ALL_TESTS();
-    Aws::ShutdownAPI(options);
     return ret;
-}
\ No newline at end of file
+}
diff --git a/docs/en/administrator-guide/load-data/s3-load-manual.md b/docs/en/administrator-guide/load-data/s3-load-manual.md
index 7e3aa28..b9c2b2a 100644
--- a/docs/en/administrator-guide/load-data/s3-load-manual.md
+++ b/docs/en/administrator-guide/load-data/s3-load-manual.md
@@ -48,12 +48,12 @@ Other cloud storage systems can find relevant information compatible with S3 in
 Like Broker Load, just replace `WITH BROKER broker_name ()` with
 ```
     WITH S3
-    (
+    (
         "AWS_ENDPOINT" = "AWS_ENDPOINT",
         "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
         "AWS_SECRET_KEY"="AWS_SECRET_KEY",
         "AWS_REGION" = "AWS_REGION"
-    )
+    )
 ```

 example:
@@ -75,4 +75,19 @@ example:
     (
         "timeout" = "3600"
     );
-```
\ No newline at end of file
+```
+
+## FAQ
+
+The S3 SDK uses virtual-hosted style access by default. However, some object storage systems may not have virtual-hosted style access enabled, or may not support it at all. In this case, we can add the `use_path_style` parameter to force the use of path style:
+
+```
+  WITH S3
+  (
+    "AWS_ENDPOINT" = "AWS_ENDPOINT",
+    "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
+    "AWS_SECRET_KEY"="AWS_SECRET_KEY",
+    "AWS_REGION" = "AWS_REGION",
+    "use_path_style" = "true"
+  )
+```
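The new FAQ shows only the `WITH S3` clause. For orientation, a complete load statement using the new property could look like the following sketch, modeled on the manual's existing examples; the label, table, bucket, and endpoint below are hypothetical placeholders, not values from this patch:

```
    LOAD LABEL example_db.label_path_style
    (
        DATA INFILE("s3://my_bucket/input/file.txt")
        INTO TABLE my_table
    )
    WITH S3
    (
        "AWS_ENDPOINT" = "http://cos.ap-beijing.myqcloud.com",
        "AWS_ACCESS_KEY" = "your_access_key",
        "AWS_SECRET_KEY" = "your_secret_key",
        "AWS_REGION" = "ap-beijing",
        "use_path_style" = "true"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
```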
( "timeout" = "3600" ); -``` \ No newline at end of file +``` + +## 常见问题 + +S3 SDK 默认使用 virtual-hosted style 方式。但某些对象存储系统可能没开启或没支持 virtual-hosted style 方式的访问,此时我们可以添加 `use_path_style` 参数来强制使用 path style 方式: + +``` + WITH S3 + ( + "AWS_ENDPOINT" = "AWS_ENDPOINT", + "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY", + "AWS_SECRET_KEY"="AWS_SECRET_KEY", + "AWS_REGION" = "AWS_REGION", + "use_path_style" = "true" + ) +``` diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 1ebbba5..e40f29a 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -514,7 +514,7 @@ under the License. <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aws</artifactId> - <version>2.7.3</version> + <version>2.8.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java index 10fb1d4..d5b98fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java @@ -134,5 +134,4 @@ public abstract class BlobStorage implements Writable { Text.writeString(out, entry.getValue()); } } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java index 3a2623d..df8230c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java @@ -29,16 +29,16 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.system.Backend; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + import java.io.DataInput; import java.io.DataOutput; import java.io.File; @@ -315,7 +315,9 @@ public class Repository implements Writable { // Check if this repo is available. // If failed to connect this repo, set errMsg and return false. public boolean ping() { - String path = location + "/" + joinPrefix(PREFIX_REPO, name); + // for s3 sdk, the headObject() method does not support list "dir", + // so we check FILE_REPO_INFO instead. 
+        String path = location + "/" + joinPrefix(PREFIX_REPO, name) + "/" + FILE_REPO_INFO;
         try {
             URI checkUri = new URI(path);
             Status st = storage.checkPathExist(checkUri.normalize().toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
index 2032229..9a2f29b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
@@ -71,9 +71,19 @@ public class S3Storage extends BlobStorage {
     public static final String S3_SK = "AWS_SECRET_KEY";
     public static final String S3_ENDPOINT = "AWS_ENDPOINT";
     public static final String S3_REGION = "AWS_REGION";
+    public static final String USE_PATH_STYLE = "use_path_style";
+
     private static final Logger LOG = LogManager.getLogger(S3Storage.class);
     private final CaseInsensitiveMap caseInsensitiveProperties;
     private S3Client client;
+    // false: the s3 client will automatically convert the endpoint to virtual-hosted style, eg:
+    //        endpoint:       http://s3.us-east-2.amazonaws.com
+    //        bucket/path:    my_bucket/file.txt
+    //        auto converted: http://my_bucket.s3.us-east-2.amazonaws.com/file.txt
+    // true: the s3 client will NOT automatically convert the endpoint to virtual-hosted style,
+    //       so we need to do some tricks:
+    //        endpoint:    http://cos.ap-beijing.myqcloud.com
+    //        bucket/path: my_bucket/file.txt
+    //        converted manually: see S3URI()
     private boolean forceHostedStyle = false;

     public S3Storage(Map<String, String> properties) {
@@ -89,21 +99,31 @@ public class S3Storage extends BlobStorage {
         super.setProperties(properties);
         caseInsensitiveProperties.putAll(properties);
         // Virtual hosted-style is recommended in the s3 protocol.
-        // The path-style has been abandoned, but for some unexplainable reasons.
-        // The s3 client will determine whether the endpoint starts with `s3`
+        // The path-style has been abandoned, but for some unexplainable reasons,
+        // the s3 client will determine whether the endpoint starts with `s3`
         // when generating a virtual hosted-style request.
         // If not, it will not be converted (https://github.com/aws/aws-sdk-java-v2/pull/763),
         // but the endpoints of many cloud service providers for object storage do not start with s3,
         // so they cannot be converted to virtual hosted-style.
-        // Some of them, such as aliyun's oss, only support virtual hosted-style,
-        // so we need to do some additional conversion.
-
+        // Some of them, such as aliyun's oss, only support virtual hosted-style, and some of them (ceph)
+        // may only support path-style, so we need to do some additional conversion.
+        //
+        //        | use_path_style         | !use_path_style
+        //   S3   | forceHostedStyle=false | forceHostedStyle=false
+        //  !S3   | forceHostedStyle=false | forceHostedStyle=true
+        //
+        // That is, for an S3 endpoint, ignore the `use_path_style` property, and the s3 client will
+        // automatically use virtual hosted-style.
+        // For other endpoints, if `use_path_style` is true, use path style; otherwise, use virtual hosted-style.
         if (!caseInsensitiveProperties.get(S3_ENDPOINT).toString().toLowerCase().startsWith("s3")) {
-            forceHostedStyle = true;
+            if (caseInsensitiveProperties.getOrDefault(USE_PATH_STYLE, "false").toString().equalsIgnoreCase("true")) {
+                forceHostedStyle = false;
+            } else {
+                forceHostedStyle = true;
+            }
         } else {
             forceHostedStyle = false;
         }
-
     }

     public static void checkS3(CaseInsensitiveMap caseInsensitiveProperties) throws UserException {
@@ -168,7 +188,6 @@ public class S3Storage extends BlobStorage {
     @Override
     public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
         long start = System.currentTimeMillis();
-        S3URI uri = new S3URI(remoteFilePath, forceHostedStyle);
         // Write the data to a local file
         File localFile = new File(localFilePath);
         if (localFile.exists()) {
@@ -183,6 +202,7 @@ public class S3Storage extends BlobStorage {
             }
         }
         try {
+            S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle);
             GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject(
                     GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath());
             if (localFile.length() == fileSize) {
                 LOG.info(
@@ -200,7 +220,7 @@ public class S3Storage extends BlobStorage {
                     Status.ErrCode.COMMON_ERROR,
                     "get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage());
         } catch (UserException ue) {
-            LOG.error("connect to s3 failed: ", ue);
+            LOG.warn("connect to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
         } catch (Exception e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.toString());
@@ -209,8 +229,8 @@ public class S3Storage extends BlobStorage {

     @Override
     public Status directUpload(String content, String remoteFile) {
-        S3URI uri = new S3URI(remoteFile, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remoteFile, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -228,9 +248,9 @@ public class S3Storage extends BlobStorage {
     }

     public Status copy(String origFilePath, String destFilePath) {
-        S3URI origUri = new S3URI(origFilePath);
-        S3URI descUri = new S3URI(destFilePath, forceHostedStyle);
         try {
+            S3URI origUri = S3URI.create(origFilePath);
+            S3URI descUri = S3URI.create(destFilePath, forceHostedStyle);
             getClient(descUri.getVirtualBucket())
                     .copyObject(
                             CopyObjectRequest.builder()
@@ -250,8 +270,8 @@ public class S3Storage extends BlobStorage {

     @Override
     public Status upload(String localPath, String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -280,8 +300,8 @@ public class S3Storage extends BlobStorage {

     @Override
     public Status delete(String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             DeleteObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .deleteObject(
@@ -319,6 +339,9 @@ public class S3Storage extends BlobStorage {
             conf.set("fs.s3a.endpoint", s3Endpoint);
             conf.set("fs.s3a.impl.disable.cache", "true");
             conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+            // introduced in hadoop aws 2.8.0
+            conf.set("fs.s3a.path.style.access", forceHostedStyle ? "false" : "true");
+            conf.set("fs.s3a.attempts.maximum", "2");
             FileSystem s3AFileSystem = FileSystem.get(new URI(remotePath), conf);
             org.apache.hadoop.fs.Path pathPattern = new org.apache.hadoop.fs.Path(remotePath);
             FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
@@ -344,8 +367,8 @@ public class S3Storage extends BlobStorage {
         if (!remotePath.endsWith("/")) {
             remotePath += "/";
         }
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             PutObjectResponse response =
                     getClient(uri.getVirtualBucket())
                             .putObject(
@@ -364,8 +387,8 @@ public class S3Storage extends BlobStorage {

     @Override
     public Status checkPathExist(String remotePath) {
-        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
+            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
             getClient(uri.getVirtualBucket())
                     .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
             return Status.OK;
@@ -373,11 +396,11 @@ public class S3Storage extends BlobStorage {
             if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
                 return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
             } else {
-                LOG.error("headObject failed:", e);
+                LOG.warn("headObject failed:", e);
                 return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage());
             }
         } catch (UserException ue) {
-            LOG.error("connect to s3 failed: ", ue);
+            LOG.warn("connect to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
         }
     }
path=" + path + ", err: " + e.getMessage()); } for (RemoteFile r : rfiles) { if (r.isFile()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java index 0aadfe4..1cf1b80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java @@ -17,11 +17,15 @@ package org.apache.doris.common.util; -import com.google.common.base.Preconditions; +import org.apache.doris.common.UserException; + import com.google.common.collect.ImmutableSet; +import org.apache.parquet.Strings; import org.apache.parquet.glob.GlobExpander; +import java.net.URI; +import java.net.URISyntaxException; import java.util.List; import java.util.Set; @@ -44,7 +48,7 @@ public class S3URI { private final String virtualBucket; private final String bucket; private final String key; - private boolean forceHosted; + private boolean forceVirtualHosted; /** * Creates a new S3URI based on the bucket and key parsed from the location as defined in: @@ -55,27 +59,60 @@ public class S3URI { * * @param location fully qualified URI */ - public S3URI(String location) { - this(location, false); + + public static S3URI create(String location) throws UserException { + return create(location, false); + } + + public static S3URI create(String location, boolean forceVirtualHosted) throws UserException { + S3URI s3URI = new S3URI(location, forceVirtualHosted); + return s3URI; } - public S3URI(String location, boolean forceHosted) { - Preconditions.checkNotNull(location, "Location cannot be null."); - this.location = location; - this.forceHosted = forceHosted; - String[] schemeSplit = location.split(SCHEME_DELIM); - Preconditions.checkState(schemeSplit.length == 2, "Invalid S3 URI: %s", location); + private S3URI(String location, boolean forceVirtualHosted) throws UserException { + if (Strings.isNullOrEmpty(location)) { + throw new UserException("s3 location can not be null"); + } + + try { + // the location need to be normalized to eliminate double "/", or the hadoop aws api + // won't handle it correctly. 
+ this.location = new URI(location).normalize().toString(); + } catch (URISyntaxException e) { + throw new UserException("Invalid s3 uri: " + e.getMessage()); + } + + this.forceVirtualHosted = forceVirtualHosted; + String[] schemeSplit = this.location.split(SCHEME_DELIM); + if (schemeSplit.length != 2) { + throw new UserException("Invalid s3 uri: " + this.location); + } this.scheme = schemeSplit[0]; - Preconditions.checkState(VALID_SCHEMES.contains(scheme.toLowerCase()), "Invalid scheme: %s", scheme); + if (!VALID_SCHEMES.contains(scheme.toLowerCase())) { + throw new UserException("Invalid scheme: " + this.location); + } + String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2); - Preconditions.checkState(authoritySplit.length == 2, "Invalid S3 URI: %s", location); - Preconditions.checkState(!authoritySplit[1].trim().isEmpty(), "Invalid S3 key: %s", location); + if (authoritySplit.length != 2) { + throw new UserException("Invalid s3 uri: " + this.location); + } + if (authoritySplit[1].trim().isEmpty()) { + throw new UserException("Invalid s3 key: " + this.location); + } + // Strip query and fragment if they exist String path = authoritySplit[1]; path = path.split(QUERY_DELIM)[0]; path = path.split(FRAGMENT_DELIM)[0]; - if (forceHosted) { + if (this.forceVirtualHosted) { + // If forceVirtualHosted is true, the s3 client will NOT automatically convert to virtual-hosted style. + // So we do some convert manually. Eg: + // endpoint: http://cos.ap-beijing.myqcloud.com + // bucket/path: my_bucket/file.txt + // `virtualBucket` will be "my_bucket" + // `bucket` will be `file.txt` + // So that when assembling the real endpoint will be: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt this.virtualBucket = authoritySplit[0]; String[] paths = path.split("/", 2); this.bucket = paths[0]; @@ -85,6 +122,9 @@ public class S3URI { key = ""; } } else { + // If forceVirtualHosted is false, let the s3 client to determine how to covert endpoint, eg: + // For s3 endpoint(start with "s3."), it will convert to virtual-hosted style. + // For others, keep as it is (maybe path-style, maybe virtual-hosted style.) this.virtualBucket = ""; this.bucket = authoritySplit[0]; key = path; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java index fa05289..bd6ec87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java @@ -17,8 +17,6 @@ package org.apache.doris.httpv2.rest.manager; -import lombok.Getter; -import lombok.Setter; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -48,7 +46,6 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; -import org.apache.commons.httpclient.HttpException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.web.bind.annotation.RequestBody; @@ -70,6 +67,9 @@ import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import lombok.Getter; +import lombok.Setter; + /* * Used to return all node information, configuration information and modify node config. 
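To make the splitting rules above concrete, here is a minimal standalone Java sketch that walks through the patch's own my_bucket/file.txt example; the class name is hypothetical, and the real entry point is S3URI.create():

```
import java.net.URI;
import java.net.URISyntaxException;

public class S3UriSplitSketch {
    public static void main(String[] args) throws URISyntaxException {
        // normalize() collapses double "/" just as the patched constructor does.
        String location = new URI("s3://my_bucket/file.txt").normalize().toString();
        String[] schemeSplit = location.split("://");            // ["s3", "my_bucket/file.txt"]
        String[] authoritySplit = schemeSplit[1].split("/", 2);  // ["my_bucket", "file.txt"]

        // forceVirtualHosted == true: the authority becomes the "virtual bucket" that
        // S3Storage later prepends to a non-s3 endpoint, e.g.
        // http://cos.ap-beijing.myqcloud.com -> http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
        String virtualBucket = authoritySplit[0];            // "my_bucket"
        String bucket = authoritySplit[1].split("/", 2)[0];  // "file.txt", as in the patch comment
        System.out.println(virtualBucket + " | " + bucket);

        // forceVirtualHosted == false: bucket is the authority and the key is the rest;
        // the s3 client then decides whether to convert the endpoint.
        System.out.println(authoritySplit[0] + " | " + authoritySplit[1]); // my_bucket | file.txt
    }
}
```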
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index fa05289..bd6ec87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -17,8 +17,6 @@

 package org.apache.doris.httpv2.rest.manager;

-import lombok.Getter;
-import lombok.Setter;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -48,7 +46,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;

-import org.apache.commons.httpclient.HttpException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -70,6 +67,9 @@ import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;

+import lombok.Getter;
+import lombok.Setter;
+
 /*
  * Used to return all node information, configuration information and modify node config.
  */
@@ -540,10 +540,10 @@ public class NodeAction extends RestBaseController {
     }

     private void parseFeSetConfigResponse(String response, Pair<String, Integer> hostPort,
-                                          List<Map<String, String>> failedTotal) throws HttpException {
+                                          List<Map<String, String>> failedTotal) throws Exception {
         JsonObject jsonObject = JsonParser.parseString(response).getAsJsonObject();
         if (jsonObject.get("code").getAsInt() != HttpUtils.REQUEST_SUCCESS_CODE) {
-            throw new HttpException(jsonObject.get("msg").getAsString());
+            throw new Exception(jsonObject.get("msg").getAsString());
         }
         SetConfigAction.SetConfigEntity setConfigEntity =
                 GsonUtils.GSON.fromJson(jsonObject.get("data").getAsJsonObject(), SetConfigAction.SetConfigEntity.class);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
index f52d3ef..be74035 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java
@@ -39,7 +39,7 @@ import java.util.UUID;
 @Ignore
 public class S3StorageTest {
     private static String basePath;
-    private final String bucket = "s3://yang-repo/";
+    private final String bucket = "s3://doris-test/";
     private Map<String, String> properties;
     private S3Storage storage;
     private String testFile;
@@ -56,6 +56,8 @@ public class S3StorageTest {
         properties.put("AWS_ACCESS_KEY", System.getenv().getOrDefault("AWS_AK", ""));
         properties.put("AWS_SECRET_KEY", System.getenv().getOrDefault("AWS_SK", ""));
         properties.put("AWS_ENDPOINT", "http://s3.bj.bcebos.com");
+        properties.put(S3Storage.USE_PATH_STYLE, "false");
+        properties.put("AWS_REGION", "bj");
         storage = new S3Storage(properties);
         testFile = bucket + basePath + "/Ode_to_the_West_Wind";
@@ -123,7 +125,6 @@ public class S3StorageTest {
         storage.rename(testFile + ".bak", testFile + ".bak1");
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(testFile + ".bak").getErrCode());
         Assert.assertEquals(Status.OK, storage.checkPathExist(testFile + ".bak1"));
-
     }

     @Test
@@ -133,17 +134,16 @@ public class S3StorageTest {
         Assert.assertEquals(Status.OK, storage.delete(deleteFile));
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(deleteFile).getErrCode());
         Assert.assertEquals(Status.OK, storage.delete(deleteFile + "xxxx"));
-
     }

     @Test
     public void list() {
         List<RemoteFile> result = new ArrayList<>();
-        String listPath = bucket + basePath + "_list" + "/Ode_to_the_West_Wind";
+        String listPath = bucket + basePath + "_list" + "/Ode_to_the_West_Wind";
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".1"));
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".2"));
         Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".3"));
-        Assert.assertEquals(Status.OK, storage.list(bucket + basePath + "_list/*", result));
+        Assert.assertEquals(Status.OK, storage.list(bucket + basePath + "_list/*", result));
         Assert.assertEquals(3, result.size());
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
index f310c5b..52ab836 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java
@@ -17,24 +17,26 @@

 package org.apache.doris.common.util;

-import org.junit.Test;
+import org.apache.doris.common.UserException;

 import org.junit.Assert;
+import org.junit.Test;

 public class S3URITest {
     @Test
-    public void testLocationParsing() {
+    public void testLocationParsing() throws UserException {
         String p1 = "s3://bucket/path/to/file";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);

         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/to/file", uri1.getKey());
         Assert.assertEquals(p1, uri1.toString());
     }

+
     @Test
-    public void testPathLocationParsing() {
+    public void testPathLocationParsing() throws UserException {
         String p1 = "s3://bucket/path/";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);

         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/", uri1.getKey());
@@ -42,34 +44,34 @@ public class S3URITest {
     }

     @Test
-    public void testEncodedString() {
+    public void testEncodedString() throws UserException {
         String p1 = "s3://bucket/path%20to%20file";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);

         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path%20to%20file", uri1.getKey());
         Assert.assertEquals(p1, uri1.toString());
     }

-    @Test(expected = IllegalStateException.class)
-    public void missingKey() {
-        new S3URI("https://bucket/");
+    @Test(expected = UserException.class)
+    public void missingKey() throws UserException {
+        S3URI.create("https://bucket/");
     }

-    @Test(expected = IllegalStateException.class)
-    public void relativePathing() {
-        new S3URI("/path/to/file");
+    @Test(expected = UserException.class)
+    public void relativePathing() throws UserException {
+        S3URI.create("/path/to/file");
     }

-    @Test(expected = IllegalStateException.class)
-    public void invalidScheme() {
-        new S3URI("ftp://bucket/");
+    @Test(expected = UserException.class)
+    public void invalidScheme() throws UserException {
+        S3URI.create("ftp://bucket/");
     }

     @Test
-    public void testQueryAndFragment() {
+    public void testQueryAndFragment() throws UserException {
         String p1 = "s3://bucket/path/to/file?query=foo#bar";
-        S3URI uri1 = new S3URI(p1);
+        S3URI uri1 = S3URI.create(p1);

         Assert.assertEquals("bucket", uri1.getBucket());
         Assert.assertEquals("path/to/file", uri1.getKey());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index 3a21275..e5f2fc5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -20,9 +20,7 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.FunctionName;
-import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.AggregateType;
@@ -35,13 +33,9 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarFunction;
 import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.Load;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TExplainLevel;
@@ -502,6 +496,30 @@ public class StreamLoadScanNodeTest {
         }
     }

+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k3");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
@@ -629,6 +647,30 @@ public class StreamLoadScanNodeTest {
         }
     }

+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k5 = 1");
@@ -659,6 +701,30 @@ public class StreamLoadScanNodeTest {
         }
     }

+        new Expectations() {{
+            dstTable.getBaseSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getBaseSchema(anyBoolean);
+            minTimes = 0;
+            result = columns;
+            dstTable.getFullSchema();
+            minTimes = 0;
+            result = columns;
+            dstTable.getColumn("k1");
+            minTimes = 0;
+            result = columns.get(0);
+            dstTable.getColumn("k2");
+            minTimes = 0;
+            result = columns.get(1);
+            dstTable.getColumn("v1");
+            minTimes = 0;
+            result = columns.get(2);
+            dstTable.getColumn("v2");
+            minTimes = 0;
+            result = columns.get(3);
+        }};
+
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k1");
         request.setWhere("k1 + v2");
diff --git a/fe/pom.xml b/fe/pom.xml
index bf8b049..453a444 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -590,7 +590,7 @@ under the License.
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-common</artifactId>
-                <version>2.7.3</version>
+                <version>2.8.0</version>
                 <scope>provided</scope>
                 <exclusions>
                     <exclusion>
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 5ecbb69..2e07c46 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -94,6 +94,8 @@ message PTabletWriterAddBatchRequest {
     // only valid when eos is true
     // valid partition ids that would write in this writer
     repeated int64 partition_ids = 8;
+    // the backend which sends this request
+    optional int64 backend_id = 9 [default = -1];
 };

 message PTabletWriterAddBatchResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org