This is an automated email from the ASF dual-hosted git repository. arawat pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ddd4f4f8d68addce1542d57f94c637210a090150 Author: Yida Wu <[email protected]> AuthorDate: Tue Feb 25 12:14:29 2025 -0800 IMPALA-13798: Cleanup host-level remote scratch dir on shutdown IMPALA-13677 introduces cleanup of remote scratch files on startup, and also introduces a host-level directory to do easier cleanup. An empty host-level directory in remote storage does not use resources, but it can stay there forever if a host goes offline permanently or hostname changes. To improve this behavior, this patch adds support for removing the remote host-level temporary directory on shutdown. This helps prevent too many empty directories left in the remote scratch path, especially for hosts that never restart. Changed the flag remote_scratch_cleanup_on_startup to remote_scratch_cleanup_on_start_stop, so that this flag also controls cleanup on shutdown. Tests: Passed exhaustive tests. Added an e2e testcase test_scratch_dirs_remote_dir_removal_on_shutdown. Change-Id: Ic8f446894afdf975630aef80a9d964a9a78d3b46 Reviewed-on: http://gerrit.cloudera.org:8080/22549 Reviewed-by: Daniel Becker <[email protected]> Reviewed-by: Abhishek Rawat <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/tmp-file-mgr-internal.h | 9 +++++++ be/src/runtime/tmp-file-mgr-test.cc | 4 +-- be/src/runtime/tmp-file-mgr.cc | 43 +++++++++++++++++++++++++------ be/src/runtime/tmp-file-mgr.h | 3 +++ be/src/service/impala-server.cc | 3 +++ tests/custom_cluster/test_scratch_disk.py | 35 +++++++++++++++++++++++++ 6 files changed, 87 insertions(+), 10 deletions(-) diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h index e800636f4..c371cb1f0 100644 --- a/be/src/runtime/tmp-file-mgr-internal.h +++ b/be/src/runtime/tmp-file-mgr-internal.h @@ -410,6 +410,9 @@ class TmpDir { virtual Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) = 0; + /// Get a 
connection to the path of the dir. Only for the remote dir. + virtual Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) = 0; + int64_t bytes_limit() { return bytes_limit_; } int priority() { return priority_; } const string& path() { return path_; } @@ -458,6 +461,10 @@ class TmpDirLocal : public TmpDir { TmpDirLocal(const std::string& path) : TmpDir(path) {} Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override; + Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) override { + DCHECK(false) << "GetConnection() is not supported for a local temporary dir"; + return Status("GetConnection() is not supported for a local temporary dir"); + } bool is_local() override { return true; } private: @@ -477,6 +484,7 @@ class TmpDirS3 : public TmpDir { TmpDirS3(const std::string& path) : TmpDir(path) {} Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override; + Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) override; private: Status ParsePathTokens(std::vector<string>& tokens) override; @@ -487,6 +495,7 @@ class TmpDirHdfs : public TmpDir { TmpDirHdfs(const std::string& path) : TmpDir(path) {} Status VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) override; + Status GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* conn) override; private: Status ParsePathTokens(std::vector<string>& tokens) override; diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 73b70747d..9b44d1415 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -60,7 +60,7 @@ DECLARE_string(remote_tmp_file_size); DECLARE_int32(wait_for_spill_buffer_timeout_s); DECLARE_bool(remote_batch_read); DECLARE_string(remote_read_memory_buffer_size); -DECLARE_bool(remote_scratch_cleanup_on_startup); 
+DECLARE_bool(remote_scratch_cleanup_on_start_stop); namespace impala { @@ -1164,7 +1164,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) { // Create a fake s3 connection in order to pass the connection verification. HdfsFsCache::HdfsFsMap fake_hdfs_conn_map; hdfsFS fake_conn = reinterpret_cast<hdfsFS>(1); - FLAGS_remote_scratch_cleanup_on_startup = false; + FLAGS_remote_scratch_cleanup_on_start_stop = false; fake_hdfs_conn_map.insert(make_pair("s3a://fake_host/", fake_conn)); // Two types of paths, one with directory, one without. vector<string> s3_paths{"s3a://fake_host", "s3a://fake_host/dir"}; diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 23c6b4091..393f74699 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -118,10 +118,10 @@ DEFINE_bool(remote_batch_read, false, "Set if the system uses batch reading for the remote temporary files. Batch reading" "allows reading a block asynchronously when the buffer pool is trying to pin one" "page of that block."); -DEFINE_bool(remote_scratch_cleanup_on_startup, true, +DEFINE_bool(remote_scratch_cleanup_on_start_stop, true, "If enabled, the Impala daemon will clean up the host-level directory within the " - "specified remote scratch directory during startup to remove potential leftover " - "files. This assumes a single Impala daemon per host. " + "specified remote scratch directory during both startup and shutdown to remove " + "potential leftover files. This assumes a single Impala daemon per host. " "For multiple daemons on a host, set this to false to prevent unintended cleanup."); using boost::algorithm::is_any_of; @@ -415,6 +415,19 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers, return Status::OK(); } +void TmpFileMgr::CleanupAtShutdown() { + // Try to clear the host-level remote temporary directory. 
+ if (tmp_dirs_remote_ == nullptr || !FLAGS_remote_scratch_cleanup_on_start_stop) return; + hdfsFS hdfs_conn; + Status status = tmp_dirs_remote_->GetConnection(this, &hdfs_conn); + if (!status.ok()) { + LOG(WARNING) << "Unable to get a connection to " << tmp_dirs_remote_->path() + << " for clearing the directory on shutdown: " << status.GetDetail(); + return; + } + RemoveRemoteDirForHost(tmp_dirs_remote_->path(), hdfs_conn); +} + Status TmpFileMgr::CreateTmpFileBufferPoolThread(MetricGroup* metrics) { DCHECK(metrics != nullptr); tmp_dirs_remote_ctrl_.tmp_file_pool_.reset(new TmpFileBufferPool(this)); @@ -459,7 +472,7 @@ static string ConstructRemoteDirPath(const string& base_dir, const string& hostn } void TmpFileMgr::RemoveRemoteDirForHost(const string& dir, hdfsFS hdfs_conn) { - if (!FLAGS_remote_scratch_cleanup_on_startup) return; + if (!FLAGS_remote_scratch_cleanup_on_start_stop) return; DCHECK(hdfs_conn != nullptr); const string hostlevel_dir = ConstructRemoteDirPath( dir, ExecEnv::GetInstance()->configured_backend_address().hostname); @@ -836,6 +849,14 @@ Status TmpDirS3::ParsePathTokens(vector<string>& toks) { return Status::OK(); } +Status TmpDirS3::GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* hdfs_conn) { + DCHECK(tmp_mgr != nullptr); + DCHECK(!path_.empty()); + DCHECK(hdfs_conn != nullptr); + return HdfsFsCache::instance()->GetConnection( + path_, hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options()); +} + Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { // For the S3 path, it doesn't need to create the directory for the uploading @@ -843,8 +864,7 @@ Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_ DCHECK(tmp_mgr != nullptr); DCHECK(!path_.empty()); hdfsFS hdfs_conn; - RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( - path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options())); + RETURN_IF_ERROR(GetConnection(tmp_mgr, &hdfs_conn)); tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn);
return Status::OK(); } @@ -870,12 +890,19 @@ Status TmpDirHdfs::ParsePathTokens(vector<string>& toks) { return Status::OK(); } +Status TmpDirHdfs::GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* hdfs_conn) { + DCHECK(tmp_mgr != nullptr); + DCHECK(!path_.empty()); + DCHECK(hdfs_conn != nullptr); + return HdfsFsCache::instance()->GetConnection( + path_, hdfs_conn, &(tmp_mgr->hdfs_conns_)); +} + Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { DCHECK(!path_.empty()); hdfsFS hdfs_conn; - RETURN_IF_ERROR( - HdfsFsCache::instance()->GetConnection(path_, &hdfs_conn, &(tmp_mgr->hdfs_conns_))); + RETURN_IF_ERROR(GetConnection(tmp_mgr, &hdfs_conn)); if (hdfsExists(hdfs_conn, path_.c_str()) != 0) { // If the impala scratch path in hdfs doesn't exist, attempt to create the path to // verify it's valid and writable for scratch usage. diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index a345b5a8a..e9d929d81 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -205,6 +205,9 @@ class TmpFileMgr { // Create the TmpFile buffer pool thread for async buffer file reservation. Status CreateTmpFileBufferPoolThread(MetricGroup* metrics) WARN_UNUSED_RESULT; + // Clean up the temporary directory during shutdown if needed. + void CleanupAtShutdown(); + /// Try to reserve space for the buffer file from local buffer directory. /// If quick_return is true, the function won't wait if there is no available space. Status ReserveLocalBufferSpace(bool quick_return) WARN_UNUSED_RESULT; diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index bd70838c4..55b5a1ebc 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -3564,6 +3564,9 @@ bool ImpalaServer::CancelQueriesForGracefulShutdown() { } } + // Clean up temporary files if needed. 
+ ExecEnv::GetInstance()->tmp_file_mgr()->CleanupAtShutdown(); + // Drain the completed queries queue to the query log table. if (FLAGS_enable_workload_mgmt) { ShutdownWorkloadManagement(); diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py index 05508e3c3..05bf1e410 100644 --- a/tests/custom_cluster/test_scratch_disk.py +++ b/tests/custom_cluster/test_scratch_disk.py @@ -569,3 +569,38 @@ class TestScratchDir(CustomClusterTestSuite): # Verify that the leftover files being removed after the impala daemon restarted. files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]) assert files_result == "" + + @pytest.mark.execute_serially + @SkipIf.not_scratch_fs + def test_scratch_dirs_remote_dir_removal_on_shutdown(self, vector): + # Test remote scratch directory cleanup on Impala daemon shutdown. + normal_dirs = self.generate_dirs('scratch_dirs_remote_spill', 1) + normal_dirs.append(self.dfs_tmp_path()) + self._start_impala_cluster([ + '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)), + '--impalad_args=--allow_multiple_scratch_dirs_per_device=true', + '--impalad_args=--shutdown_grace_period_s=5', + '--impalad_args=--shutdown_deadline_s=10'], + cluster_size=1, + expected_num_impalads=1) + self.assert_impalad_log_contains("INFO", "Using scratch directory ", + expected_count=len(normal_dirs) - 1) + vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit + impalad = self.cluster.impalads[0] + client = impalad.service.create_beeswax_client() + self.execute_query_async_using_client(client, self.spill_query_big_table, vector) + verifier = MetricVerifier(impalad.service) + verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2) + # Dir0 is the remote directory. + impalad.service.wait_for_metric_value( + 'tmp-file-mgr.scratch-space-bytes-used.dir-0', 1, allow_greater=True) + # Shut down the impalad. 
+ SHUTDOWN = ": shutdown()" + self.execute_query_expect_success(client, SHUTDOWN) + impalad.wait_for_exit() + client.close() + # Verify that no host-level dir remains in the remote scratch dir after shutdown. + full_dfs_tmp_path = "{}/impala-scratch".format(self.dfs_tmp_path()) + files_result = subprocess.check_output(["hdfs", "dfs", "-ls", full_dfs_tmp_path]) + assert files_result == "" + impalad.start()
