This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5280e277e71 [chore](be) Acquire and check MD5 digest of the file to
download (#37418)
5280e277e71 is described below
commit 5280e277e71c0fb3b5fe12639a7201ee5e205bb4
Author: walter <[email protected]>
AuthorDate: Mon Jul 8 18:55:35 2024 +0800
[chore](be) Acquire and check MD5 digest of the file to download (#37418)
Cherry-pick #35807, #36621, #36726
---
be/src/http/action/download_action.cpp | 21 +++---
be/src/http/action/download_binlog_action.cpp | 9 ++-
be/src/http/http_client.cpp | 46 ++++++++++++-
be/src/http/http_client.h | 5 +-
be/src/http/utils.cpp | 37 +++++++----
be/src/http/utils.h | 3 +-
be/src/runtime/snapshot_loader.cpp | 89 +++++++++++++++++--------
be/src/service/backend_service.cpp | 56 ++++++++++++++--
be/test/http/http_client_test.cpp | 96 +++++++++++++++++++++++++++
9 files changed, 299 insertions(+), 63 deletions(-)
diff --git a/be/src/http/action/download_action.cpp
b/be/src/http/action/download_action.cpp
index f271b4f1916..284314f421d 100644
--- a/be/src/http/action/download_action.cpp
+++ b/be/src/http/action/download_action.cpp
@@ -17,9 +17,7 @@
#include "http/action/download_action.h"
-#include <algorithm>
#include <memory>
-#include <sstream>
#include <string>
#include <utility>
@@ -34,10 +32,11 @@
namespace doris {
namespace {
+// HTTP query-parameter names and values understood by DownloadAction; they are
+// looked up with req->param() (e.g. CHANNEL_PARAMETER, ACQUIRE_MD5_PARAMETER).
-static const std::string FILE_PARAMETER = "file";
-static const std::string TOKEN_PARAMETER = "token";
-static const std::string CHANNEL_PARAMETER = "channel";
-static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+const std::string FILE_PARAMETER = "file";
+const std::string TOKEN_PARAMETER = "token";
+const std::string CHANNEL_PARAMETER = "channel";
+const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+// Any non-empty value asks the server to add a Content-MD5 response header.
+const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5";
} // namespace
DownloadAction::DownloadAction(ExecEnv* exec_env,
@@ -47,7 +46,7 @@ DownloadAction::DownloadAction(ExecEnv* exec_env,
_download_type(NORMAL),
_num_workers(num_workers),
_rate_limit_group(std::move(rate_limit_group)) {
- for (auto& dir : allow_dirs) {
+ for (const auto& dir : allow_dirs) {
std::string p;
Status st = io::global_local_filesystem()->canonicalize(dir, &p);
if (!st.ok()) {
@@ -116,11 +115,9 @@ void DownloadAction::handle_normal(HttpRequest* req, const
std::string& file_par
} else {
const auto& channel = req->param(CHANNEL_PARAMETER);
bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
- if (ingest_binlog) {
- do_file_response(file_param, req, _rate_limit_group.get());
- } else {
- do_file_response(file_param, req);
- }
+ bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty();
+ auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() :
nullptr;
+ do_file_response(file_param, req, rate_limit_group, is_acquire_md5);
}
}
diff --git a/be/src/http/action/download_binlog_action.cpp
b/be/src/http/action/download_binlog_action.cpp
index dbe2880d3b4..61d65ca9756 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -48,6 +48,7 @@ const std::string kBinlogVersionParameter = "binlog_version";
const std::string kRowsetIdParameter = "rowset_id";
const std::string kSegmentIndexParameter = "segment_index";
const std::string kSegmentIndexIdParameter = "segment_index_id";
+const std::string kAcquireMD5Parameter = "acquire_md5";
// get http param, if no value throw exception
const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
@@ -102,12 +103,14 @@ void handle_get_binlog_info(HttpRequest* req) {
void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group*
rate_limit_group) {
// Step 1: get download file path
std::string segment_file_path;
+ bool is_acquire_md5 = false;
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(tablet_id);
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req,
kSegmentIndexParameter);
segment_file_path = tablet->get_segment_filepath(rowset_id,
segment_index);
+ is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
e.what());
LOG(WARNING) << "get download file path failed, error: " << e.what();
@@ -128,7 +131,7 @@ void handle_get_segment_file(HttpRequest* req,
bufferevent_rate_limit_group* rat
LOG(WARNING) << "file not exist, file path: " << segment_file_path;
return;
}
- do_file_response(segment_file_path, req, rate_limit_group);
+ do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5);
}
/// handle get segment index file, need tablet_id, rowset_id, segment_index &&
segment_index_id
@@ -136,6 +139,7 @@ void handle_get_segment_index_file(HttpRequest* req,
bufferevent_rate_limit_group*
rate_limit_group) {
// Step 1: get download file path
std::string segment_index_file_path;
+ bool is_acquire_md5 = false;
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(tablet_id);
@@ -144,6 +148,7 @@ void handle_get_segment_index_file(HttpRequest* req,
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
segment_index_file_path =
tablet->get_segment_index_filepath(rowset_id, segment_index,
segment_index_id);
+ is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
e.what());
LOG(WARNING) << "get download file path failed, error: " << e.what();
@@ -164,7 +169,7 @@ void handle_get_segment_index_file(HttpRequest* req,
LOG(WARNING) << "file not exist, file path: " <<
segment_index_file_path;
return;
}
- do_file_response(segment_index_file_path, req, rate_limit_group);
+ do_file_response(segment_index_file_path, req, rate_limit_group,
is_acquire_md5);
}
void handle_get_rowset_meta(HttpRequest* req) {
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index d7a6c9c9665..b8ef9834341 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -24,12 +24,36 @@
#include <ostream>
#include "common/config.h"
+#include "http/http_headers.h"
#include "http/http_status.h"
#include "util/stack_util.h"
namespace doris {
-HttpClient::HttpClient() {}
+// Translate a CURLHcode returned by curl_easy_header() into a short,
+// human-readable description used in log lines and error statuses.
+static const char* header_error_msg(CURLHcode code) {
+    switch (code) {
+    case CURLHE_OK:
+        return "OK";
+    case CURLHE_BADINDEX:
+        return "header exists but not with this index";
+    case CURLHE_MISSING:
+        return "no such header exists";
+    case CURLHE_NOHEADERS:
+        return "no headers at all exist (yet)";
+    case CURLHE_NOREQUEST:
+        return "no request with this number was used";
+    case CURLHE_OUT_OF_MEMORY:
+        return "out of memory while processing";
+    case CURLHE_BAD_ARGUMENT:
+        return "a function argument was not okay";
+    case CURLHE_NOT_BUILT_IN:
+        return "curl_easy_header() was disabled in the build";
+    default:
+        return "unknown";
+    }
+}
+
+HttpClient::HttpClient() = default;
HttpClient::~HttpClient() {
if (_curl != nullptr) {
@@ -88,7 +112,7 @@ Status HttpClient::init(const std::string& url) {
}
curl_write_callback callback = [](char* buffer, size_t size, size_t nmemb,
void* param) {
- HttpClient* client = (HttpClient*)param;
+ auto* client = (HttpClient*)param;
return client->on_response_data(buffer, size * nmemb);
};
@@ -177,6 +201,24 @@ Status HttpClient::execute(const std::function<bool(const
void* data, size_t len
return Status::OK();
}
+// Read the HttpHeaders::CONTENT_MD5 header of the last response received by
+// this client. A missing header (or no headers at all) is not an error: *md5 is
+// cleared and OK is returned so callers can treat "empty" as "not provided".
+// Any other curl_easy_header() failure is logged and returned as HttpError.
+Status HttpClient::get_content_md5(std::string* md5) const {
+    struct curl_header* header_ptr = nullptr;
+    // request = -1 selects the last request of the transfer; with redirects or
+    // multi-pass authentication, request 0 would be an intermediate response.
+    auto code = curl_easy_header(_curl, HttpHeaders::CONTENT_MD5, 0, CURLH_HEADER, -1,
+                                 &header_ptr);
+    if (code == CURLHE_MISSING || code == CURLHE_NOHEADERS) {
+        // no such headers exists
+        md5->clear();
+        return Status::OK();
+    }
+    if (code != CURLHE_OK) {
+        auto msg = fmt::format("failed to get http header {}: {} ({})", HttpHeaders::CONTENT_MD5,
+                               header_error_msg(code), static_cast<int>(code));
+        LOG(WARNING) << msg << ", trace=" << get_stack_trace();
+        return Status::HttpError(std::move(msg));
+    }
+
+    *md5 = header_ptr->value;
+    return Status::OK();
+}
+
Status HttpClient::download(const std::string& local_path) {
// set method to GET
set_method(GET);
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index d80f484ce80..e379895a73e 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -106,7 +106,7 @@ public:
if (cl < 0) {
return Status::InternalError(
fmt::format("failed to get content length, it should
be a positive value, "
- "actrual is : {}",
+ "actual is : {}",
cl));
}
*length = (uint64_t)cl;
@@ -115,6 +115,9 @@ public:
return Status::InternalError("failed to get content length. err code:
{}", code);
}
+ // Get the value of the HttpHeaders::CONTENT_MD5 response header into *md5.
+ // If the header is absent (or no headers were received), *md5 is cleared and
+ // OK is returned; any other header lookup failure is returned as an error.
+ Status get_content_md5(std::string* md5) const;
+
long get_http_status() const {
long code;
curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &code);
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index 49f9d2c4993..b03017c12a7 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -22,8 +22,6 @@
#include <sys/stat.h>
#include <unistd.h>
-#include <algorithm>
-#include <memory>
#include <ostream>
#include <vector>
@@ -41,6 +39,7 @@
#include "io/fs/local_file_system.h"
#include "olap/wal/wal_manager.h"
#include "runtime/exec_env.h"
+#include "util/md5.h"
#include "util/path_util.h"
#include "util/url_coding.h"
@@ -56,7 +55,7 @@ std::string encode_basic_auth(const std::string& user, const
std::string& passwd
bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string*
passwd) {
const char k_basic[] = "Basic ";
- auto& auth = req.header(HttpHeaders::AUTHORIZATION);
+ const auto& auth = req.header(HttpHeaders::AUTHORIZATION);
if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) !=
0) {
return false;
}
@@ -76,8 +75,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string*
user, std::string* pa
}
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
- auto& token = req.header("token");
- auto& auth_code = req.header(HTTP_AUTH_CODE);
+ const auto& token = req.header("token");
+ const auto& auth_code = req.header(HTTP_AUTH_CODE);
if (!token.empty()) {
auth->token = token;
} else if (!auth_code.empty()) {
@@ -111,25 +110,24 @@ std::string get_content_type(const std::string&
file_name) {
std::string file_ext = path_util::file_extension(file_name);
VLOG_TRACE << "file_name: " << file_name << "; file extension: [" <<
file_ext << "]";
if (file_ext == std::string(".html") || file_ext == std::string(".htm")) {
- return std::string("text/html; charset=utf-8");
+ return "text/html; charset=utf-8";
} else if (file_ext == std::string(".js")) {
- return std::string("application/javascript; charset=utf-8");
+ return "application/javascript; charset=utf-8";
} else if (file_ext == std::string(".css")) {
- return std::string("text/css; charset=utf-8");
+ return "text/css; charset=utf-8";
} else if (file_ext == std::string(".txt")) {
- return std::string("text/plain; charset=utf-8");
+ return "text/plain; charset=utf-8";
} else if (file_ext == std::string(".png")) {
- return std::string("image/png");
+ return "image/png";
} else if (file_ext == std::string(".ico")) {
- return std::string("image/x-icon");
+ return "image/x-icon";
} else {
return "text/plain; charset=utf-8";
}
- return "";
}
void do_file_response(const std::string& file_path, HttpRequest* req,
- bufferevent_rate_limit_group* rate_limit_group) {
+ bufferevent_rate_limit_group* rate_limit_group, bool
is_acquire_md5) {
if (file_path.find("..") != std::string::npos) {
LOG(WARNING) << "Not allowed to read relative path: " << file_path;
HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
@@ -163,6 +161,17 @@ void do_file_response(const std::string& file_path,
HttpRequest* req,
req->add_output_header(HttpHeaders::CONTENT_TYPE,
get_content_type(file_path).c_str());
+    if (is_acquire_md5) {
+        Md5Digest md5;
+
+        // mmap() rejects a zero length with EINVAL, so an empty file is hashed
+        // without mapping it (the digest then covers no input).
+        if (file_size > 0) {
+            void* buf = mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
+            if (buf == MAP_FAILED) {
+                // Never hash MAP_FAILED: report the failure instead of crashing.
+                LOG(WARNING) << "failed to mmap file to compute md5, file: " << file_path
+                             << ", size: " << file_size;
+                close(fd);
+                HttpChannel::send_error(req, HttpStatus::INTERNAL_SERVER_ERROR);
+                return;
+            }
+            md5.update(buf, file_size);
+            munmap(buf, file_size);
+        }
+        md5.digest();
+
+        req->add_output_header(HttpHeaders::CONTENT_MD5, md5.hex().c_str());
+    }
+
if (req->method() == HttpMethod::HEAD) {
close(fd);
req->add_output_header(HttpHeaders::CONTENT_LENGTH,
std::to_string(file_size).c_str());
@@ -194,7 +203,7 @@ void do_dir_response(const std::string& dir_path,
HttpRequest* req) {
}
bool load_size_smaller_than_wal_limit(int64_t content_length) {
- // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload
content length. If it is empty or equels to 0, it means this streamload
+ // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload
content length. If it is empty or equals to 0, it means this streamload
// is a chunked streamload and we are not sure its size.
// 2. if streamload content length is too large, like larger than 80% of
the WAL constrain.
//
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 254d59cf13d..20be6c0fcd7 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -37,7 +37,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string*
user, std::string* pa
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);
void do_file_response(const std::string& dir_path, HttpRequest* req,
- bufferevent_rate_limit_group* rate_limit_group =
nullptr);
+ bufferevent_rate_limit_group* rate_limit_group = nullptr,
+ bool is_acquire_md5 = false);
void do_dir_response(const std::string& dir_path, HttpRequest* req);
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index cab8edb1927..a5061c4decf 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -417,8 +417,8 @@ Status SnapshotLoader::remote_http_download(
// Step before, validate all remote
// Step 1: Validate local tablet snapshot paths
- for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
- auto& path = remote_tablet_snapshot.local_snapshot_path;
+ for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ const auto& path = remote_tablet_snapshot.local_snapshot_path;
bool res = true;
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path,
&res));
if (!res) {
@@ -433,10 +433,10 @@ Status SnapshotLoader::remote_http_download(
// Step 2: get all local files
struct LocalFileStat {
uint64_t size;
- // TODO(Drogon): add md5sum
+ std::string md5;
};
std::unordered_map<std::string, std::unordered_map<std::string,
LocalFileStat>> local_files_map;
- for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
std::vector<std::string> local_files;
RETURN_IF_ERROR(_get_existing_files_from_local(local_path,
&local_files));
@@ -452,7 +452,14 @@ Status SnapshotLoader::remote_http_download(
return Status::IOError("can't retrive file_size of {}, due to
{}", local_file_path,
ec.message());
}
- local_filestat[local_file] = {local_file_size};
+ std::string md5;
+ auto status =
io::global_local_filesystem()->md5sum(local_file_path, &md5);
+ if (!status.ok()) {
+ LOG(WARNING) << "download file error, local file " <<
local_file_path
+ << " md5sum: " << status.to_string();
+ return status;
+ }
+ local_filestat[local_file] = {local_file_size, md5};
}
}
@@ -465,22 +472,22 @@ Status SnapshotLoader::remote_http_download(
int total_num = remote_tablet_snapshots.size();
int finished_num = 0;
struct RemoteFileStat {
- // TODO(Drogon): Add md5sum
std::string url;
+ std::string md5;
uint64_t size;
};
std::unordered_map<std::string, std::unordered_map<std::string,
RemoteFileStat>>
remote_files_map;
- for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
auto& remote_files = remote_files_map[remote_path];
const auto& token = remote_tablet_snapshot.remote_token;
const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
// HEAD
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
- std::string remote_url_prefix =
-
fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}",
- remote_be_addr.hostname, remote_be_addr.port,
token, remote_path);
+ std::string base_url =
fmt::format("http://{}:{}/api/_tablet/_download?token={}",
+ remote_be_addr.hostname,
remote_be_addr.port, token);
+ std::string remote_url_prefix = fmt::format("{}&file={}", base_url,
remote_path);
string file_list_str;
auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient*
client) {
@@ -493,30 +500,31 @@ Status SnapshotLoader::remote_http_download(
strings::Split(file_list_str, "\n", strings::SkipWhitespace());
for (const auto& filename : filename_list) {
- std::string remote_file_url = fmt::format(
-
"http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog",
- remote_tablet_snapshot.remote_be_addr.hostname,
- remote_tablet_snapshot.remote_be_addr.port,
remote_tablet_snapshot.remote_token,
- remote_tablet_snapshot.remote_snapshot_path, filename);
+ std::string remote_file_url =
+ fmt::format("{}&file={}/{}&channel=ingest_binlog",
base_url,
+ remote_tablet_snapshot.remote_snapshot_path,
filename);
// get file length
uint64_t file_size = 0;
- auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient*
client) {
- RETURN_IF_ERROR(client->init(remote_file_url));
+ std::string file_md5;
+ auto get_file_stat_cb = [&remote_file_url, &file_size,
&file_md5](HttpClient* client) {
+ std::string url = fmt::format("{}&acquire_md5=true",
remote_file_url);
+ RETURN_IF_ERROR(client->init(url));
client->set_timeout_ms(kGetLengthTimeout * 1000);
RETURN_IF_ERROR(client->head());
RETURN_IF_ERROR(client->get_content_length(&file_size));
+ RETURN_IF_ERROR(client->get_content_md5(&file_md5));
return Status::OK();
};
RETURN_IF_ERROR(
- HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1,
get_file_size_cb));
+ HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1,
get_file_stat_cb));
- remote_files[filename] = RemoteFileStat {remote_file_url,
file_size};
+ remote_files[filename] = RemoteFileStat {remote_file_url,
file_md5, file_size};
}
}
// Step 4: Compare local and remote files && get all need download files
- for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num,
total_num,
TTaskType::type::DOWNLOAD));
@@ -529,8 +537,8 @@ Status SnapshotLoader::remote_http_download(
// get all need download files
std::vector<std::string> need_download_files;
for (const auto& [remote_file, remote_filestat] : remote_files) {
- LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
- remote_filestat.size);
+ LOG(INFO) << "remote file: " << remote_file << ", size: " <<
remote_filestat.size
+ << ", md5: " << remote_filestat.md5;
auto it = local_files.find(remote_file);
if (it == local_files.end()) {
need_download_files.emplace_back(remote_file);
@@ -545,7 +553,11 @@ Status SnapshotLoader::remote_http_download(
need_download_files.emplace_back(remote_file);
continue;
}
- // TODO(Drogon): check by md5sum, if not match then download
+
+ if (auto& local_filestat = it->second; local_filestat.md5 !=
remote_filestat.md5) {
+ need_download_files.emplace_back(remote_file);
+ continue;
+ }
LOG(INFO) << fmt::format("file {} already exists, skip download",
remote_file);
}
@@ -569,6 +581,7 @@ Status SnapshotLoader::remote_http_download(
auto& remote_filestat = remote_files[filename];
auto file_size = remote_filestat.size;
auto& remote_file_url = remote_filestat.url;
+ auto& remote_file_md5 = remote_filestat.md5;
// check disk capacity
if (data_dir->reach_capacity_limit(file_size)) {
@@ -591,8 +604,8 @@ Status SnapshotLoader::remote_http_download(
<< " to: " << local_file_path << ". size(B): " <<
file_size
<< ", timeout(s): " << estimate_timeout;
- auto download_cb = [&remote_file_url, estimate_timeout,
&local_file_path,
- file_size](HttpClient* client) {
+ auto download_cb = [&remote_file_url, &remote_file_md5,
estimate_timeout,
+ &local_file_path, file_size](HttpClient*
client) {
RETURN_IF_ERROR(client->init(remote_file_url));
client->set_timeout_ms(estimate_timeout * 1000);
RETURN_IF_ERROR(client->download(local_file_path));
@@ -612,13 +625,35 @@ Status SnapshotLoader::remote_http_download(
<< ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not
equal");
}
+
+ if (!remote_file_md5.empty()) { // keep compatibility
+ std::string local_file_md5;
+
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path,
+
&local_file_md5));
+ if (local_file_md5 != remote_file_md5) {
+ LOG(WARNING) << "download file md5 error"
+ << ", remote_file_url=" << remote_file_url
+ << ", local_file_path=" << local_file_path
+ << ", remote_file_md5=" << remote_file_md5
+ << ", local_file_md5=" << local_file_md5;
+ return Status::RuntimeError(
+ "download file {} md5 is not equal, local={},
remote={}",
+ remote_file_url, local_file_md5,
remote_file_md5);
+ }
+ }
+
return io::global_local_filesystem()->permission(
local_file_path, io::LocalFileSystem::PERMS_OWNER_RW);
};
-
RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1,
download_cb));
+ auto status =
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to download file from " <<
remote_file_url
+ << ", status: " << status.to_string();
+ return status;
+ }
// local_files always keep the updated local files
- local_files[filename] = LocalFileStat {file_size};
+ local_files[filename] = LocalFileStat {file_size, remote_file_md5};
}
uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index c4ccaa7281b..324c21d91ae 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -267,7 +267,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
// Step 5.3: get all segment files
for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
auto segment_file_size = segment_file_sizes[segment_index];
- auto get_segment_file_url = segment_file_urls[segment_index];
+ auto get_segment_file_url =
+ fmt::format("{}&acquire_md5=true",
segment_file_urls[segment_index]);
uint64_t estimate_timeout =
segment_file_size / config::download_low_speed_limit_kbps /
1024;
@@ -286,6 +287,12 @@ void _ingest_binlog(IngestBinlogArg* arg) {
RETURN_IF_ERROR(client->download(local_segment_path));
download_success_files.push_back(local_segment_path);
+ std::string remote_file_md5;
+ RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ LOG(INFO) << "download segment file to " << local_segment_path
+ << ", remote md5: " << remote_file_md5
+ << ", remote size: " << segment_file_size;
+
std::error_code ec;
// Check file length
uint64_t local_file_size =
std::filesystem::file_size(local_segment_path, ec);
@@ -294,13 +301,32 @@ void _ingest_binlog(IngestBinlogArg* arg) {
return Status::IOError("can't retrive file_size of {}, due to
{}",
local_segment_path, ec.message());
}
+
if (local_file_size != segment_file_size) {
LOG(WARNING) << "download file length error"
<< ", get_segment_file_url=" <<
get_segment_file_url
<< ", file_size=" << segment_file_size
<< ", local_file_size=" << local_file_size;
- return Status::InternalError("downloaded file size is not
equal");
+ return Status::RuntimeError(
+ "downloaded file size is not equal, local={},
remote={}", local_file_size,
+ segment_file_size);
+ }
+
+ if (!remote_file_md5.empty()) { // keep compatibility
+ std::string local_file_md5;
+ RETURN_IF_ERROR(
+
io::global_local_filesystem()->md5sum(local_segment_path, &local_file_md5));
+ if (local_file_md5 != remote_file_md5) {
+ LOG(WARNING) << "download file md5 error"
+ << ", get_segment_file_url=" <<
get_segment_file_url
+ << ", remote_file_md5=" << remote_file_md5
+ << ", local_file_md5=" << local_file_md5;
+ return Status::RuntimeError(
+ "download file md5 is not equal, local={},
remote={}", local_file_md5,
+ remote_file_md5);
+ }
}
+
return
io::global_local_filesystem()->permission(local_segment_path,
io::LocalFileSystem::PERMS_OWNER_RW);
};
@@ -415,7 +441,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) {
auto segment_index_file_size = segment_index_file_sizes[i];
- auto get_segment_index_file_url = segment_index_file_urls[i];
+ auto get_segment_index_file_url =
+ fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]);
uint64_t estimate_timeout =
segment_index_file_size /
config::download_low_speed_limit_kbps / 1024;
@@ -434,6 +461,9 @@ void _ingest_binlog(IngestBinlogArg* arg) {
RETURN_IF_ERROR(client->download(local_segment_index_path));
download_success_files.push_back(local_segment_index_path);
+ std::string remote_file_md5;
+ RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+
std::error_code ec;
// Check file length
uint64_t local_index_file_size =
@@ -448,8 +478,26 @@ void _ingest_binlog(IngestBinlogArg* arg) {
<< ", get_segment_index_file_url=" <<
get_segment_index_file_url
<< ", index_file_size=" << segment_index_file_size
<< ", local_index_file_size=" <<
local_index_file_size;
- return Status::InternalError("downloaded index file size is
not equal");
+ return Status::RuntimeError(
+ "downloaded index file size is not equal, local={},
remote={}",
+ local_index_file_size, segment_index_file_size);
}
+
+ if (!remote_file_md5.empty()) { // keep compatibility
+ std::string local_file_md5;
+
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_segment_index_path,
+
&local_file_md5));
+ if (local_file_md5 != remote_file_md5) {
+ LOG(WARNING) << "download file md5 error"
+ << ", get_segment_index_file_url=" <<
get_segment_index_file_url
+ << ", remote_file_md5=" << remote_file_md5
+ << ", local_file_md5=" << local_file_md5;
+ return Status::RuntimeError(
+ "download file md5 is not equal, local={},
remote={}", local_file_md5,
+ remote_file_md5);
+ }
+ }
+
return
io::global_local_filesystem()->permission(local_segment_index_path,
io::LocalFileSystem::PERMS_OWNER_RW);
};
diff --git a/be/test/http/http_client_test.cpp
b/be/test/http/http_client_test.cpp
index 729a709fb93..c157f1a13c0 100644
--- a/be/test/http/http_client_test.cpp
+++ b/be/test/http/http_client_test.cpp
@@ -17,8 +17,11 @@
#include "http/http_client.h"
+#include <fcntl.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
#include <unistd.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -30,6 +33,7 @@
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/utils.h"
+#include "util/md5.h"
namespace doris {
@@ -43,8 +47,15 @@ public:
return;
}
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain;
version=0.0.4");
+ bool is_acquire_md5 = !req->param("acquire_md5").empty();
if (req->method() == HttpMethod::HEAD) {
req->add_output_header(HttpHeaders::CONTENT_LENGTH,
std::to_string(5).c_str());
+ if (is_acquire_md5) {
+ Md5Digest md5;
+ md5.update("md5sum", 6);
+ md5.digest();
+ req->add_output_header(HttpHeaders::CONTENT_MD5,
md5.hex().c_str());
+ }
HttpChannel::send_reply(req);
} else {
std::string response = "test1";
@@ -80,6 +91,13 @@ public:
}
};
+// Serves the running test binary through do_file_response() with MD5
+// acquisition forced on, so the client can compare Content-MD5 with a local digest.
+class HttpDownloadFileHandler : public HttpHandler {
+public:
+    void handle(HttpRequest* req) override {
+        const std::string served_file = "/proc/self/exe";
+        do_file_response(served_file, req, /*rate_limit_group=*/nullptr,
+                         /*is_acquire_md5=*/true);
+    }
+};
+
static EvHttpServer* s_server = nullptr;
static int real_port = 0;
static std::string hostname = "";
@@ -87,6 +105,7 @@ static std::string hostname = "";
static HttpClientTestSimpleGetHandler s_simple_get_handler;
static HttpClientTestSimplePostHandler s_simple_post_handler;
static HttpNotFoundHandler s_not_found_handler;
+static HttpDownloadFileHandler s_download_file_handler;
class HttpClientTest : public testing::Test {
public:
@@ -99,6 +118,7 @@ public:
s_server->register_handler(HEAD, "/simple_get", &s_simple_get_handler);
s_server->register_handler(POST, "/simple_post",
&s_simple_post_handler);
s_server->register_handler(GET, "/not_found", &s_not_found_handler);
+ s_server->register_handler(HEAD, "/download_file",
&s_download_file_handler);
static_cast<void>(s_server->start());
real_port = s_server->get_real_port();
EXPECT_NE(0, real_port);
@@ -203,4 +223,80 @@ TEST_F(HttpClientTest, not_found) {
EXPECT_FALSE(status.ok());
}
+TEST_F(HttpClientTest, header_content_md5) {
+    const std::string url = hostname + "/simple_get";
+
+    // Sends an authenticated HEAD request to `target` and checks the
+    // Content-Length (5) advertised by the simple_get handler.
+    auto head_and_check_length = [](HttpClient& client, const std::string& target) {
+        EXPECT_TRUE(client.init(target).ok());
+        client.set_method(HEAD);
+        client.set_basic_auth("test1", "");
+        EXPECT_TRUE(client.execute().ok());
+        uint64_t len = 0;
+        EXPECT_TRUE(client.get_content_length(&len).ok());
+        EXPECT_EQ(5, len);
+    };
+
+    {
+        // Without acquire_md5 the handler sends no Content-MD5 header.
+        HttpClient client;
+        head_and_check_length(client, url);
+        std::string md5;
+        EXPECT_TRUE(client.get_content_md5(&md5).ok());
+        EXPECT_TRUE(md5.empty());
+    }
+
+    {
+        // With acquire_md5 the handler returns the digest of "md5sum".
+        HttpClient client;
+        head_and_check_length(client, url + "?acquire_md5=true");
+        std::string md5_value;
+        EXPECT_TRUE(client.get_content_md5(&md5_value).ok());
+
+        Md5Digest expected;
+        expected.update("md5sum", 6);
+        expected.digest();
+        EXPECT_EQ(md5_value, expected.hex());
+    }
+}
+
+// The server hashes /proc/self/exe via do_file_response(); the test hashes the
+// same file locally and expects both digests to match.
+TEST_F(HttpClientTest, download_file_md5) {
+    std::string url = hostname + "/download_file";
+    HttpClient client;
+    auto st = client.init(url);
+    EXPECT_TRUE(st.ok());
+    client.set_method(HEAD);
+    client.set_basic_auth("test1", "");
+    st = client.execute();
+    EXPECT_TRUE(st.ok());
+
+    std::string md5_value;
+    st = client.get_content_md5(&md5_value);
+    EXPECT_TRUE(st.ok());
+
+    int fd = open("/proc/self/exe", O_RDONLY);
+    ASSERT_GE(fd, 0);
+    struct stat file_stat {};
+    const bool stat_ok = fstat(fd, &file_stat) == 0;
+    void* buf = stat_ok ? mmap(nullptr, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0)
+                        : MAP_FAILED;
+    // Close before any ASSERT can return early; an existing mapping stays
+    // valid after its descriptor is closed.
+    close(fd);
+    ASSERT_TRUE(stat_ok);
+    ASSERT_NE(buf, MAP_FAILED);
+
+    const auto file_size = static_cast<size_t>(file_stat.st_size);
+    Md5Digest md5;
+    md5.update(buf, file_size);
+    md5.digest();
+    munmap(buf, file_size);
+
+    EXPECT_EQ(md5_value, md5.hex());
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]