This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 95a073aa0 IMPALA-14223: Cleanup subdirectories in INSERT OVERWRITE
95a073aa0 is described below
commit 95a073aa08f88e3aa13345fab02b4b6981f18ca6
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Tue Jul 1 18:23:49 2025 +0200
IMPALA-14223: Cleanup subdirectories in INSERT OVERWRITE
If an external table contains data files in subdirectories, and
recursive listing is enabled, Impala considers the files in the
subdirectories as part of the table. However, currently INSERT OVERWRITE
and TRUNCATE do not always delete these files, leading to data
corruption.
This change takes care of INSERT OVERWRITE.
Before this change, for unpartitioned external tables, only top-level
data files were deleted and data files in subdirectories (whether
hidden, ignored or normal) were kept.
After this change, directories are also deleted in addition to
(non-hidden) data files, with the exception of hidden and ignored
directories. (Note: for ignored directories, see
--ignored_dir_prefix_list).
Note that for partitioned tables, INSERT OVERWRITE completely removes
the partition directories that are affected, and this change does not
alter that.
Testing:
- extended the tests in test_recursive_listing.py::TestRecursiveListing
Change-Id: I1a40a22e18e6a384da982d300422ac8995ed0273
Reviewed-on: http://gerrit.cloudera.org:8080/23165
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Daniel Becker <[email protected]>
---
be/src/runtime/dml-exec-state.cc | 93 ++++++++++++++++++++-----------
tests/metadata/test_recursive_listing.py | 31 +++++++++++
tests/query_test/test_insert_behaviour.py | 5 +-
3 files changed, 95 insertions(+), 34 deletions(-)
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 24cec2992..d54ff27a4 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -17,6 +17,7 @@
#include "runtime/dml-exec-state.h"
+#include <algorithm>
#include <mutex>
#include <boost/algorithm/string.hpp>
@@ -47,6 +48,8 @@
DEFINE_bool(insert_inherit_permissions, false, "If true, new directories
created by "
"INSERTs will inherit the permissions of their parent directories");
+DECLARE_string(ignored_dir_prefix_list);
+
using namespace impala;
using boost::algorithm::is_any_of;
using boost::algorithm::split;
@@ -179,6 +182,61 @@ bool
DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update,
return catalog_update->updated_partitions.size() != 0;
}
+// Deletes data files and directories in 'path'. Keeps top-level
+// - hidden files
+// - hidden directories
+// - ignored directories.
+Status DeleteUnpartitionedDirData(const hdfsFS& fs_connection,
+ const string& path, HdfsOperationSet* hdfs_ops) {
+ int num_files = 0;
+ // hfdsListDirectory() only sets errno if there is an error, but it doesn't
set
+ // it to 0 if the call succeed. When there is no error, errno could be any
+ // value. So need to clear errno before calling it.
+ // Once HDFS-8407 is fixed, the errno reset won't be needed.
+ errno = 0;
+ hdfsFileInfo* existing_files_and_dirs =
+ hdfsListDirectory(fs_connection, path.c_str(), &num_files);
+ if (existing_files_and_dirs == nullptr && errno == EAGAIN) {
+ errno = 0;
+ existing_files_and_dirs =
+ hdfsListDirectory(fs_connection, path.c_str(), &num_files);
+ }
+ // hdfsListDirectory() returns nullptr not only when there is an error but
also
+ // when the directory is empty(HDFS-8407). Need to check errno to make sure
+ // the call fails.
+ if (existing_files_and_dirs == nullptr && errno != 0) {
+ return Status(GetHdfsErrorMsg("Could not list directory: ", path));
+ }
+
+ vector<string> ignored_prefixes;
+ boost::split(ignored_prefixes, FLAGS_ignored_dir_prefix_list,
+ [](char ch) { return ch == ','; });
+
+ for (int i = 0; i < num_files; ++i) {
+ const string file_or_dir_name =
+
boost::filesystem::path(existing_files_and_dirs[i].mName).filename().string();
+ if (!IsHiddenFile(file_or_dir_name)) {
+ if (existing_files_and_dirs[i].mKind == kObjectKindFile) {
+ hdfs_ops->Add(DELETE, existing_files_and_dirs[i].mName);
+ } else if (existing_files_and_dirs[i].mKind == kObjectKindDirectory) {
+ auto file_or_dir_name_starts_with_non_empty_prefix =
+ [&file_or_dir_name](const string& prefix) {
+ return !prefix.empty() && file_or_dir_name.find(prefix) == 0;
+ };
+
+ bool dir_ignored = std::any_of(ignored_prefixes.begin(),
ignored_prefixes.end(),
+ file_or_dir_name_starts_with_non_empty_prefix);
+ if (!dir_ignored) {
+ hdfs_ops->Add(DELETE, existing_files_and_dirs[i].mName);
+ }
+ }
+ }
+ }
+ hdfsFreeFileInfo(existing_files_and_dirs, num_files);
+
+ return Status::OK();
+}
+
Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
RuntimeProfile* profile) {
@@ -225,38 +283,9 @@ Status DmlExecState::FinalizeHdfsInsert(const
TFinalizeParams& params,
if (partition.first.empty()) {
// If the root directory is written to, then the table must not be
partitioned
DCHECK(per_partition_status_.size() == 1);
- // We need to be a little more careful, and only delete data files in
the root
- // because the tmp directories the sink(s) wrote are there also.
- // So only delete files in the table directory - all files are treated
as data
- // files by Hive and Impala, but directories are ignored (and may
legitimately
- // be used to store permanent non-table data by other applications).
- int num_files = 0;
- // hfdsListDirectory() only sets errno if there is an error, but it
doesn't set
- // it to 0 if the call succeed. When there is no error, errno could be
any
- // value. So need to clear errno before calling it.
- // Once HDFS-8407 is fixed, the errno reset won't be needed.
- errno = 0;
- hdfsFileInfo* existing_files =
- hdfsListDirectory(partition_fs_connection, part_path.c_str(),
&num_files);
- if (existing_files == nullptr && errno == EAGAIN) {
- errno = 0;
- existing_files =
- hdfsListDirectory(partition_fs_connection, part_path.c_str(),
&num_files);
- }
- // hdfsListDirectory() returns nullptr not only when there is an error
but also
- // when the directory is empty(HDFS-8407). Need to check errno to make
sure
- // the call fails.
- if (existing_files == nullptr && errno != 0) {
- return Status(GetHdfsErrorMsg("Could not list directory: ",
part_path));
- }
- for (int i = 0; i < num_files; ++i) {
- const string filename =
-
boost::filesystem::path(existing_files[i].mName).filename().string();
- if (existing_files[i].mKind == kObjectKindFile &&
!IsHiddenFile(filename)) {
- partition_create_ops.Add(DELETE, existing_files[i].mName);
- }
- }
- hdfsFreeFileInfo(existing_files, num_files);
+
+ RETURN_IF_ERROR(DeleteUnpartitionedDirData(
+ partition_fs_connection, part_path, &partition_create_ops));
} else {
// This is a partition directory, not the root directory; we can delete
// recursively with abandon, after checking that it ever existed.
diff --git a/tests/metadata/test_recursive_listing.py
b/tests/metadata/test_recursive_listing.py
index 9e064a4ad..ab00cf3a3 100644
--- a/tests/metadata/test_recursive_listing.py
+++ b/tests/metadata/test_recursive_listing.py
@@ -151,6 +151,37 @@ class TestRecursiveListing(ImpalaTestSuite):
assert len(self._show_files(fq_tbl_name)) == 0
assert len(self._get_rows(fq_tbl_name)) == 0
+ # Verify that INSERT OVERWRITE removes data files in subdirectories too.
+ # Regression test for IMPALA-13778.
+ self.filesystem_client.create_file("{0}/file1.txt".format(part_path),
"file1")
+ self.filesystem_client.make_dir("{0}/dir1".format(part_path))
+ self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path),
"file1")
+ # Also add a hidden and an ignored dir.
+ self.filesystem_client.make_dir("{0}/_tmp.hiddendir".format(part_path))
+ self.filesystem_client.create_file(
+ "{0}/_tmp.hiddendir/file1.txt".format(part_path), "file1")
+ self.filesystem_client.make_dir("{0}/-tmp.ignoreddir".format(part_path))
+ self.filesystem_client.create_file(
+ "{0}/-tmp.ignoreddir/file1.txt".format(part_path), "file1")
+
+ self.execute_query_expect_success(self.client,
+ "insert overwrite {tbl} {part} select 'str'".format(
+ tbl=fq_tbl_name, part="partition(p=1)" if partitioned else ""))
+ assert len(self._show_files(fq_tbl_name)) == 1
+ assert len(self._get_rows(fq_tbl_name)) == 1
+
+ assert ((not partitioned)
+ ==
self.filesystem_client.exists("{0}/_tmp.hiddendir".format(part_path)))
+ assert ((not partitioned)
+ == self.filesystem_client.exists(
+ "{0}/_tmp.hiddendir/file1.txt".format(part_path)))
+
+ assert ((not partitioned)
+ ==
self.filesystem_client.exists("{0}/-tmp.ignoreddir".format(part_path)))
+ assert ((not partitioned)
+ == self.filesystem_client.exists(
+ "{0}/-tmp.ignoreddir/file1.txt".format(part_path)))
+
@SkipIfFS.no_partial_listing
@pytest.mark.execute_serially
def test_large_staging_dirs(self, unique_database):
diff --git a/tests/query_test/test_insert_behaviour.py
b/tests/query_test/test_insert_behaviour.py
index 88d5446d9..51c431cf7 100644
--- a/tests/query_test/test_insert_behaviour.py
+++ b/tests/query_test/test_insert_behaviour.py
@@ -67,7 +67,8 @@ class TestInsertBehaviour(ImpalaTestSuite):
TBL_NAME = "insert_overwrite_nopart"
table_dir = "%s/functional.db/%s/" % (WAREHOUSE, TBL_NAME)
hidden_file_locations = [".hidden", "_hidden"]
- dir_locations = ["dir", ".hidden_dir"]
+ hidden_dir_locations = [".hidden_dir", "_hidden_dir"]
+ dir_locations = ["dir"] + hidden_dir_locations
for dir_ in dir_locations:
self.filesystem_client.make_dir(table_dir + dir_)
@@ -87,7 +88,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
assert self.filesystem_client.exists(table_dir + file_), "Hidden file
{0} was " \
"unexpectedly deleted by INSERT OVERWRITE".format(table_dir + file_)
- for dir_ in dir_locations:
+ for dir_ in hidden_dir_locations:
assert self.filesystem_client.exists(table_dir + dir_), "Directory {0}
was " \
"unexpectedly deleted by INSERT OVERWRITE".format(table_dir + dir_)