This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.1.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 39af39f0905652ff524baa1a864f9c17e9c1b196 Author: stiga-huang <[email protected]> AuthorDate: Sun Jul 31 15:47:31 2022 +0800 IMPALA-11464: Skip listing staging dirs to avoid failures on them Hive or other systems will generate staging/tmp dirs under the table/partition folders while loading/inserting data. They are removed when the operation is done. File metadata loading in catalogd could fail if it's listing files of such dirs. This is found on HDFS where file listing is done in batches. Each batch contains a partial list of 1000 items (configured by "dfs.ls.limit"). If the dir is removed, the next listing, e.g. the next hasNext() call on the RemoteIterator, will fail with FileNotFoundException. Such errors on staging/tmp dirs should not fail the metadata loading. However, if it happens on a partition dir, the metadata loading should fail to avoid stale metadata. This patch adds a check before listing the dir. If it's a staging/tmp dir, catalogd will just ignore it. Also adds a debug action, catalogd_pause_after_hdfs_remote_iterator_creation, to inject sleeps after the first partial listing (happens in creating the RemoteIterator). So we can reproduce the FileNotFoundException stably. Tests: - Add test on removing a large staging dir (contains 1024 files) during REFRESH. Metadata loading fails consistently before this fix. - Add test on removing a large partition dir (contains 1024 files) during REFRESH. Verify metadata loading fails as expected. 
Change-Id: Ic848e6c8563a1e0bf294cd50167dfc40f66a56cb Reviewed-on: http://gerrit.cloudera.org:8080/18801 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Reviewed-on: http://gerrit.cloudera.org:8080/18915 Tested-by: Quanlong Huang <[email protected]> Reviewed-by: Csaba Ringhofer <[email protected]> --- .../org/apache/impala/common/FileSystemUtil.java | 21 +++- .../java/org/apache/impala/util/DebugUtils.java | 4 + tests/metadata/test_recursive_listing.py | 116 +++++++++++++++++++-- tests/util/filesystem_base.py | 7 ++ tests/util/hdfs_util.py | 12 +++ 5 files changed, 150 insertions(+), 10 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index e5439d6b7..ac2dbaf37 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -724,7 +724,7 @@ public class FileSystemUtil { return listFiles(fs, p, true, debugAction); } DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY); - return new FilterIterator(p, new RecursingIterator<>(fs, p, + return new FilterIterator(p, new RecursingIterator<>(fs, p, debugAction, FileSystemUtil::listStatusIterator)); } DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY); @@ -748,7 +748,7 @@ public class FileSystemUtil { if (hasRecursiveListFiles(fs)) { baseIterator = fs.listFiles(p, recursive); } else { - baseIterator = new RecursingIterator<>(fs, p, + baseIterator = new RecursingIterator<>(fs, p, debugAction, FileSystemUtil::listLocatedStatusIterator); } return new FilterIterator(p, baseIterator); @@ -926,17 +926,22 @@ public class FileSystemUtil { private final BiFunctionWithException<FileSystem, Path, RemoteIterator<T>> newIterFunc_; private final FileSystem fs_; + private final String debugAction_; private final Stack<RemoteIterator<T>> 
iters_ = new Stack<>(); private RemoteIterator<T> curIter_; private T curFile_; - private RecursingIterator(FileSystem fs, Path startPath, + private RecursingIterator(FileSystem fs, Path startPath, String debugAction, BiFunctionWithException<FileSystem, Path, RemoteIterator<T>> newIterFunc) throws IOException { this.fs_ = Preconditions.checkNotNull(fs); + this.debugAction_ = debugAction; this.newIterFunc_ = Preconditions.checkNotNull(newIterFunc); Preconditions.checkNotNull(startPath); curIter_ = newIterFunc.apply(fs, startPath); + LOG.trace("listed start path: {}", startPath); + DebugUtils.executeDebugAction(debugAction, + DebugUtils.REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION); } @Override @@ -978,6 +983,13 @@ public class FileSystemUtil { * @throws IOException if any IO error occurs */ private void handleFileStat(T fileStatus) throws IOException { + LOG.trace("handleFileStat: {}", fileStatus.getPath()); + if (isIgnoredDir(fileStatus.getPath())) { + LOG.debug("Ignoring {} since it is either a hidden directory or a temporary " + + "staging directory", fileStatus.getPath()); + curFile_ = null; + return; + } if (fileStatus.isFile()) { curFile_ = fileStatus; return; @@ -987,6 +999,9 @@ public class FileSystemUtil { iters_.push(curIter_); curIter_ = subIter; curFile_ = fileStatus; + LOG.trace("listed sub dir: {}", fileStatus.getPath()); + DebugUtils.executeDebugAction(debugAction_, + DebugUtils.REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION); } @Override diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 3c48940b3..f1587f7a4 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -40,6 +40,10 @@ public class DebugUtils { public static final String REFRESH_HDFS_LISTING_DELAY = "catalogd_refresh_hdfs_listing_delay"; + // debug action label for introducing pauses after creating HDFS RemoteIterators. 
+ public static final String REFRESH_PAUSE_AFTER_HDFS_REMOTE_ITERATOR_CREATION + = "catalogd_pause_after_hdfs_remote_iterator_creation"; + // debug action label for introducing delay in alter table recover partitions command. public static final String RECOVER_PARTITIONS_DELAY = "catalogd_table_recover_delay"; diff --git a/tests/metadata/test_recursive_listing.py b/tests/metadata/test_recursive_listing.py index ab7abe8f0..d2d50f261 100644 --- a/tests/metadata/test_recursive_listing.py +++ b/tests/metadata/test_recursive_listing.py @@ -9,9 +9,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from tests.common.impala_test_suite import ImpalaTestSuite + +import pytest +import requests +import time + +from tests.beeswax.impala_beeswax import ImpalaBeeswaxException +from tests.common.impala_test_suite import ImpalaTestSuite, LOG from tests.common.test_dimensions import create_uncompressed_text_dimension -from tests.common.skip import SkipIfLocal +from tests.common.skip import (SkipIfLocal, SkipIfS3, SkipIfGCS, SkipIfCOS, + SkipIfADLS) from tests.util.filesystem_utils import WAREHOUSE @@ -21,6 +28,10 @@ class TestRecursiveListing(ImpalaTestSuite): This class tests that files are recursively listed within directories and partitions, and that REFRESH picks up changes within them. """ + enable_fs_tracing_url = "http://localhost:25020/set_java_loglevel?" 
\ + "class=org.apache.impala.common.FileSystemUtil&level=trace" + reset_log_level_url = "http://localhost:25020/reset_java_loglevel" + @classmethod def get_workload(self): return 'functional-query' @@ -44,13 +55,13 @@ class TestRecursiveListing(ImpalaTestSuite): result = self.client.execute("select * from {0}".format(table)) return result.data - def test_unpartitioned(self, vector, unique_database): - self._do_test(vector, unique_database, partitioned=False) + def test_unpartitioned(self, unique_database): + self._do_test(unique_database, partitioned=False) - def test_partitioned(self, vector, unique_database): - self._do_test(vector, unique_database, partitioned=True) + def test_partitioned(self, unique_database): + self._do_test(unique_database, partitioned=True) - def _do_test(self, vector, unique_database, partitioned): + def _init_test_table(self, unique_database, partitioned): tbl_name = "t" fq_tbl_name = unique_database + "." + tbl_name tbl_path = '%s/%s.db/%s' % (WAREHOUSE, unique_database, tbl_name) @@ -70,6 +81,11 @@ class TestRecursiveListing(ImpalaTestSuite): else: part_path = tbl_path + return fq_tbl_name, part_path + + def _do_test(self, unique_database, partitioned): + fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned) + # Add a file inside a nested directory and refresh. 
self.filesystem_client.make_dir("{0}/dir1".format(part_path[1:])) self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path[1:]), @@ -125,3 +141,89 @@ class TestRecursiveListing(ImpalaTestSuite): self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name)) assert len(self._show_files(fq_tbl_name)) == 1 assert len(self._get_rows(fq_tbl_name)) == 1 + + @SkipIfS3.variable_listing_times + @SkipIfCOS.variable_listing_times + @SkipIfGCS.variable_listing_times + @SkipIfADLS.eventually_consistent + @pytest.mark.execute_serially + @pytest.mark.stress + def test_large_staging_dirs(self, unique_database): + """Regression test for IMPALA-11464: + Test REFRESH survives with concurrent add/remove ops on large staging/tmp dirs + which contain more than 1000 files. Execute this serially since the sleep intervals + might not work with concurrent workloads. Test this only on HDFS since other FS might + not have partial listing (configured by dfs.ls.limit).""" + fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned=True) + staging_dir = "{0}/.hive-staging".format(part_path) + # Expected timeline (before the fix of IMPALA-11464): + # 0ms: catalogd list the partition dir and wait 200ms. + # 200ms: catalogd get the staging dir and list it partially, then wait 200ms. + # 300ms: remove the staging dir. + # 400ms: catalogd consume the partial list (1000 items). Start listing the remaining + # items and get FileNotFoundException due to the dir is removed. + # After the fix of IMPALA-11464, catalogd won't list the staging dir, so avoids + # hitting the exception. 
+ self._test_listing_large_dir(fq_tbl_name, large_dir=staging_dir, + pause_ms_after_remote_iterator_creation=200, + pause_ms_before_file_cleanup=300, + refresh_should_fail=False) + + @SkipIfS3.variable_listing_times + @SkipIfCOS.variable_listing_times + @SkipIfGCS.variable_listing_times + @SkipIfADLS.eventually_consistent + @pytest.mark.execute_serially + @pytest.mark.stress + def test_partition_dir_removed_inflight(self, unique_database): + """Test REFRESH with concurrent add/remove ops on large partition dirs + which contain more than 1000 files. Execute this serially since the sleep + intervals might not work with concurrent workloads. Test this only on HDFS + since other FS might not have partial listing (configured by dfs.ls.limit)""" + fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned=True) + # Expected timeline: + # 0ms: catalogd start listing the partition dir. Get 1000 items in the first + # partial listing. Then wait for 300ms. + # 200ms: The partition dir is removed. + # 300ms: catalogd processed the 1000 items and start listing the remaining items. + # Then get FileNotFoundException since the partition dir disappears. + self._test_listing_large_dir(fq_tbl_name, large_dir=part_path + '/', + pause_ms_after_remote_iterator_creation=300, + pause_ms_before_file_cleanup=200, + refresh_should_fail=True) + + def _test_listing_large_dir(self, fq_tbl_name, large_dir, + pause_ms_after_remote_iterator_creation, + pause_ms_before_file_cleanup, + refresh_should_fail): + # We need data files more than 1000 (default of dfs.ls.limit) so the initial + # file listing can't list all of them. 
+ files = [large_dir + '/' + str(i) for i in range(1024)] + refresh_stmt = "refresh " + fq_tbl_name + self.client.set_configuration({ + "debug_action": "catalogd_pause_after_hdfs_remote_iterator_creation:SLEEP@" + + str(pause_ms_after_remote_iterator_creation) + }) + # Enable TRACE logging in FileSystemUtil for better debugging + response = requests.get(self.enable_fs_tracing_url) + assert response.status_code == requests.codes.ok + try: + # self.filesystem_client is a DelegatingHdfsClient. It delegates delete_file_dir() + # and make_dir() to the underlying PyWebHdfsClient which expects the HDFS path + # without a leading '/'. So we use large_dir[1:] to remove the leading '/'. + self.filesystem_client.delete_file_dir(large_dir[1:], recursive=True) + self.filesystem_client.make_dir(large_dir[1:]) + self.filesystem_client.touch(files) + LOG.info("created staging files under " + large_dir) + handle = self.execute_query_async(refresh_stmt) + # Wait a moment to let REFRESH finish expected partial listing on the dir. + time.sleep(pause_ms_before_file_cleanup / 1000.0) + self.filesystem_client.delete_file_dir(large_dir[1:], recursive=True) + LOG.info("removed staging dir " + large_dir) + try: + self.client.fetch(refresh_stmt, handle) + assert not refresh_should_fail, "REFRESH should fail" + except ImpalaBeeswaxException as e: + assert refresh_should_fail, "unexpected exception " + str(e) + finally: + requests.get(self.reset_log_level_url) diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py index f74453016..037eefccf 100644 --- a/tests/util/filesystem_base.py +++ b/tests/util/filesystem_base.py @@ -69,3 +69,10 @@ class BaseFilesystem(object): """Returns a list of integers which are all the file sizes of files found under 'path'.""" pass + + @abstractmethod + def touch(self, paths): + """Updates the access and modification times of the files specified by 'paths' to + the current time. 
If the files don't exist, zero length files will be created with + current time as the timestamp of them.""" + pass diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py index 81d07e650..50004d663 100644 --- a/tests/util/hdfs_util.py +++ b/tests/util/hdfs_util.py @@ -108,6 +108,8 @@ class DelegatingHdfsClient(BaseFilesystem): def getacl(self, path): return self.webhdfs_client.getacl(path) + def touch(self, paths): + return self.hdfs_filesystem_client.touch(paths) class PyWebHdfsClientWithChmod(PyWebHdfsClient): def chmod(self, path, permission): @@ -309,6 +311,16 @@ class HadoopFsCommandLineClient(BaseFilesystem): missing.""" return path if path.startswith('/') else '/' + path + def touch(self, paths): + """Updates the access and modification times of the files specified by 'paths' to + the current time. If the files don't exist, zero length files will be created with + current time as the timestamp of them.""" + if isinstance(paths, list): + cmd = ['-touch'] + paths + else: + cmd = ['-touch', paths] + (status, stdout, stderr) = self._hadoop_fs_shell(cmd) + return status == 0 def get_webhdfs_client_from_conf(conf): """Returns a new HTTP client for an HDFS cluster using an HdfsConfig object"""
