This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8935c759042ef14e9138f9bc28adc3cede4ee96f Author: Michael Smith <[email protected]> AuthorDate: Mon Jan 30 12:12:27 2023 -0800 IMPALA-11859: Add bytes-read-encrypted metric Adds a metric bytes-read-encrypted to track encrypted reads. Testing: - ran test_io_metrics.py with Ozone (encrypts by default) - ran test_io_metrics.py with HDFS (no encryption) Change-Id: I9dbc194a4bc31cb0e01545fb6032a0853db60f34 Reviewed-on: http://gerrit.cloudera.org:8080/19461 Reviewed-by: Joe McDonnell <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-scan-node-base.cc | 7 +++++ be/src/exec/hdfs-scan-node-base.h | 7 ++++- be/src/exec/orc/hdfs-orc-scanner.cc | 5 ++-- be/src/runtime/io/hdfs-file-reader.cc | 3 +++ be/src/runtime/io/request-context.h | 4 +++ be/src/runtime/io/request-ranges.h | 7 ++++- be/src/runtime/io/scan-range.cc | 1 + be/src/scheduling/scheduler.cc | 2 ++ be/src/util/impalad-metrics.cc | 5 ++++ be/src/util/impalad-metrics.h | 4 +++ common/fbs/CatalogObjects.fbs | 3 +++ common/protobuf/planner.proto | 3 +++ common/thrift/PlanNodes.thrift | 3 +++ common/thrift/metrics.json | 10 ++++++++ .../org/apache/impala/catalog/FeIcebergTable.java | 5 ++-- .../apache/impala/catalog/FileMetadataLoader.java | 4 +-- .../org/apache/impala/catalog/HdfsPartition.java | 15 ++++++----- .../java/org/apache/impala/compat/HdfsShim.java | 30 ---------------------- .../org/apache/impala/planner/HdfsScanNode.java | 1 + tests/query_test/test_io_metrics.py | 3 ++- 20 files changed, 76 insertions(+), 46 deletions(-) diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 123bfc250..ca3fab885 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -111,6 +111,8 @@ PROFILE_DEFINE_COUNTER(BytesReadShortCircuit, STABLE_LOW, TUnit::BYTES, "The total number of bytes read via short circuit read"); PROFILE_DEFINE_COUNTER(BytesReadDataNodeCache, STABLE_HIGH, TUnit::BYTES, "The total number of bytes read from data node cache"); +PROFILE_DEFINE_COUNTER(BytesReadEncrypted, STABLE_LOW, TUnit::BYTES, + "The total number of bytes read from encrypted data"); PROFILE_DEFINE_COUNTER(BytesReadErasureCoded, STABLE_LOW, TUnit::BYTES, "The total number of bytes read from erasure-coded data"); PROFILE_DEFINE_COUNTER(RemoteScanRanges, STABLE_HIGH, TUnit::UNIT, @@ -301,6 +303,7 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat file_desc->file_length = split.file_length(); file_desc->mtime = split.mtime(); file_desc->file_compression = CompressionTypePBToThrift(split.file_compression()); + file_desc->is_encrypted = split.is_encrypted(); file_desc->is_erasure_coded = split.is_erasure_coded(); file_desc->file_metadata = file_metadata; if (file_metadata) { @@ -629,6 +632,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { bytes_read_short_circuit_ = PROFILE_BytesReadShortCircuit.Instantiate(runtime_profile()); bytes_read_dn_cache_ = PROFILE_BytesReadDataNodeCache.Instantiate(runtime_profile()); + bytes_read_encrypted_ = PROFILE_BytesReadEncrypted.Instantiate(runtime_profile()); bytes_read_ec_ = PROFILE_BytesReadErasureCoded.Instantiate(runtime_profile()); num_remote_ranges_ = PROFILE_RemoteScanRanges.Instantiate(runtime_profile()); unexpected_remote_bytes_ = @@ -1236,6 +1240,7 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() { bytes_read_local_->Set(reader_context_->bytes_read_local()); bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit()); bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache()); + bytes_read_encrypted_->Set(reader_context_->bytes_read_encrypted()); bytes_read_ec_->Set(reader_context_->bytes_read_ec()); num_remote_ranges_->Set(reader_context_->num_remote_ranges()); unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes()); @@ -1261,6 +1266,8 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() { bytes_read_short_circuit_->value()); ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment( bytes_read_dn_cache_->value()); + ImpaladMetrics::IO_MGR_ENCRYPTED_BYTES_READ->Increment( + bytes_read_encrypted_->value()); ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ->Increment( bytes_read_ec_->value()); } diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 5c03f3e66..56a8157aa 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -72,7 +72,8 @@ struct HdfsFileDesc { file_format(THdfsFileFormat::TEXT) {} io::ScanRange::FileInfo GetFileInfo() const { - return io::ScanRange::FileInfo{filename.c_str(), fs, mtime, is_erasure_coded}; + return io::ScanRange::FileInfo{ + filename.c_str(), fs, mtime, is_encrypted, is_erasure_coded}; } /// Connection to the filesystem containing the file. @@ -97,6 +98,9 @@ struct HdfsFileDesc { /// Extra file metadata, e.g. Iceberg-related file-level info. const ::org::apache::impala::fb::FbFileMetadata* file_metadata; + /// Whether file is encrypted. + bool is_encrypted = false; + /// Whether file is erasure coded. bool is_erasure_coded = false; @@ -789,6 +793,7 @@ class HdfsScanNodeBase : public ScanNode { RuntimeProfile::Counter* bytes_read_local_ = nullptr; RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr; RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr; + RuntimeProfile::Counter* bytes_read_encrypted_ = nullptr; RuntimeProfile::Counter* bytes_read_ec_ = nullptr; RuntimeProfile::Counter* num_remote_ranges_ = nullptr; RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr; diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc index f3050ff97..a4193dca5 100644 --- a/be/src/exec/orc/hdfs-orc-scanner.cc +++ b/be/src/exec/orc/hdfs-orc-scanner.cc @@ -156,7 +156,8 @@ Status HdfsOrcScanner::ScanRangeInputStream::readRandom( int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE; ScanRange* range = scanner_->scan_node_->AllocateScanRange( ScanRange::FileInfo{scanner_->filename(), metadata_range->fs(), - split_range->mtime(), split_range->is_erasure_coded()}, + split_range->mtime(), split_range->is_encrypted(), + split_range->is_erasure_coded()}, length, offset, partition_id, split_range->disk_id(), expected_local, BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options)); unique_ptr<BufferDescriptor> io_buffer; @@ -258,7 +259,7 @@ Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe) } ScanRange* scan_range = scan_node_->AllocateScanRange( ScanRange::FileInfo{filename(), metadata_range_->fs(), split_range->mtime(), - split_range->is_erasure_coded()}, + split_range->is_encrypted(), split_range->is_erasure_coded()}, range.length_, range.offset_, partition_id, split_range->disk_id(), col_range_local, BufferOpts(split_range->cache_options())); RETURN_IF_ERROR( diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc index 1d814dcc8..c1d8c70f5 100644 --- a/be/src/runtime/io/hdfs-file-reader.cc +++ b/be/src/runtime/io/hdfs-file-reader.cc @@ -238,6 +238,9 @@ Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_ bool is_first_read = (num_remote_bytes_ == 0); // Collect and accumulate statistics GetHdfsStatistics(hdfs_file, log_slow_read); + if (scan_range_->is_encrypted()) { + scan_range_->reader_->bytes_read_encrypted_.Add(current_bytes_read); + } if (scan_range_->is_erasure_coded()) { scan_range_->reader_->bytes_read_ec_.Add(current_bytes_read); } diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h index 585a28f47..8ecc2b7ee 100644 --- a/be/src/runtime/io/request-context.h +++ b/be/src/runtime/io/request-context.h @@ -158,6 +158,7 @@ class RequestContext { int64_t bytes_read_local() const { return bytes_read_local_.Load(); } int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); } int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); } + int64_t bytes_read_encrypted() const { return bytes_read_encrypted_.Load(); } int64_t bytes_read_ec() const { return bytes_read_ec_.Load(); } int num_remote_ranges() const { return num_remote_ranges_.Load(); } int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); } @@ -400,6 +401,9 @@ class RequestContext { /// Total number of bytes read from date node cache, updated at end of each range scan AtomicInt64 bytes_read_dn_cache_{0}; + /// Total number of encrypted bytes read + AtomicInt64 bytes_read_encrypted_{0}; + /// Total number of erasure-coded bytes read AtomicInt64 bytes_read_ec_{0}; diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h index 01bbe2c01..2e5e9a38e 100644 --- a/be/src/runtime/io/request-ranges.h +++ b/be/src/runtime/io/request-ranges.h @@ -146,6 +146,7 @@ class RequestRange : public InternalQueue<RequestRange>::Node { int64_t offset() const { return offset_; } int64_t len() const { return len_; } int disk_id() const { return disk_id_; } + bool is_encrypted() const { return is_encrypted_; } bool is_erasure_coded() const { return is_erasure_coded_; } RequestType::type request_type() const { return request_type_; } @@ -172,6 +173,9 @@ class RequestRange : public InternalQueue<RequestRange>::Node { /// Id of disk queue containing byte range. int disk_id_; + /// Whether file is encrypted. + bool is_encrypted_; + /// Whether file is erasure coded. bool is_erasure_coded_; @@ -272,6 +276,7 @@ class ScanRange : public RequestRange { const char *filename; hdfsFS fs = nullptr; int64_t mtime = ScanRange::INVALID_MTIME; + bool is_encrypted = false; bool is_erasure_coded = false; }; @@ -283,7 +288,7 @@ class ScanRange : public RequestRange { /// Get file info for the current scan range. FileInfo GetFileInfo() const { - return FileInfo{file_.c_str(), fs_, mtime_, is_erasure_coded_}; + return FileInfo{file_.c_str(), fs_, mtime_, is_encrypted_, is_erasure_coded_}; } /// Resets this scan range object with the scan range description. The scan range diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc index 693807358..5dd5eae8f 100644 --- a/be/src/runtime/io/scan-range.cc +++ b/be/src/runtime/io/scan-range.cc @@ -511,6 +511,7 @@ void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_ bytes_to_read_ = len; offset_ = offset; disk_id_ = disk_id; + is_encrypted_ = fi.is_encrypted; is_erasure_coded_ = fi.is_erasure_coded; cache_options_ = buffer_opts.cache_options_; disk_file_ = disk_file; diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 67ea5414c..947f234f2 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -134,6 +134,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec hdfs_scan_range.__set_offset(scan_range_offset); hdfs_scan_range.__set_partition_id(spec.partition_id); hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash); + hdfs_scan_range.__set_is_encrypted(fb_desc->is_encrypted()); hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec()); if (fb_desc->absolute_path() != nullptr) { hdfs_scan_range.__set_absolute_path(fb_desc->absolute_path()->str()); @@ -1124,6 +1125,7 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime); hdfs_file_split->set_partition_path_hash( tscan_range.hdfs_file_split.partition_path_hash); + hdfs_file_split->set_is_encrypted(tscan_range.hdfs_file_split.is_encrypted); hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded); if (tscan_range.hdfs_file_split.__isset.absolute_path) { hdfs_file_split->set_absolute_path( diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc index 6a2e07402..cb88adca5 100644 --- a/be/src/util/impalad-metrics.cc +++ b/be/src/util/impalad-metrics.cc @@ -59,6 +59,8 @@ const char* ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ = "impala-server.io-mgr.short-circuit-bytes-read"; const char* ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ = "impala-server.io-mgr.cached-bytes-read"; +const char* ImpaladMetricKeys::IO_MGR_ENCRYPTED_BYTES_READ = + "impala-server.io-mgr.encrypted-bytes-read"; const char* ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ = "impala-server.io-mgr.erasure-coded-bytes-read"; const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = @@ -169,6 +171,7 @@ IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = nullptr; IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = nullptr; IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = nullptr; IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = nullptr; +IntCounter* ImpaladMetrics::IO_MGR_ENCRYPTED_BYTES_READ = nullptr; IntCounter* ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ = nullptr; IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = nullptr; IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT = nullptr; @@ -344,6 +347,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) { ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0); IO_MGR_CACHED_BYTES_READ = IO_MGR_METRICS->AddCounter( ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0); + IO_MGR_ENCRYPTED_BYTES_READ = IO_MGR_METRICS->AddCounter( + ImpaladMetricKeys::IO_MGR_ENCRYPTED_BYTES_READ, 0); IO_MGR_ERASURE_CODED_BYTES_READ = IO_MGR_METRICS->AddCounter( ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ, 0); IO_MGR_SHORT_CIRCUIT_BYTES_READ = IO_MGR_METRICS->AddCounter( diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h index c43dca69f..08131da30 100644 --- a/be/src/util/impalad-metrics.h +++ b/be/src/util/impalad-metrics.h @@ -76,6 +76,9 @@ class ImpaladMetricKeys { /// Total number of cached bytes read by the io mgr static const char* IO_MGR_CACHED_BYTES_READ; + /// Total number of encrypted bytes read by the io mgr + static const char* IO_MGR_ENCRYPTED_BYTES_READ; + /// Total number of erasure-coded bytes read by the io mgr static const char* IO_MGR_ERASURE_CODED_BYTES_READ; @@ -261,6 +264,7 @@ class ImpaladMetrics { static IntCounter* IO_MGR_BYTES_READ; static IntCounter* IO_MGR_LOCAL_BYTES_READ; static IntCounter* IO_MGR_CACHED_BYTES_READ; + static IntCounter* IO_MGR_ENCRYPTED_BYTES_READ; static IntCounter* IO_MGR_ERASURE_CODED_BYTES_READ; static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES; static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT; diff --git a/common/fbs/CatalogObjects.fbs b/common/fbs/CatalogObjects.fbs index 8ecfb2f11..f0dc08824 100644 --- a/common/fbs/CatalogObjects.fbs +++ b/common/fbs/CatalogObjects.fbs @@ -83,6 +83,9 @@ table FbFileDesc { // The absolute path of the file, it's used only when data files are outside of // the Iceberg table location (IMPALA-11507). absolute_path: string (id: 6); + + // Whether this file is encrypted + is_encrypted: bool = false (id: 7); } // Additional file-related metadata diff --git a/common/protobuf/planner.proto b/common/protobuf/planner.proto index 4e7c8ac63..26b4cd2fc 100644 --- a/common/protobuf/planner.proto +++ b/common/protobuf/planner.proto @@ -58,6 +58,9 @@ message HdfsFileSplitPB { // The absolute path of the file, it's used only when data files are outside of // the Iceberg table location (IMPALA-11507). optional string absolute_path = 10; + + // Whether this file is encrypted. + optional bool is_encrypted = 11; } // Key range for single THBaseScanNode. Corresponds to THBaseKeyRange and should be kept diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 86519e4d1..6e8e370be 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -230,6 +230,9 @@ struct THdfsFileSplit { // The absolute path of the file, it's used only when data files are outside of // the Iceberg table location (IMPALA-11507). 10: optional string absolute_path + + // Whether the HDFS file is stored with transparent data encryption. + 11: optional bool is_encrypted } // Key range for single THBaseScanNode. Corresponds to HBaseKeyRangePB and should be kept diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index 5f1a68e71..403724480 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -489,6 +489,16 @@ "kind": "COUNTER", "key": "impala-server.io-mgr.cached-bytes-read" }, + { + "description": "Total number of encrypted bytes read by the IO manager.", + "contexts": [ + "IMPALAD" + ], + "label": "Impala Server Io Mgr Encrypted Bytes Read", + "units": "BYTES", + "kind": "COUNTER", + "key": "impala-server.io-mgr.encrypted-bytes-read" + }, { "description": "Total number of erasure-coded bytes read by the IO manager.", "contexts": [ diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index c9c489ab5..b3a66a0d8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -67,7 +67,6 @@ import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Pair; import org.apache.impala.common.PrintUtils; import org.apache.impala.common.Reference; -import org.apache.impala.compat.HdfsShim; import org.apache.impala.fb.FbFileBlock; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TCompressionCodec; @@ -679,8 +678,8 @@ public interface FeIcebergTable extends FeFsTable { } return HdfsPartition.FileDescriptor.create(fileStatus, relPath, locations, - table.getHostIndex(), HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds, - absPath); + table.getHostIndex(), fileStatus.isEncrypted(), fileStatus.isErasureCoded(), + numUnknownDiskIds, absPath); } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java index 9ebc6fa1f..9a514b05d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Reference; -import org.apache.impala.compat.HdfsShim; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.HudiUtil; @@ -291,7 +290,8 @@ public class FileMetadataLoader { locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); } return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_, - HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds, absPath); + fileStatus.isEncrypted(), fileStatus.isErasureCoded(), numUnknownDiskIds, + absPath); } private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus, diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 02e285ba5..034790fd9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -191,8 +191,9 @@ public class HdfsPartition extends CatalogObjectImpl * for which no disk ID could be determined */ public static FileDescriptor create(FileStatus fileStatus, String relPath, - BlockLocation[] blockLocations, ListMap<TNetworkAddress> hostIndex, boolean isEc, - Reference<Long> numUnknownDiskIds, String absPath) throws IOException { + BlockLocation[] blockLocations, ListMap<TNetworkAddress> hostIndex, + boolean isEncrypted, boolean isEc, Reference<Long> numUnknownDiskIds, + String absPath) throws IOException { FlatBufferBuilder fbb = new FlatBufferBuilder(1); int[] fbFileBlockOffsets = new int[blockLocations.length]; int blockIdx = 0; @@ -207,7 +208,7 @@ public class HdfsPartition extends CatalogObjectImpl } } return new FileDescriptor(createFbFileDesc(fbb, fileStatus, relPath, - fbFileBlockOffsets, isEc, absPath)); + fbFileBlockOffsets, isEncrypted, isEc, absPath)); } /** @@ -218,7 +219,7 @@ public class HdfsPartition extends CatalogObjectImpl FileStatus fileStatus, String relPath, String absPath) { FlatBufferBuilder fbb = new FlatBufferBuilder(1); return new FileDescriptor( - createFbFileDesc(fbb, fileStatus, relPath, null, false, absPath)); + createFbFileDesc(fbb, fileStatus, relPath, null, false, false, absPath)); } /** * Serializes the metadata of a file descriptor represented by 'fileStatus' into a @@ -227,8 +228,8 @@ public class HdfsPartition extends CatalogObjectImpl * in the underlying buffer. Can be null if there are no blocks. */ private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb, - FileStatus fileStatus, String relPath, int[] fbFileBlockOffsets, boolean isEc, - String absPath) { + FileStatus fileStatus, String relPath, int[] fbFileBlockOffsets, + boolean isEncrypted, boolean isEc, String absPath) { int relPathOffset = fbb.createString(relPath == null ? StringUtils.EMPTY : relPath); // A negative block vector offset is used when no block offsets are specified. int blockVectorOffset = -1; @@ -242,6 +243,7 @@ public class HdfsPartition extends CatalogObjectImpl FbFileDesc.addRelativePath(fbb, relPathOffset); FbFileDesc.addLength(fbb, fileStatus.getLen()); FbFileDesc.addLastModificationTime(fbb, fileStatus.getModificationTime()); + FbFileDesc.addIsEncrypted(fbb, isEncrypted); FbFileDesc.addIsEc(fbb, isEc); HdfsCompression comp = HdfsCompression.fromFileName(fileStatus.getPath().getName()); FbFileDesc.addCompression(fbb, comp.toFb()); @@ -288,6 +290,7 @@ public class HdfsPartition extends CatalogObjectImpl public long getModificationTime() { return fbFileDescriptor_.lastModificationTime(); } public int getNumFileBlocks() { return fbFileDescriptor_.fileBlocksLength(); } + public boolean getIsEncrypted() {return fbFileDescriptor_.isEncrypted(); } public boolean getIsEc() {return fbFileDescriptor_.isEc(); } public FbFileBlock getFbFileBlock(int idx) { diff --git a/fe/src/main/java/org/apache/impala/compat/HdfsShim.java b/fe/src/main/java/org/apache/impala/compat/HdfsShim.java deleted file mode 100644 index 9453f80a8..000000000 --- a/fe/src/main/java/org/apache/impala/compat/HdfsShim.java +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.impala.compat; - -import org.apache.hadoop.fs.FileStatus; - -/** - * Wrapper classes to abstract away differences between HDFS versions in - * the MiniCluster profiles. - */ -public class HdfsShim { - public static boolean isErasureCoded(FileStatus fileStatus) { - return fileStatus.isErasureCoded(); - } -} diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index d638ea1a1..3d8243f4d 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1427,6 +1427,7 @@ public class HdfsScanNode extends ScanNode { fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(), partition.getLocation().hashCode()); hdfsFileSplit.setAbsolute_path(fileDesc.getAbsolutePath()); + hdfsFileSplit.setIs_encrypted(fileDesc.getIsEncrypted()); hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc()); scanRange.setHdfs_file_split(hdfsFileSplit); if (fileDesc.getFbFileMetadata() != null) { diff --git a/tests/query_test/test_io_metrics.py b/tests/query_test/test_io_metrics.py index b45a0f8e7..5388f85b6 100644 --- a/tests/query_test/test_io_metrics.py +++ b/tests/query_test/test_io_metrics.py @@ -20,7 +20,7 @@ import pytest from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER from tests.common.impala_test_suite import ImpalaTestSuite, LOG from tests.common.test_dimensions import create_single_exec_option_dimension -from tests.util.filesystem_utils import IS_EC, IS_HDFS +from tests.util.filesystem_utils import IS_EC, IS_HDFS, IS_ENCRYPTED class TestIOMetrics(ImpalaTestSuite): @@ -47,6 +47,7 @@ class TestIOMetrics(ImpalaTestSuite): def append_metric(metric, expect_nonzero): (expect_nonzero_metrics if expect_nonzero else expect_zero_metrics).append(metric) + append_metric("impala-server.io-mgr.encrypted-bytes-read", IS_ENCRYPTED) append_metric("impala-server.io-mgr.erasure-coded-bytes-read", IS_EC) append_metric("impala-server.io-mgr.short-circuit-bytes-read", IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
