This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new bbb0b4939 IMPALA-11476: Support Ozone erasure coding
bbb0b4939 is described below
commit bbb0b4939dc8791b9b505f7bff46ce411b31ae6d
Author: Michael Smith <[email protected]>
AuthorDate: Tue Sep 6 11:34:09 2022 -0700
IMPALA-11476: Support Ozone erasure coding
Adds support for identifying erasure coding policy with Ozone. Enables
testing Ozone with erasure coding.
Omits support for identifying erasure coding policy with the o3fs
protocol as that protocol is effectively deprecated and its classes
don't provide access to the ObjectStore.
Refactors volumeBucketPair to use StringTokenizer.
Test updates:
- test_exclusive_coordinator_plan: Ozone+EC blocks are 768MB, which is
larger than all tables in our test environment. Use tpch_parquet which
we rely on having 3 files (by loading from snapshot in this case).
- test_new_file_shorter: receives an EOFException when seeking with EC
- test_local_read: erasure-coded-bytes-read is also tied to IMPALA-11697
- test_erasure_coding: Ozone doesn't report files as erasure-coded
(HDDS-7603)
Testing:
- Passes core E2E and custom cluster tests with TARGET_FILESYSTEM=ozone
and ERASURE_CODING=true.
Change-Id: I201e2e33ce94bbc1e81631a0a315884bcc8047d1
Reviewed-on: http://gerrit.cloudera.org:8080/19324
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
bin/impala-config.sh | 6 ++
.../org/apache/impala/common/FileSystemUtil.java | 71 +++++++++++++++++++---
.../apache/impala/common/FileSystemUtilTest.java | 23 +++----
testdata/cluster/admin | 12 +++-
.../common/etc/hadoop/conf/ozone-site.xml.py | 7 ++-
tests/common/impala_test_suite.py | 2 +-
tests/common/skip.py | 5 +-
tests/custom_cluster/test_coordinators.py | 6 +-
tests/query_test/test_io_metrics.py | 5 +-
tests/query_test/test_scanners.py | 1 +
10 files changed, 107 insertions(+), 31 deletions(-)
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 2d2ce7175..b0a0de7df 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -728,6 +728,7 @@ elif [ "${TARGET_FILESYSTEM}" = "local" ]; then
elif [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
if [[ "${ERASURE_CODING}" = true ]]; then
export HDFS_ERASURECODE_POLICY="RS-3-2-1024k"
+ export ERASURECODE_POLICY="$HDFS_ERASURECODE_POLICY"
export HDFS_ERASURECODE_PATH="/test-warehouse"
fi
elif [ "${TARGET_FILESYSTEM}" = "ozone" ]; then
@@ -736,6 +737,11 @@ elif [ "${TARGET_FILESYSTEM}" = "ozone" ]; then
export DEFAULT_FS="ofs://${INTERNAL_LISTEN_HOST}:9862"
export FILESYSTEM_PREFIX="${DEFAULT_FS}/${OZONE_VOLUME}"
export WAREHOUSE_LOCATION_PREFIX="/${OZONE_VOLUME}"
+ if [[ "${ERASURE_CODING}" = true ]]; then
+ export OZONE_ERASURECODE_POLICY="RS-3-2-1024k"
+ # Ozone normalizes the policy for internal storage. Use this string for tests.
+ export ERASURECODE_POLICY="rs-3-2-1048576"
+ fi
else
echo "Unsupported filesystem '$TARGET_FILESYSTEM'"
echo "Valid values are: hdfs, isilon, s3, abfs, adls, gs, local, ozone"
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 88401d203..5ed8b4cae 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -32,11 +32,17 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.adl.AdlFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
+import org.apache.hadoop.fs.ozone.BasicRootedOzoneClientAdapterImpl;
+import org.apache.hadoop.fs.ozone.BasicRootedOzoneFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.impala.catalog.HdfsCompression;
+import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.util.DebugUtils;
import org.slf4j.Logger;
@@ -52,8 +58,11 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Stack;
+import java.util.StringTokenizer;
import java.util.UUID;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+
/**
* Common utility functions for operating on FileSystem objects.
*/
@@ -194,19 +203,60 @@ public class FileSystemUtil {
return z1.equals(z2);
}
+ /**
+ * Returns a string representation of the Ozone replication type for a given path.
+ */
+ private static String getOzoneReplication(ObjectStore os, Path p) throws IOException {
+ String path = Path.getPathWithoutSchemeAndAuthority(p).toString();
+ StringTokenizer tokens = new StringTokenizer(path, OZONE_URI_DELIMITER);
+
+ if (!tokens.hasMoreTokens()) return null;
+
+ OzoneVolume volume = os.getVolume(tokens.nextToken());
+ if (!tokens.hasMoreTokens()) return null;
+ OzoneBucket bucket = volume.getBucket(tokens.nextToken());
+ if (!tokens.hasMoreTokens()) {
+ return bucket.getReplicationConfig().getReplication();
+ }
+
+ // Get all remaining text except the leading slash
+ String keyName = tokens.nextToken("").substring(1);
+ return bucket.getKey(keyName).getReplicationConfig().getReplication();
+ }
+
/**
* Returns the erasure coding policy for the path, or NONE if not set.
*/
public static String getErasureCodingPolicy(Path p) {
if (isDistributedFileSystem(p)) {
try {
- ErasureCodingPolicy policy = getDistributedFileSystem().getErasureCodingPolicy(p);
+ DistributedFileSystem dfs = (DistributedFileSystem) p.getFileSystem(CONF);
+ ErasureCodingPolicy policy = dfs.getErasureCodingPolicy(p);
if (policy != null) {
return policy.getName();
}
} catch (IOException e) {
LOG.warn("Unable to retrieve erasure coding policy for {}", p, e);
}
+ } else if (isOzoneFileSystem(p)) {
+ try {
+ FileSystem fs = p.getFileSystem(CONF);
+ if (fs instanceof BasicRootedOzoneFileSystem) {
+ BasicRootedOzoneFileSystem ofs = (BasicRootedOzoneFileSystem) fs;
+ Preconditions.checkState(
+ ofs.getAdapter() instanceof BasicRootedOzoneClientAdapterImpl);
+ BasicRootedOzoneClientAdapterImpl adapter =
+ (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
+ String replication = getOzoneReplication(adapter.getObjectStore(), p);
+ if (replication != null) {
+ return replication;
+ }
+ } else {
+ LOG.debug("Retrieving erasure code policy not supported for {}", p);
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to retrieve erasure coding policy for {}", p, e);
+ }
}
return NO_ERASURE_CODE_LABEL;
}
@@ -268,14 +318,17 @@ public class FileSystemUtil {
}
// Returns the first two elements (volume, bucket) of the unqualified path.
- public static String volumeBucketSubstring(Path p) {
+ public static Pair<String, String> volumeBucketPair(Path p) {
String path = Path.getPathWithoutSchemeAndAuthority(p).toString();
- if (path.startsWith("/")) path = path.substring(1);
- int afterVolume = path.indexOf('/');
- if (afterVolume == -1) return path;
- int afterBucket = path.indexOf('/', afterVolume + 1);
- if (afterBucket == -1) return path;
- return path.substring(0, afterBucket);
+ StringTokenizer tokens = new StringTokenizer(path, OZONE_URI_DELIMITER);
+ String volume = "", bucket = "";
+ if (tokens.hasMoreTokens()) {
+ volume = tokens.nextToken();
+ if (tokens.hasMoreTokens()) {
+ bucket = tokens.nextToken();
+ }
+ }
+ return Pair.create(volume, bucket);
}
/*
@@ -290,7 +343,7 @@ public class FileSystemUtil {
if (!hasScheme(source, SCHEME_OFS)) return true;
// Compare (volume, bucket) for source and dest.
- return volumeBucketSubstring(source).equals(volumeBucketSubstring(dest));
+ return volumeBucketPair(source).equals(volumeBucketPair(dest));
}
/**
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index b73bfef42..652e0ec79 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -214,19 +214,20 @@ public class FileSystemUtilTest {
}
@Test
- public void testVolumeBucketSubstring() throws IOException {
- List<Pair<String, String>> cases = Arrays.asList(
- Pair.create(mockLocation(FileSystemUtil.SCHEME_OFS), "volume1/bucket2"),
- Pair.create("ofs://svc1:9876/volume/bucket/file", "volume/bucket"),
- Pair.create("ofs://svc1:9876/volume/bucket/", "volume/bucket"),
- Pair.create("ofs://svc1:9876/volume/bucket", "volume/bucket"),
- Pair.create("ofs://svc1:9876/volume/", "volume"),
- Pair.create("ofs://svc1:9876/volume", "volume"),
- Pair.create("ofs://svc1:9876/", "")
+ public void testVolumeBucketPair() throws IOException {
+ List<Pair<String, Pair<String, String>>> cases = Arrays.asList(
+ Pair.create(mockLocation(FileSystemUtil.SCHEME_OFS),
+ Pair.create("volume1", "bucket2")),
+ Pair.create("ofs://svc1:9876/volume/bucket/file", Pair.create("volume", "bucket")),
+ Pair.create("ofs://svc1:9876/volume/bucket/", Pair.create("volume", "bucket")),
+ Pair.create("ofs://svc1:9876/volume/bucket", Pair.create("volume", "bucket")),
+ Pair.create("ofs://svc1:9876/volume/", Pair.create("volume", "")),
+ Pair.create("ofs://svc1:9876/volume", Pair.create("volume", "")),
+ Pair.create("ofs://svc1:9876/", Pair.create("", ""))
);
- for (Pair<String, String> c : cases) {
+ for (Pair<String, Pair<String, String>> c : cases) {
Path p = new Path(c.first);
- assertEquals(c.second, FileSystemUtil.volumeBucketSubstring(p));
+ assertEquals(c.second, FileSystemUtil.volumeBucketPair(p));
}
}
diff --git a/testdata/cluster/admin b/testdata/cluster/admin
index 0fa78d1f4..9f58d11d5 100755
--- a/testdata/cluster/admin
+++ b/testdata/cluster/admin
@@ -41,7 +41,7 @@ shift $(($OPTIND-1))
DIR=$(dirname $0)
NODE_COUNT=3
-if [[ "$TARGET_FILESYSTEM" == "hdfs" && "$ERASURE_CODING" = true ]]; then
+if [[ "$ERASURE_CODING" = true ]]; then
NODE_COUNT=5
fi
NODE_PREFIX=node-
@@ -327,6 +327,13 @@ function start_cluster {
fi
if [[ "${TARGET_FILESYSTEM}" = "ozone" ]]; then
+ local replication=''
+ echo "Creating Ozone volume/bucket"
+ if [ -n "${OZONE_ERASURECODE_POLICY}" ]; then
+ replication="--type EC --replication ${OZONE_ERASURECODE_POLICY}"
+ echo "with erasure coding ${OZONE_ERASURECODE_POLICY}"
+ fi
+
local bucketkey=''
if $USE_OZONE_ENCRYPTION; then
echo "Ozone encryption enabled for ${OZONE_VOLUME}/test-warehouse"
@@ -337,7 +344,8 @@ function start_cluster {
fi
ozone sh volume create ${OZONE_VOLUME} || true
- ozone sh bucket create ${bucketkey} ${OZONE_VOLUME}/test-warehouse || true
+ ozone sh bucket create ${bucketkey} ${replication} \
+ ${OZONE_VOLUME}/test-warehouse || true
fi
return $?
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/ozone-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/ozone-site.xml.py
index 2da2d9726..71bff9a3b 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/ozone-site.xml.py
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/ozone-site.xml.py
@@ -18,7 +18,6 @@
# under the License.
import os
-import sys
CONFIG = {
# Host/port configs
@@ -41,3 +40,9 @@ CONFIG = {
'ozone.om.ratis.storage.dir': '${NODE_DIR}/data/ozone/om-ratis',
'dfs.container.ratis.datanode.storage.dir': '${NODE_DIR}/data/ozone/ratis',
}
+
+if os.environ.get('OZONE_ERASURECODE_POLICY'):
+ CONFIG.update({
+ 'ozone.server.default.replication.type': 'EC',
+ 'ozone.server.default.replication': os.environ['OZONE_ERASURECODE_POLICY'],
+ })
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 4e459c796..b88b629b2 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -495,7 +495,7 @@ class ImpalaTestSuite(BaseTestSuite):
"MANAGED_WAREHOUSE_DIR",
"EXTERNAL_WAREHOUSE_DIR"])
repl.update({
- '$ERASURECODE_POLICY': os.getenv("HDFS_ERASURECODE_POLICY", "NONE"),
+ '$ERASURECODE_POLICY': os.getenv("ERASURECODE_POLICY", "NONE"),
'$SECONDARY_FILESYSTEM': os.getenv("SECONDARY_FILESYSTEM", ""),
'$WAREHOUSE_LOCATION_PREFIX': os.getenv("WAREHOUSE_LOCATION_PREFIX", ""),
'$USER': getuser()})
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 85d2c678d..c8737e51a 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -66,7 +66,8 @@ class SkipIfFS:
shutdown_idle_fails = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS,
reason="IMPALA-10562")
late_filters = pytest.mark.skipif(IS_ISILON, reason="IMPALA-6998")
- read_past_eof = pytest.mark.skipif(IS_S3 or IS_GCS, reason="IMPALA-2512")
+ read_past_eof = pytest.mark.skipif(IS_S3 or IS_GCS or (IS_OZONE and IS_EC),
+ reason="IMPALA-2512")
large_block_size = pytest.mark.skipif(IS_OZONE or IS_EC,
reason="block size is larger than 128MB")
read_speed_dependent = pytest.mark.skipif(not IS_HDFS or IS_EC,
@@ -111,6 +112,8 @@ class SkipIf:
is_buggy_el6_kernel = pytest.mark.skipif(
IS_BUGGY_EL6_KERNEL, reason="Kernel is affected by KUDU-1508")
+ ozone_ec_incomplete = pytest.mark.skipif(IS_OZONE and IS_EC, reason="HDDS-7603")
+
class SkipIfLocal:
# These are skipped due to product limitations.
hdfs_blocks = pytest.mark.skipif(IS_LOCAL,
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 2530f68ba..5b816f424 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -282,9 +282,9 @@ class TestCoordinators(CustomClusterTestSuite):
"where id NOT IN (0,1,2) and string_col IN ('aaaa', 'bbbb', 'cccc', NULL) "
"and mod(int_col,50) IN (0,1) and id IN (int_col);").data
assert 'F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2' in result
- # Single partition table
- result = client.execute("explain select * from tpch.lineitem "
- "union all select * from tpch.lineitem").data
+ # Single partition table with 3 blocks
+ result = client.execute("explain select * from tpch_parquet.lineitem "
+ "union all select * from tpch_parquet.lineitem").data
assert 'F02:PLAN FRAGMENT [RANDOM] hosts=2 instances=2' in result
finally:
assert client is not None
diff --git a/tests/query_test/test_io_metrics.py b/tests/query_test/test_io_metrics.py
index f4e4d1ef4..f75d14997 100644
--- a/tests/query_test/test_io_metrics.py
+++ b/tests/query_test/test_io_metrics.py
@@ -47,11 +47,10 @@ class TestIOMetrics(ImpalaTestSuite):
def append_metric(metric, expect_nonzero):
(expect_nonzero_metrics if expect_nonzero else
expect_zero_metrics).append(metric)
- append_metric("impala-server.io-mgr.erasure-coded-bytes-read", IS_EC)
+ # IMPALA-11697: these come from getReadStatistics, which is only implemented for HDFS
+ append_metric("impala-server.io-mgr.erasure-coded-bytes-read", IS_HDFS and IS_EC)
append_metric("impala-server.io-mgr.short-circuit-bytes-read",
IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
- # TODO: this should be updated for Ozone, but the code that updates it is guarded by
- # IsHdfsPath and adding Ozone causes a crash. Plan to debug in IMPALA-11697.
append_metric("impala-server.io-mgr.local-bytes-read",
IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index b4ab8b15b..5298ab152 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1892,6 +1892,7 @@ class TestErasureCoding(ImpalaTestSuite):
def get_workload(cls):
return 'functional-query'
+ @SkipIf.ozone_ec_incomplete
@SkipIf.not_ec
def test_erasure_coding(self, vector):
self.run_test_case('QueryTest/hdfs-erasure-coding', vector)