This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 39af39f0905652ff524baa1a864f9c17e9c1b196
Author: stiga-huang <[email protected]>
AuthorDate: Sun Jul 31 15:47:31 2022 +0800

    IMPALA-11464: Skip listing staging dirs to avoid failures on them
    
    Hive or other systems will generate staging/tmp dirs under the
    table/partition folders while loading/inserting data. They are removed
    when the operation is done. File metadata loading in catalogd could fail
    if it's listing files of such dirs. This is found on HDFS where file
    listing is done in batches. Each batch contains a partial list of 1000
    items (configured by "dfs.ls.limit"). If the dir is removed, the next
    listing, e.g. the next hasNext() call on the RemoteIterator, will fail
    with FileNotFoundException. Such errors on staging/tmp dirs should not
    fail the metadata loading. However, if such an error happens on a
    partition dir, the metadata loading should fail to avoid stale metadata.
    
    This patch adds a check before listing the dir. If it's a staging/tmp
    dir, catalogd will just ignore it. This patch also adds a debug action,
    catalogd_pause_after_hdfs_remote_iterator_creation, to inject
    sleeps after the first partial listing (which happens when creating the
    RemoteIterator), so we can reproduce the FileNotFoundException reliably.
    
    Tests:
     - Add test on removing a large staging dir (contains 1024 files) during
       REFRESH. Metadata loading fails consistently before this fix.
     - Add test on removing a large partition dir (contains 1024 files)
       during REFRESH. Verify metadata loading fails as expected.
    
    Change-Id: Ic848e6c8563a1e0bf294cd50167dfc40f66a56cb
    Reviewed-on: http://gerrit.cloudera.org:8080/18801
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/18915
    Tested-by: Quanlong Huang <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
---
 .../org/apache/impala/common/FileSystemUtil.java   |  21 +++-
 .../java/org/apache/impala/util/DebugUtils.java    |   4 +
 tests/metadata/test_recursive_listing.py           | 116 +++++++++++++++++++--
 tests/util/filesystem_base.py                      |   7 ++
 tests/util/hdfs_util.py                            |  12 +++
 5 files changed, 150 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java 
b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index e5439d6b7..ac2dbaf37 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -724,7 +724,7 @@ public class FileSystemUtil {
           return listFiles(fs, p, true, debugAction);
         }
         DebugUtils.executeDebugAction(debugAction, 
DebugUtils.REFRESH_HDFS_LISTING_DELAY);
-        return new FilterIterator(p, new RecursingIterator<>(fs, p,
+        return new FilterIterator(p, new RecursingIterator<>(fs, p, 
debugAction,
             FileSystemUtil::listStatusIterator));
       }
       DebugUtils.executeDebugAction(debugAction, 
DebugUtils.REFRESH_HDFS_LISTING_DELAY);
@@ -748,7 +748,7 @@ public class FileSystemUtil {
       if (hasRecursiveListFiles(fs)) {
         baseIterator = fs.listFiles(p, recursive);
       } else {
-        baseIterator = new RecursingIterator<>(fs, p,
+        baseIterator = new RecursingIterator<>(fs, p, debugAction,
             FileSystemUtil::listLocatedStatusIterator);
       }
       return new FilterIterator(p, baseIterator);
@@ -926,17 +926,22 @@ public class FileSystemUtil {
     private final BiFunctionWithException<FileSystem, Path, RemoteIterator<T>>
         newIterFunc_;
     private final FileSystem fs_;
+    private final String debugAction_;
     private final Stack<RemoteIterator<T>> iters_ = new Stack<>();
     private RemoteIterator<T> curIter_;
     private T curFile_;
 
-    private RecursingIterator(FileSystem fs, Path startPath,
+    private RecursingIterator(FileSystem fs, Path startPath, String 
debugAction,
         BiFunctionWithException<FileSystem, Path, RemoteIterator<T>> 
newIterFunc)
         throws IOException {
       this.fs_ = Preconditions.checkNotNull(fs);
+      this.debugAction_ = debugAction;
       this.newIterFunc_ = Preconditions.checkNotNull(newIterFunc);
       Preconditions.checkNotNull(startPath);
       curIter_ = newIterFunc.apply(fs, startPath);
+      LOG.trace("listed start path: {}", startPath);
+      DebugUtils.executeDebugAction(debugAction,
+          DebugUtils.REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION);
     }
 
     @Override
@@ -978,6 +983,13 @@ public class FileSystemUtil {
      * @throws IOException if any IO error occurs
      */
     private void handleFileStat(T fileStatus) throws IOException {
+      LOG.trace("handleFileStat: {}", fileStatus.getPath());
+      if (isIgnoredDir(fileStatus.getPath())) {
+        LOG.debug("Ignoring {} since it is either a hidden directory or a 
temporary "
+            + "staging directory", fileStatus.getPath());
+        curFile_ = null;
+        return;
+      }
       if (fileStatus.isFile()) {
         curFile_ = fileStatus;
         return;
@@ -987,6 +999,9 @@ public class FileSystemUtil {
       iters_.push(curIter_);
       curIter_ = subIter;
       curFile_ = fileStatus;
+      LOG.trace("listed sub dir: {}", fileStatus.getPath());
+      DebugUtils.executeDebugAction(debugAction_,
+          DebugUtils.REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION);
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 3c48940b3..f1587f7a4 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -40,6 +40,10 @@ public class DebugUtils {
   public static final String REFRESH_HDFS_LISTING_DELAY
       = "catalogd_refresh_hdfs_listing_delay";
 
+  // debug action label for introducing pauses after creating HDFS 
RemoteIterators.
+  public static final String REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION
+      = "catalogd_pause_after_hdfs_remote_iterator_creation";
+
   // debug action label for introducing delay in alter table recover 
partitions command.
   public static final String RECOVER_PARTITIONS_DELAY = 
"catalogd_table_recover_delay";
 
diff --git a/tests/metadata/test_recursive_listing.py 
b/tests/metadata/test_recursive_listing.py
index ab7abe8f0..d2d50f261 100644
--- a/tests/metadata/test_recursive_listing.py
+++ b/tests/metadata/test_recursive_listing.py
@@ -9,9 +9,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from tests.common.impala_test_suite import ImpalaTestSuite
+
+import pytest
+import requests
+import time
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.test_dimensions import create_uncompressed_text_dimension
-from tests.common.skip import SkipIfLocal
+from tests.common.skip import (SkipIfLocal, SkipIfS3, SkipIfGCS, SkipIfCOS,
+                               SkipIfADLS)
 from tests.util.filesystem_utils import WAREHOUSE
 
 
@@ -21,6 +28,10 @@ class TestRecursiveListing(ImpalaTestSuite):
   This class tests that files are recursively listed within directories
   and partitions, and that REFRESH picks up changes within them.
   """
+  enable_fs_tracing_url = "http://localhost:25020/set_java_loglevel?"; \
+                          
"class=org.apache.impala.common.FileSystemUtil&level=trace"
+  reset_log_level_url = "http://localhost:25020/reset_java_loglevel";
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -44,13 +55,13 @@ class TestRecursiveListing(ImpalaTestSuite):
     result = self.client.execute("select * from {0}".format(table))
     return result.data
 
-  def test_unpartitioned(self, vector, unique_database):
-    self._do_test(vector, unique_database, partitioned=False)
+  def test_unpartitioned(self, unique_database):
+    self._do_test(unique_database, partitioned=False)
 
-  def test_partitioned(self, vector, unique_database):
-    self._do_test(vector, unique_database, partitioned=True)
+  def test_partitioned(self, unique_database):
+    self._do_test(unique_database, partitioned=True)
 
-  def _do_test(self, vector, unique_database, partitioned):
+  def _init_test_table(self, unique_database, partitioned):
     tbl_name = "t"
     fq_tbl_name = unique_database + "." + tbl_name
     tbl_path = '%s/%s.db/%s' % (WAREHOUSE, unique_database, tbl_name)
@@ -70,6 +81,11 @@ class TestRecursiveListing(ImpalaTestSuite):
     else:
       part_path = tbl_path
 
+    return fq_tbl_name, part_path
+
+  def _do_test(self, unique_database, partitioned):
+    fq_tbl_name, part_path = self._init_test_table(unique_database, 
partitioned)
+
     # Add a file inside a nested directory and refresh.
     self.filesystem_client.make_dir("{0}/dir1".format(part_path[1:]))
     
self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path[1:]),
@@ -125,3 +141,89 @@ class TestRecursiveListing(ImpalaTestSuite):
     self.execute_query_expect_success(self.client, "refresh 
{0}".format(fq_tbl_name))
     assert len(self._show_files(fq_tbl_name)) == 1
     assert len(self._get_rows(fq_tbl_name)) == 1
+
+  @SkipIfS3.variable_listing_times
+  @SkipIfCOS.variable_listing_times
+  @SkipIfGCS.variable_listing_times
+  @SkipIfADLS.eventually_consistent
+  @pytest.mark.execute_serially
+  @pytest.mark.stress
+  def test_large_staging_dirs(self, unique_database):
+    """Regression test for IMPALA-11464:
+    Test REFRESH survives with concurrent add/remove ops on large staging/tmp 
dirs
+    which contain more than 1000 files. Execute this serially since the sleep 
intervals
+    might not work with concurrent workloads. Test this only on HDFS since 
other FS might
+    not have partial listing (configured by dfs.ls.limit)."""
+    fq_tbl_name, part_path = self._init_test_table(unique_database, 
partitioned=True)
+    staging_dir = "{0}/.hive-staging".format(part_path)
+    # Expected timeline (before the fix of IMPALA-11464):
+    # 0ms:   catalogd list the partition dir and wait 200ms.
+    # 200ms: catalogd get the staging dir and list it partially, then wait 
200ms.
+    # 300ms: remove the staging dir.
+    # 400ms: catalogd consume the partial list (1000 items). Start listing the 
remaining
+    #        items and get FileNotFoundException due to the dir is removed.
+    # After the fix of IMPALA-11464, catalogd won't list the staging dir, so 
avoids
+    # hitting the exception.
+    self._test_listing_large_dir(fq_tbl_name, large_dir=staging_dir,
+                                 pause_ms_after_remote_iterator_creation=200,
+                                 pause_ms_before_file_cleanup=300,
+                                 refresh_should_fail=False)
+
+  @SkipIfS3.variable_listing_times
+  @SkipIfCOS.variable_listing_times
+  @SkipIfGCS.variable_listing_times
+  @SkipIfADLS.eventually_consistent
+  @pytest.mark.execute_serially
+  @pytest.mark.stress
+  def test_partition_dir_removed_inflight(self, unique_database):
+    """Test REFRESH with concurrent add/remove ops on large partition dirs
+    which contain more than 1000 files. Execute this serially since the sleep
+    intervals might not work with concurrent workloads. Test this only on HDFS
+    since other FS might not have partial listing (configured by 
dfs.ls.limit)"""
+    fq_tbl_name, part_path = self._init_test_table(unique_database, 
partitioned=True)
+    # Expected timeline:
+    # 0ms:   catalogd start listing the partition dir. Get 1000 items in the 
first
+    #        partial listing. Then wait for 300ms.
+    # 200ms: The partition dir is removed.
+    # 300ms: catalogd processed the 1000 items and start listing the remaining 
items.
+    #        Then get FileNotFoundException since the partition dir disappears.
+    self._test_listing_large_dir(fq_tbl_name, large_dir=part_path + '/',
+                                 pause_ms_after_remote_iterator_creation=300,
+                                 pause_ms_before_file_cleanup=200,
+                                 refresh_should_fail=True)
+
+  def _test_listing_large_dir(self, fq_tbl_name, large_dir,
+                              pause_ms_after_remote_iterator_creation,
+                              pause_ms_before_file_cleanup,
+                              refresh_should_fail):
+    # We need data files more than 1000 (default of dfs.ls.limit) so the 
initial
+    # file listing can't list all of them.
+    files = [large_dir + '/' + str(i) for i in range(1024)]
+    refresh_stmt = "refresh " + fq_tbl_name
+    self.client.set_configuration({
+      "debug_action": 
"catalogd_pause_after_hdfs_remote_iterator_creation:SLEEP@"
+                      + str(pause_ms_after_remote_iterator_creation)
+    })
+    # Enable TRACE logging in FileSystemUtil for better debugging
+    response = requests.get(self.enable_fs_tracing_url)
+    assert response.status_code == requests.codes.ok
+    try:
+      # self.filesystem_client is a DelegatingHdfsClient. It delegates 
delete_file_dir()
+      # and make_dir() to the underlying PyWebHdfsClient which expects the 
HDFS path
+      # without a leading '/'. So we use large_dir[1:] to remove the leading 
'/'.
+      self.filesystem_client.delete_file_dir(large_dir[1:], recursive=True)
+      self.filesystem_client.make_dir(large_dir[1:])
+      self.filesystem_client.touch(files)
+      LOG.info("created staging files under " + large_dir)
+      handle = self.execute_query_async(refresh_stmt)
+      # Wait a moment to let REFRESH finish expected partial listing on the 
dir.
+      time.sleep(pause_ms_before_file_cleanup / 1000.0)
+      self.filesystem_client.delete_file_dir(large_dir[1:], recursive=True)
+      LOG.info("removed staging dir " + large_dir)
+      try:
+        self.client.fetch(refresh_stmt, handle)
+        assert not refresh_should_fail, "REFRESH should fail"
+      except ImpalaBeeswaxException as e:
+        assert refresh_should_fail, "unexpected exception " + str(e)
+    finally:
+      requests.get(self.reset_log_level_url)
diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py
index f74453016..037eefccf 100644
--- a/tests/util/filesystem_base.py
+++ b/tests/util/filesystem_base.py
@@ -69,3 +69,10 @@ class BaseFilesystem(object):
     """Returns a list of integers which are all the file sizes of files found 
under
     'path'."""
     pass
+
+  @abstractmethod
+  def touch(self, paths):
+    """Updates the access and modification times of the files specified by 
'paths' to
+    the current time. If the files don't exist, zero length files will be 
created with
+    current time as the timestamp of them."""
+    pass
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index 81d07e650..50004d663 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -108,6 +108,8 @@ class DelegatingHdfsClient(BaseFilesystem):
   def getacl(self, path):
     return self.webhdfs_client.getacl(path)
 
+  def touch(self, paths):
+    return self.hdfs_filesystem_client.touch(paths)
 
 class PyWebHdfsClientWithChmod(PyWebHdfsClient):
   def chmod(self, path, permission):
@@ -309,6 +311,16 @@ class HadoopFsCommandLineClient(BaseFilesystem):
     missing."""
     return path if path.startswith('/') else '/' + path
 
+  def touch(self, paths):
+    """Updates the access and modification times of the files specified by 
'paths' to
+    the current time. If the files don't exist, zero length files will be 
created with
+    current time as the timestamp of them."""
+    if isinstance(paths, list):
+      cmd = ['-touch'] + paths
+    else:
+      cmd = ['-touch', paths]
+    (status, stdout, stderr) = self._hadoop_fs_shell(cmd)
+    return status == 0
 
 def get_webhdfs_client_from_conf(conf):
   """Returns a new HTTP client for an HDFS cluster using an HdfsConfig 
object"""

Reply via email to