This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 67fe0c9c6 IMPALA-11736: Copy data between ofs buckets
67fe0c9c6 is described below
commit 67fe0c9c6f220582ee3ceb1ab78d692c2e87f526
Author: Michael Smith <[email protected]>
AuthorDate: Mon Nov 21 13:57:13 2022 -0800
IMPALA-11736: Copy data between ofs buckets
When Impala moves a file - such as for a LOAD DATA statement - it checks
whether the source and destination are the same filesystem. If the same,
it uses hdfsRename, otherwise it uses hdfsMove to move between
filesystems.
Ozone's ofs protocol supports referencing multiple buckets by path in
the same filesystem, but does not support rename between them. All other
filesystems Impala supports include the bucket name (if they use that
concept) in the authority. This patch updates the function used to
determine whether two paths are in the same filesystem to also check
that they're in the same bucket as a requirement for hdfsRename.
Testing: ran test suite with Ozone.
Change-Id: Ic61f01672fa605fec0377885b13a1621573e424e
Reviewed-on: http://gerrit.cloudera.org:8080/19262
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/dml-exec-state.cc | 4 +-
be/src/runtime/exec-env.h | 2 +-
be/src/util/hdfs-util-test.cc | 83 ++++++++++++++--------
be/src/util/hdfs-util.cc | 37 +++++++++-
be/src/util/hdfs-util.h | 7 +-
.../org/apache/impala/common/FileSystemUtil.java | 64 +++++++++++------
.../apache/impala/common/FileSystemUtilTest.java | 18 +++++
tests/metadata/test_load.py | 43 ++++++++++-
8 files changed, 199 insertions(+), 59 deletions(-)
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index b3cb806a8..94c0ba5af 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -297,7 +297,9 @@ Status DmlExecState::FinalizeHdfsInsert(const
TFinalizeParams& params,
dir_deletion_ops.Add(DELETE, move.first);
} else {
VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
- if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+ // Files can't be renamed across different filesystems (considering both
scheme and
+ // authority) or across different Ozone buckets/volumes.
+ if (FilesystemsAndBucketsMatch(move.first.c_str(), move.second.c_str()))
{
move_ops.Add(RENAME, move.first, move.second);
} else {
move_ops.Add(MOVE, move.first, move.second);
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index de3bcc024..31d90169f 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -244,7 +244,7 @@ class ExecEnv {
friend class DataStreamTest;
// For access to InitHadoopConfig().
- FRIEND_TEST(HdfsUtilTest, CheckFilesystemsMatch);
+ FRIEND_TEST(HdfsUtilTest, CheckFilesystemsAndBucketsMatch);
static ExecEnv* exec_env_;
bool is_fe_tests_ = false;
diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc
index 748ab236e..220f03530 100644
--- a/be/src/util/hdfs-util-test.cc
+++ b/be/src/util/hdfs-util-test.cc
@@ -26,47 +26,72 @@
namespace impala {
-TEST(HdfsUtilTest, CheckFilesystemsMatch) {
+TEST(HdfsUtilTest, CheckFilesystemsAndBucketsMatch) {
// We do this to retrieve the default FS from the frontend without starting
the rest
// of the ExecEnv services.
ExecEnv exec_env;
ASSERT_OK(exec_env.InitHadoopConfig());
// Tests with both paths qualified.
- EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
- "s3a://dummybucket/temp_dir_2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
- "s3a://dummybucket_2/temp_dir_2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
- "hdfs://namenode/temp_dir2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("hdfs://namenode/temp_dir/temp_path",
- "hdfs://namenode_2/temp_dir2/temp_path_2"));
- EXPECT_TRUE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path",
- "hdfs://namenode:9999/temp_dir2/temp_path_2"));
- EXPECT_FALSE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path",
- "hdfs://namenode:8888/temp_dir2/temp_path_2"));
- EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq",
- "file:///path/to/dir/filename.parq"));
- EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq",
- "file:/path_2/to/dir/filename.parq"));
- EXPECT_TRUE(FilesystemsMatch("file:///path/to/dir/filename.parq",
- "file:/path_2/to/dir/filename.parq"));
- EXPECT_FALSE(FilesystemsMatch("file:/path/to/dir/filename.parq",
- "file2://path/to/dir/filename.parq"));
- EXPECT_FALSE(FilesystemsMatch("hdfs://",
"s3a://dummybucket/temp_dir/temp_path"));
- EXPECT_TRUE(FilesystemsMatch("hdfs://namenode", "hdfs://namenode/"));
+
EXPECT_TRUE(FilesystemsAndBucketsMatch("s3a://dummybucket/temp_dir/temp_path",
+
"s3a://dummybucket/temp_dir_2/temp_path_2"));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("s3a://dummybucket/temp_dir/temp_path",
+
"s3a://dummybucket_2/temp_dir_2/temp_path_2"));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("s3a://dummybucket/temp_dir/temp_path",
+
"hdfs://namenode/temp_dir2/temp_path_2"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("hdfs://namenode/temp_dir/temp_path",
+
"hdfs://namenode_2/temp_dir2/temp_path_2"));
+
EXPECT_TRUE(FilesystemsAndBucketsMatch("hdfs://namenode:9999/temp_dir/temp_path",
+
"hdfs://namenode:9999/temp_dir2/temp_path_2"));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("hdfs://namenode:9999/temp_dir/temp_path",
+
"hdfs://namenode:8888/temp_dir2/temp_path_2"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("file:/path/to/dir/filename.parq",
+ "file:///path/to/dir/filename.parq"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("file:/path/to/dir/filename.parq",
+ "file:/path_2/to/dir/filename.parq"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("file:///path/to/dir/filename.parq",
+ "file:/path_2/to/dir/filename.parq"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("file:/path/to/dir/filename.parq",
+
"file2://path/to/dir/filename.parq"));
+ EXPECT_FALSE(FilesystemsAndBucketsMatch("hdfs://",
+
"s3a://dummybucket/temp_dir/temp_path"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch("hdfs://namenode",
"hdfs://namenode/"));
+
EXPECT_TRUE(FilesystemsAndBucketsMatch("o3fs://bucket.volume.namenode:9862/path1",
+
"o3fs://bucket.volume.namenode:9862/path2"));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("o3fs://bucket1.volume.namenode:9862/path1",
+
"o3fs://bucket2.volume.namenode:9862/path2"));
+
+ // Provide unqualified paths for testing. With ofs, unqualified paths still
include the
+ // volume and bucket name which causes tests to fail. Add a prefix so they
pass.
+ std::string relpath1 = "tempdir/temppath";
+ std::string relpath2 = "tempdir2/temppath2";
+ std::string default_fs = exec_env.default_fs();
+ if (default_fs.rfind(FILESYS_PREFIX_OFS, 0) == 0) {
+ relpath1 = "volume/bucket/" + relpath1;
+ relpath2 = "volume/bucket/" + relpath2;
+ default_fs += "/volume/bucket";
+ }
  // Tests with both paths unqualified.
- EXPECT_TRUE(FilesystemsMatch("tempdir/temppath", "tempdir2/temppath2"));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch(relpath1.c_str(), relpath2.c_str()));
// Tests with one path qualified and the other unqualified.
- const char* default_fs = exec_env.default_fs().c_str();
- EXPECT_TRUE(FilesystemsMatch(default_fs, "temp_dir/temp_path"));
- EXPECT_TRUE(FilesystemsMatch("temp_dir/temp_path", default_fs));
- EXPECT_FALSE(FilesystemsMatch("badscheme://namenode/temp_dir/temp_path",
+ EXPECT_TRUE(FilesystemsAndBucketsMatch(default_fs.c_str(),
relpath1.c_str()));
+ EXPECT_TRUE(FilesystemsAndBucketsMatch(relpath1.c_str(),
default_fs.c_str()));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("badscheme://namenode/temp_dir/temp_path",
"temp_dir/temp_path"));
- EXPECT_FALSE(FilesystemsMatch("badscheme://namenode:1234/temp_dir/temp_path",
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("badscheme://namenode:1234/temp_dir/temp_path",
"temp_dir/temp_path"));
+
+ // Tests for ofs with volume/bucket
+
EXPECT_TRUE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume/bucket/path1",
+
"ofs://namenode:9862/volume/bucket/path2"));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume/bucket1/path1",
+
"ofs://namenode:9862/volume/bucket2/path2"));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume1/bucket/path1",
+
"ofs://namenode:9862/volume2/bucket/path2"));
+
EXPECT_FALSE(FilesystemsAndBucketsMatch("ofs://namenode:9862/volume1/bucket1/path1",
+
"ofs://namenode:9862/volume2/bucket2/path2"));
}
TEST(HdfsUtilTest, CheckGetBaseName) {
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 1d6db5f29..46ab67af1 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -86,7 +86,7 @@ Status CopyHdfsFile(const hdfsFS& src_conn, const string&
src_path,
return Status::OK();
}
-bool IsSpecificPath(
+static bool IsSpecificPath(
const char* path, const char* specific_prefix, bool check_default_fs) {
size_t prefix_len = strlen(specific_prefix);
if (check_default_fs && strstr(path, ":/") == NULL) {
@@ -154,7 +154,7 @@ static int GetFilesystemNameLength(const char* path) {
return after_authority - path;
}
-bool FilesystemsMatch(const char* path_a, const char* path_b) {
+static bool FilesystemsMatch(const char* path_a, const char* path_b) {
int fs_a_name_length = GetFilesystemNameLength(path_a);
int fs_b_name_length = GetFilesystemNameLength(path_b);
@@ -179,6 +179,39 @@ bool FilesystemsMatch(const char* path_a, const char*
path_b) {
return strncmp(path_a, path_b, fs_a_name_length) == 0;
}
+static int VolumeBucketLength(const char* path) {
+ if (*path == '\0') return 0;
+ const char* afterVolume = strstr(path, "/");
+ if (afterVolume == nullptr) return strlen(path);
+ const char* afterBucket = strstr(afterVolume + 1, "/");
+ if (afterBucket == nullptr) return strlen(path);
+ return afterBucket - path;
+}
+
+static bool OfsBucketsMatch(const char* path_a, const char* path_b) {
+ // Examine only the path elements.
+ path_a = path_a + GetFilesystemNameLength(path_a);
+ path_b = path_b + GetFilesystemNameLength(path_b);
+ // Skip past starting slash for comparison to unqualified paths.
+ if (*path_a == '/') ++path_a;
+ if (*path_b == '/') ++path_b;
+
+ int vba_len = VolumeBucketLength(path_a);
+ int vbb_len = VolumeBucketLength(path_b);
+ if (vba_len != vbb_len) return false;
+ return strncmp(path_a, path_b, vba_len) == 0;
+}
+
+bool FilesystemsAndBucketsMatch(const char* path_a, const char* path_b) {
+ if (!FilesystemsMatch(path_a, path_b)) return false;
+
+ // path_a and path_b are in the same filesystem, so we just need to check
one prefix.
+ if (IsSpecificPath(path_a, FILESYS_PREFIX_OFS, true)) {
+ return OfsBucketsMatch(path_a, path_b);
+ }
+ return true;
+}
+
string GetBaseName(const char* path) {
int fs_name_length = GetFilesystemNameLength(path);
if (fs_name_length >= strlen(path)) return ".";
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index a31a76d73..cb9629a50 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -33,6 +33,7 @@ extern const char* FILESYS_PREFIX_ADL;
extern const char* FILESYS_PREFIX_GCS;
extern const char* FILESYS_PREFIX_COS;
extern const char* FILESYS_PREFIX_OZONE;
+extern const char* FILESYS_PREFIX_OFS;
/// Utility function to get error messages from HDFS. This function takes
prefix/file and
/// appends errno to it. Note: any stdlib function can reset errno, this
should be called
@@ -83,8 +84,10 @@ bool IsOzonePath(const char* path, bool check_default_fs =
true);
/// Returns true iff the path refers to a location on an SFS filesystem.
bool IsSFSPath(const char* path, bool check_default_fs = true);
-/// Returns true iff 'pathA' and 'pathB' are on the same filesystem.
-bool FilesystemsMatch(const char* pathA, const char* pathB);
+/// Returns true iff 'pathA' and 'pathB' are on the same filesystem and bucket.
+/// Most filesystems embed bucket in the authority, but Ozone's ofs protocol
allows
+/// addressing volume/bucket via the path and does not allow renames across
them.
+bool FilesystemsAndBucketsMatch(const char* pathA, const char* pathB);
/// Returns the terminal component of 'path'.
/// E.g. if 'path' is "hdfs://localhost:8020/a/b/c", "c" is returned.
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 2eca3b76f..9b95fd620 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -247,6 +247,32 @@ public class FileSystemUtil {
return numFilesMoved;
}
+ // Returns the first two elements (volume, bucket) of the unqualified path.
+ public static String volumeBucketSubstring(Path p) {
+ String path = Path.getPathWithoutSchemeAndAuthority(p).toString();
+ if (path.startsWith("/")) path = path.substring(1);
+ int afterVolume = path.indexOf('/');
+ if (afterVolume == -1) return path;
+ int afterBucket = path.indexOf('/', afterVolume + 1);
+ if (afterBucket == -1) return path;
+ return path.substring(0, afterBucket);
+ }
+
+ /*
+ * Returns true if the source and dest are in the same bucket. Ozone's ofs
encodes
+ * volume/bucket into the path. All other filesystems make it part of the
authority
+ * portion of the URI.
+ */
+ public static boolean isSameBucket(Path source, Path dest) throws
IOException {
+ if (!isPathOnFileSystem(source, dest.getFileSystem(CONF))) return false;
+
+ // Return true for anything besides OFS.
+ if (!hasScheme(source, SCHEME_OFS)) return true;
+
+ // Compare (volume, bucket) for source and dest.
+ return volumeBucketSubstring(source).equals(volumeBucketSubstring(dest));
+ }
+
/**
* Relocates the given file to a new location (either another directory or a
* file in the same or different filesystem). The file is generally moved
(renamed) to
@@ -261,7 +287,6 @@ public class FileSystemUtil {
public static void relocateFile(Path sourceFile, Path dest,
boolean renameIfAlreadyExists) throws IOException {
FileSystem destFs = dest.getFileSystem(CONF);
- FileSystem sourceFs = sourceFile.getFileSystem(CONF);
Path destFile =
destFs.isDirectory(dest) ? new Path(dest, sourceFile.getName()) : dest;
@@ -272,7 +297,7 @@ public class FileSystemUtil {
destFile = new Path(destDir,
appendToBaseFileName(destFile.getName(),
UUID.randomUUID().toString()));
}
- boolean sameFileSystem = isPathOnFileSystem(sourceFile, destFs);
+ boolean sameBucket = isSameBucket(sourceFile, dest);
boolean destIsDfs = isDistributedFileSystem(destFs);
// If the source and the destination are on different file systems, or in
different
@@ -282,15 +307,12 @@ public class FileSystemUtil {
arePathsInSameHdfsEncryptionZone(destFs, sourceFile, destFile);
// We can do a rename if the src and dst are in the same encryption zone
in the same
// distributed filesystem.
- boolean doRename = destIsDfs && sameFileSystem && sameEncryptionZone;
+ boolean doRename = destIsDfs && sameBucket && sameEncryptionZone;
// Alternatively, we can do a rename if the src and dst are on the same
- // non-distributed filesystem.
- if (!doRename) doRename = !destIsDfs && sameFileSystem;
+ // non-distributed filesystem in the same bucket (if it has that concept).
+ if (!doRename) doRename = !destIsDfs && sameBucket;
if (doRename) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format(
- "Moving '%s' to '%s'", sourceFile.toString(),
destFile.toString()));
- }
+ LOG.trace("Moving '{}' to '{}'", sourceFile, destFile);
// Move (rename) the file.
if (!destFs.rename(sourceFile, destFile)) {
throw new IOException(String.format(
@@ -298,24 +320,20 @@ public class FileSystemUtil {
}
return;
}
- if (destIsDfs && sameFileSystem) {
- Preconditions.checkState(!doRename);
- // We must copy rather than move if the source and dest are in different
- // encryption zones. A move would return an error from the NN because a
move is a
+ Preconditions.checkState(!doRename);
+ if (destIsDfs && sameBucket) {
+ Preconditions.checkState(!sameEncryptionZone);
+ // We must copy rather than move if the source and dest are in different
encryption
+ // zones or buckets. A move would return an error from the NN because a
move is a
// metadata-only operation and the files would not be
encrypted/decrypted properly
// on the DNs.
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format(
- "Copying source '%s' to '%s' because HDFS encryption zones are
different.",
- sourceFile, destFile));
- }
+ LOG.trace(
+ "Copying source '{}' to '{}' because HDFS encryption zones are
different.",
+ sourceFile, destFile);
} else {
- Preconditions.checkState(!sameFileSystem);
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Copying '%s' to '%s' between filesystems.",
- sourceFile, destFile));
- }
+ LOG.trace("Copying '{}' to '{}' between filesystems.", sourceFile,
destFile);
}
+ FileSystem sourceFs = sourceFile.getFileSystem(CONF);
FileUtil.copy(sourceFs, sourceFile, destFs, destFile, true, true, CONF);
}
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index e3b167ba8..b73bfef42 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -17,6 +17,7 @@
package org.apache.impala.common;
+import org.apache.impala.common.Pair;
import static org.apache.impala.common.FileSystemUtil.HIVE_TEMP_FILE_PREFIX;
import static org.apache.impala.common.FileSystemUtil.SPARK_TEMP_FILE_PREFIX;
import static org.apache.impala.common.FileSystemUtil.isIgnoredDir;
@@ -212,6 +213,23 @@ public class FileSystemUtilTest {
}
}
+ @Test
+ public void testVolumeBucketSubstring() throws IOException {
+ List<Pair<String, String>> cases = Arrays.asList(
+ Pair.create(mockLocation(FileSystemUtil.SCHEME_OFS), "volume1/bucket2"),
+ Pair.create("ofs://svc1:9876/volume/bucket/file", "volume/bucket"),
+ Pair.create("ofs://svc1:9876/volume/bucket/", "volume/bucket"),
+ Pair.create("ofs://svc1:9876/volume/bucket", "volume/bucket"),
+ Pair.create("ofs://svc1:9876/volume/", "volume"),
+ Pair.create("ofs://svc1:9876/volume", "volume"),
+ Pair.create("ofs://svc1:9876/", "")
+ );
+ for (Pair<String, String> c : cases) {
+ Path p = new Path(c.first);
+ assertEquals(c.second, FileSystemUtil.volumeBucketSubstring(p));
+ }
+ }
+
private boolean testIsInIgnoredDirectory(Path input) {
return testIsInIgnoredDirectory(input, true);
}
diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py
index c9e537215..316d3b2d5 100644
--- a/tests/metadata/test_load.py
+++ b/tests/metadata/test_load.py
@@ -28,7 +28,7 @@ from tests.common.test_dimensions import (
create_uncompressed_text_dimension)
from tests.common.skip import SkipIfLocal
from tests.common.test_vector import ImpalaTestDimension
-from tests.util.filesystem_utils import WAREHOUSE
+from tests.util.filesystem_utils import get_fs_path, WAREHOUSE
TEST_TBL_PART = "test_load"
TEST_TBL_NOPART = "test_load_nopart"
@@ -37,6 +37,8 @@ ALLTYPES_PATH = "%s/alltypes/year=2010/month=1/100101.txt" %
WAREHOUSE
MULTIAGG_PATH = '%s/alltypesaggmultifiles/year=2010/month=1/day=1' % WAREHOUSE
HIDDEN_FILES = ["{0}/3/.100101.txt".format(STAGING_PATH),
"{0}/3/_100101.txt".format(STAGING_PATH)]
+# A path outside WAREHOUSE, which will be a different bucket for Ozone/ofs.
+TMP_STAGING_PATH = get_fs_path('/tmp/test_load_staging')
@SkipIfLocal.hdfs_client
class TestLoadData(ImpalaTestSuite):
@@ -107,6 +109,45 @@ class TestLoadData(ImpalaTestSuite):
assert self.filesystem_client.exists(file_), "{0} does not
exist".format(file_)
[email protected]_client
+class TestLoadDataExternal(ImpalaTestSuite):
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestLoadDataExternal, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+ cls.ImpalaTestMatrix.add_dimension(
+ create_uncompressed_text_dimension(cls.get_workload()))
+
+ def _clean_test_tables(self):
+ self.client.execute("drop table if exists
functional.{0}".format(TEST_TBL_NOPART))
+ self.filesystem_client.delete_file_dir(TMP_STAGING_PATH, recursive=True)
+
+ def teardown_method(self, method):
+ self._clean_test_tables()
+
+ def setup_method(self, method):
+ # Defensively clean the data dirs if they exist.
+ self._clean_test_tables()
+
+ self.filesystem_client.make_dir(TMP_STAGING_PATH)
+ self.filesystem_client.copy(ALLTYPES_PATH,
"{0}/100101.txt".format(TMP_STAGING_PATH))
+
+ self.client.execute("create table functional.{0} like
functional.alltypesnopart"
+ " location '{1}/{0}'".format(TEST_TBL_NOPART, WAREHOUSE))
+
+ def test_load(self, vector):
+ self.execute_query_expect_success(self.client, "load data inpath
'{0}/100101.txt'"
+ " into table functional.{1}".format(TMP_STAGING_PATH, TEST_TBL_NOPART))
+ result = self.execute_scalar(
+ "select count(*) from functional.{0}".format(TEST_TBL_NOPART))
+ assert(result == '310')
+
+
@SkipIfLocal.hdfs_client
class TestAsyncLoadData(ImpalaTestSuite):