This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 95a073aa0 IMPALA-14223: Cleanup subdirectories in INSERT OVERWRITE
95a073aa0 is described below

commit 95a073aa08f88e3aa13345fab02b4b6981f18ca6
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Tue Jul 1 18:23:49 2025 +0200

    IMPALA-14223: Cleanup subdirectories in INSERT OVERWRITE
    
    If an external table contains data files in subdirectories, and
    recursive listing is enabled, Impala considers the files in the
    subdirectories as part of the table. However, currently INSERT OVERWRITE
    and TRUNCATE do not always delete these files, leading to data
    corruption.
    
    This change takes care of INSERT OVERWRITE.
    
    Before this change, for unpartitioned external tables, only top-level
    data files were deleted and data files in subdirectories (whether
    hidden, ignored or normal) were kept.
    
    After this change, directories are also deleted in addition to
    (non-hidden) data files, with the exception of hidden and ignored
    directories. (Note: for ignored directories, see
    --ignored_dir_prefix_list).
    
    Note that for partitioned tables, INSERT OVERWRITE completely removes
    the partition directories that are affected, and this change does not
    alter that.
    
    Testing:
     - extended the tests in test_recursive_listing.py::TestRecursiveListing
    
    Change-Id: I1a40a22e18e6a384da982d300422ac8995ed0273
    Reviewed-on: http://gerrit.cloudera.org:8080/23165
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Daniel Becker <[email protected]>
---
 be/src/runtime/dml-exec-state.cc          | 93 ++++++++++++++++++++-----------
 tests/metadata/test_recursive_listing.py  | 31 +++++++++++
 tests/query_test/test_insert_behaviour.py |  5 +-
 3 files changed, 95 insertions(+), 34 deletions(-)

diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 24cec2992..d54ff27a4 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -17,6 +17,7 @@
 
 #include "runtime/dml-exec-state.h"
 
+#include <algorithm>
 #include <mutex>
 
 #include <boost/algorithm/string.hpp>
@@ -47,6 +48,8 @@
 DEFINE_bool(insert_inherit_permissions, false, "If true, new directories 
created by "
     "INSERTs will inherit the permissions of their parent directories");
 
+DECLARE_string(ignored_dir_prefix_list);
+
 using namespace impala;
 using boost::algorithm::is_any_of;
 using boost::algorithm::split;
@@ -179,6 +182,61 @@ bool 
DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update,
   return catalog_update->updated_partitions.size() != 0;
 }
 
+// Adds DELETE operations to 'hdfs_ops' for the top-level files and directories of
+// 'path', skipping hidden files, hidden directories and directories whose name starts
+// with a non-empty prefix from --ignored_dir_prefix_list (comma-separated).
+// Returns an error status only if listing 'path' fails.
+Status DeleteUnpartitionedDirData(const hdfsFS& fs_connection,
+    const string& path, HdfsOperationSet* hdfs_ops) {
+  int num_files = 0;
+  // hdfsListDirectory() only sets errno if there is an error, but it doesn't 
set
+  // it to 0 if the call succeeds. When there is no error, errno could be any
+  // value. So need to clear errno before calling it.
+  // Once HDFS-8407 is fixed, the errno reset won't be needed.
+  errno = 0;
+  hdfsFileInfo* existing_files_and_dirs =
+      hdfsListDirectory(fs_connection, path.c_str(), &num_files);
+  if (existing_files_and_dirs == nullptr && errno == EAGAIN) {  // Retry once.
+    errno = 0;
+    existing_files_and_dirs =
+        hdfsListDirectory(fs_connection, path.c_str(), &num_files);
+  }
+  // hdfsListDirectory() returns nullptr not only when there is an error but 
also
+  // when the directory is empty (HDFS-8407). Need to check errno to tell whether
+  // the call actually failed.
+  if (existing_files_and_dirs == nullptr && errno != 0) {
+    return Status(GetHdfsErrorMsg("Could not list directory: ", path));
+  }
+
+  vector<string> ignored_prefixes;
+  boost::split(ignored_prefixes, FLAGS_ignored_dir_prefix_list,
+      [](char ch) { return ch == ','; });
+
+  for (int i = 0; i < num_files; ++i) {
+    const string file_or_dir_name =
+        
boost::filesystem::path(existing_files_and_dirs[i].mName).filename().string();
+    if (!IsHiddenFile(file_or_dir_name)) {  // Hidden files and dirs are always kept.
+      if (existing_files_and_dirs[i].mKind == kObjectKindFile) {
+        hdfs_ops->Add(DELETE, existing_files_and_dirs[i].mName);
+      } else if (existing_files_and_dirs[i].mKind == kObjectKindDirectory) {
+        auto file_or_dir_name_starts_with_non_empty_prefix =
+          [&file_or_dir_name](const string& prefix) {
+          return !prefix.empty() && file_or_dir_name.find(prefix) == 0;
+        };
+
+        bool dir_ignored = std::any_of(ignored_prefixes.begin(), 
ignored_prefixes.end(),
+            file_or_dir_name_starts_with_non_empty_prefix);
+        if (!dir_ignored)  {
+          hdfs_ops->Add(DELETE, existing_files_and_dirs[i].mName);
+        }
+      }
+    }
+  }
+  hdfsFreeFileInfo(existing_files_and_dirs, num_files);
+
+  return Status::OK();
+}
+
 Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
     bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
     RuntimeProfile* profile) {
@@ -225,38 +283,9 @@ Status DmlExecState::FinalizeHdfsInsert(const 
TFinalizeParams& params,
       if (partition.first.empty()) {
         // If the root directory is written to, then the table must not be 
partitioned
         DCHECK(per_partition_status_.size() == 1);
-        // We need to be a little more careful, and only delete data files in 
the root
-        // because the tmp directories the sink(s) wrote are there also.
-        // So only delete files in the table directory - all files are treated 
as data
-        // files by Hive and Impala, but directories are ignored (and may 
legitimately
-        // be used to store permanent non-table data by other applications).
-        int num_files = 0;
-        // hfdsListDirectory() only sets errno if there is an error, but it 
doesn't set
-        // it to 0 if the call succeed. When there is no error, errno could be 
any
-        // value. So need to clear errno before calling it.
-        // Once HDFS-8407 is fixed, the errno reset won't be needed.
-        errno = 0;
-        hdfsFileInfo* existing_files =
-            hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
-        if (existing_files == nullptr && errno == EAGAIN) {
-          errno = 0;
-          existing_files =
-              hdfsListDirectory(partition_fs_connection, part_path.c_str(), 
&num_files);
-        }
-        // hdfsListDirectory() returns nullptr not only when there is an error 
but also
-        // when the directory is empty(HDFS-8407). Need to check errno to make 
sure
-        // the call fails.
-        if (existing_files == nullptr && errno != 0) {
-          return Status(GetHdfsErrorMsg("Could not list directory: ", 
part_path));
-        }
-        for (int i = 0; i < num_files; ++i) {
-          const string filename =
-              
boost::filesystem::path(existing_files[i].mName).filename().string();
-          if (existing_files[i].mKind == kObjectKindFile && 
!IsHiddenFile(filename)) {
-            partition_create_ops.Add(DELETE, existing_files[i].mName);
-          }
-        }
-        hdfsFreeFileInfo(existing_files, num_files);
+
+        RETURN_IF_ERROR(DeleteUnpartitionedDirData(
+            partition_fs_connection, part_path, &partition_create_ops));
       } else {
         // This is a partition directory, not the root directory; we can delete
         // recursively with abandon, after checking that it ever existed.
diff --git a/tests/metadata/test_recursive_listing.py 
b/tests/metadata/test_recursive_listing.py
index 9e064a4ad..ab00cf3a3 100644
--- a/tests/metadata/test_recursive_listing.py
+++ b/tests/metadata/test_recursive_listing.py
@@ -151,6 +151,37 @@ class TestRecursiveListing(ImpalaTestSuite):
     assert len(self._show_files(fq_tbl_name)) == 0
     assert len(self._get_rows(fq_tbl_name)) == 0
 
+    # Verify that INSERT OVERWRITE removes data files in subdirectories too.
+    # Regression test for IMPALA-13778.
+    self.filesystem_client.create_file("{0}/file1.txt".format(part_path), 
"file1")
+    self.filesystem_client.make_dir("{0}/dir1".format(part_path))
+    self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path), 
"file1")
+    # Also add a hidden and an ignored dir.
+    self.filesystem_client.make_dir("{0}/_tmp.hiddendir".format(part_path))
+    self.filesystem_client.create_file(
+        "{0}/_tmp.hiddendir/file1.txt".format(part_path), "file1")
+    self.filesystem_client.make_dir("{0}/-tmp.ignoreddir".format(part_path))
+    self.filesystem_client.create_file(
+        "{0}/-tmp.ignoreddir/file1.txt".format(part_path), "file1")
+
+    self.execute_query_expect_success(self.client,
+        "insert overwrite {tbl} {part} select 'str'".format(
+            tbl=fq_tbl_name, part="partition(p=1)" if partitioned else ""))
+    assert len(self._show_files(fq_tbl_name)) == 1
+    assert len(self._get_rows(fq_tbl_name)) == 1
+
+    assert ((not partitioned)
+        == 
self.filesystem_client.exists("{0}/_tmp.hiddendir".format(part_path)))
+    assert ((not partitioned)
+        == self.filesystem_client.exists(
+                "{0}/_tmp.hiddendir/file1.txt".format(part_path)))
+
+    assert ((not partitioned)
+        == 
self.filesystem_client.exists("{0}/-tmp.ignoreddir".format(part_path)))
+    assert ((not partitioned)
+        == self.filesystem_client.exists(
+                "{0}/-tmp.ignoreddir/file1.txt".format(part_path)))
+
   @SkipIfFS.no_partial_listing
   @pytest.mark.execute_serially
   def test_large_staging_dirs(self, unique_database):
diff --git a/tests/query_test/test_insert_behaviour.py 
b/tests/query_test/test_insert_behaviour.py
index 88d5446d9..51c431cf7 100644
--- a/tests/query_test/test_insert_behaviour.py
+++ b/tests/query_test/test_insert_behaviour.py
@@ -67,7 +67,8 @@ class TestInsertBehaviour(ImpalaTestSuite):
     TBL_NAME = "insert_overwrite_nopart"
     table_dir = "%s/functional.db/%s/" % (WAREHOUSE, TBL_NAME)
     hidden_file_locations = [".hidden", "_hidden"]
-    dir_locations = ["dir", ".hidden_dir"]
+    hidden_dir_locations = [".hidden_dir", "_hidden_dir"]
+    dir_locations = ["dir"] + hidden_dir_locations
 
     for dir_ in dir_locations:
       self.filesystem_client.make_dir(table_dir + dir_)
@@ -87,7 +88,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
       assert self.filesystem_client.exists(table_dir + file_), "Hidden file 
{0} was " \
           "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + file_)
 
-    for dir_ in dir_locations:
+    for dir_ in hidden_dir_locations:
       assert self.filesystem_client.exists(table_dir + dir_), "Directory {0} 
was " \
           "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + dir_)
 

Reply via email to