This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 142ae261e319c3b602128deb1329713d5253637d Author: Venu Reddy <[email protected]> AuthorDate: Thu Dec 14 12:45:34 2023 +0530 IMPALA-9375: Remove DirectMetaProvider usage from CatalogMetaProvider This commit removes the CatalogMetaProvider dependence on DirectMetaProvider. CatalogMetaProvider depends on DirectMetaProvider for 2 APIs. Implemented the APIs on catalog server and used them instead. DirectMetaProvider is not referenced anywhere now. But it is retained for future use. Testing: - Manually tested and CatalogdMetaProviderTest covers the tests. Change-Id: I096c1b1d1a52e979c8b2d8173dae9ca2cc6c36d2 Reviewed-on: http://gerrit.cloudera.org:8080/20791 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 28 ++++++++ be/src/catalog/catalog-service-client-wrapper.h | 16 +++++ be/src/catalog/catalog.cc | 11 ++++ be/src/catalog/catalog.h | 9 +++ be/src/exec/catalog-op-executor.cc | 28 ++++++++ be/src/exec/catalog-op-executor.h | 8 +++ be/src/service/fe-support.cc | 48 ++++++++++++++ common/thrift/CatalogService.thrift | 35 ++++++++++ .../org/apache/impala/compat/MetastoreShim.java | 9 +++ .../org/apache/impala/compat/MetastoreShim.java | 77 ++++++++++++---------- .../impala/catalog/local/CatalogdMetaProvider.java | 67 ++++++++++++++++--- .../java/org/apache/impala/service/FeSupport.java | 42 ++++++++++++ .../java/org/apache/impala/service/JniCatalog.java | 32 +++++++++ .../catalog/local/CatalogdMetaProviderTest.java | 15 +++++ 14 files changed, 382 insertions(+), 43 deletions(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 43f77e970..cc1667731 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -363,6 +363,34 @@ class CatalogServiceThriftIf : public CatalogServiceIf { VLOG_RPC << "UpdateTableUsage(): response.status=" << resp.status; } + void GetNullPartitionName(TGetNullPartitionNameResponse& 
resp, + const TGetNullPartitionNameRequest& req) override { + VLOG_RPC << "GetNullPartitionName(): request=" << ThriftDebugString(req); + Status status = CheckProtocolVersion(req.protocol_version); + if (status.ok()) { + status = catalog_server_->catalog()->GetNullPartitionName(&resp); + } + if (!status.ok()) LOG(ERROR) << status.GetDetail(); + TStatus thrift_status; + status.ToThrift(&thrift_status); + resp.__set_status(thrift_status); + VLOG_RPC << "GetNullPartitionName(): response=" << ThriftDebugStringNoThrow(resp); + } + + void GetLatestCompactions(TGetLatestCompactionsResponse& resp, + const TGetLatestCompactionsRequest& req) override { + VLOG_RPC << "GetLatestCompactions(): request=" << ThriftDebugString(req); + Status status = CheckProtocolVersion(req.protocol_version); + if (status.ok()) { + status = catalog_server_->catalog()->GetLatestCompactions(req, &resp); + } + if (!status.ok()) LOG(ERROR) << status.GetDetail(); + TStatus thrift_status; + status.ToThrift(&thrift_status); + resp.__set_status(thrift_status); + VLOG_RPC << "GetLatestCompactions(): response=" << ThriftDebugStringNoThrow(resp); + } + private: CatalogServer* catalog_server_; diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h index 294960771..6775e4594 100644 --- a/be/src/catalog/catalog-service-client-wrapper.h +++ b/be/src/catalog/catalog-service-client-wrapper.h @@ -110,6 +110,22 @@ class CatalogServiceClientWrapper : public CatalogServiceClient { *send_done = true; recv_UpdateTableUsage(_return); } + + void GetNullPartitionName(TGetNullPartitionNameResponse& _return, + const TGetNullPartitionNameRequest& req, bool* send_done) { + DCHECK(!*send_done); + send_GetNullPartitionName(req); + *send_done = true; + recv_GetNullPartitionName(_return); + } + + void GetLatestCompactions(TGetLatestCompactionsResponse& _return, + const TGetLatestCompactionsRequest& req, bool* send_done) { + DCHECK(!*send_done); + 
send_GetLatestCompactions(req); + *send_done = true; + recv_GetLatestCompactions(_return); + } #pragma clang diagnostic pop }; diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc index 698091787..92b306b51 100644 --- a/be/src/catalog/catalog.cc +++ b/be/src/catalog/catalog.cc @@ -71,6 +71,8 @@ Catalog::Catalog() { {"updateTableUsage", "([B)V", &update_table_usage_id_}, {"regenerateServiceId", "()V", &regenerate_service_id_}, {"refreshDataSources", "()V", &refresh_data_sources_}, + {"getNullPartitionName", "()[B", &get_null_partition_name_id_}, + {"getLatestCompactions", "([B)[B", &get_latest_compactions_id_}, }; JNIEnv* jni_env = JniUtil::GetJNIEnv(); @@ -220,3 +222,12 @@ void Catalog::RegenerateServiceId() { Status Catalog::RefreshDataSources() { return JniUtil::CallJniMethod(catalog_, refresh_data_sources_); } + +Status Catalog::GetNullPartitionName(TGetNullPartitionNameResponse* resp) { + return JniUtil::CallJniMethod(catalog_, get_null_partition_name_id_, resp); +} + +Status Catalog::GetLatestCompactions( + const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp) { + return JniUtil::CallJniMethod(catalog_, get_latest_compactions_id_, req, resp); +} diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h index c94f09a6b..f102a2908 100644 --- a/be/src/catalog/catalog.h +++ b/be/src/catalog/catalog.h @@ -137,6 +137,13 @@ class Catalog { /// report. Status UpdateTableUsage(const TUpdateTableUsageRequest& req); + /// Gets the null partition name. + Status GetNullPartitionName(TGetNullPartitionNameResponse* resp); + + /// Gets the latest compactions for the request. + Status GetLatestCompactions( + const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp); + /// Regenerate Catalog Service ID. /// The function should be called when the CatalogD becomes active. 
void RegenerateServiceId(); @@ -170,6 +177,8 @@ class Catalog { jmethodID update_table_usage_id_; jmethodID regenerate_service_id_; // JniCatalog.regenerateServiceId() jmethodID refresh_data_sources_; // JniCatalog.refreshDataSources() + jmethodID get_null_partition_name_id_; // JniCatalog.getNullPartitionName() + jmethodID get_latest_compactions_id_; // JniCatalog.getLatestCompactions() }; } diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc index a7fb2381b..074d828e3 100644 --- a/be/src/exec/catalog-op-executor.cc +++ b/be/src/exec/catalog-op-executor.cc @@ -422,3 +422,31 @@ Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req, RETURN_IF_ERROR(rpc_status.status); return Status::OK(); } + +Status CatalogOpExecutor::GetNullPartitionName( + const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result) { + int attempt = 0; // Used for debug action only. + CatalogServiceConnection::RpcStatus rpc_status = + CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), + *ExecEnv::GetInstance()->GetCatalogdAddress().get(), + &CatalogServiceClientWrapper::GetNullPartitionName, req, + FLAGS_catalog_client_connection_num_retries, + FLAGS_catalog_client_rpc_retry_interval_ms, + [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result); + RETURN_IF_ERROR(rpc_status.status); + return Status::OK(); +} + +Status CatalogOpExecutor::GetLatestCompactions( + const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result) { + int attempt = 0; // Used for debug action only. 
+ CatalogServiceConnection::RpcStatus rpc_status = + CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), + *ExecEnv::GetInstance()->GetCatalogdAddress().get(), + &CatalogServiceClientWrapper::GetLatestCompactions, req, + FLAGS_catalog_client_connection_num_retries, + FLAGS_catalog_client_rpc_retry_interval_ms, + [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result); + RETURN_IF_ERROR(rpc_status.status); + return Status::OK(); +} diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h index cab823bc6..12bf2e0f1 100644 --- a/be/src/exec/catalog-op-executor.h +++ b/be/src/exec/catalog-op-executor.h @@ -80,6 +80,14 @@ class CatalogOpExecutor { Status UpdateTableUsage(const TUpdateTableUsageRequest& req, TUpdateTableUsageResponse* resp); + /// Makes an RPC to the catalog server to get the null partition name. + Status GetNullPartitionName( + const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result); + + /// Makes an RPC to the catalog server to get the latest compactions. + Status GetLatestCompactions( + const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result); + /// Set in Exec(), returns a pointer to the TDdlExecResponse of the DDL execution. /// If called before Exec(), this will return NULL. Only set if the /// TCatalogOpType is DDL. diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index a31979ce3..f4bee52e6 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -695,6 +695,46 @@ Java_org_apache_impala_service_FeSupport_nativeParseDateString(JNIEnv* env, return result_bytes; } +// Native method to make a request to catalog server to get the null partition name. 
+extern "C" JNIEXPORT jbyteArray JNICALL +Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName( + JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) { + TGetNullPartitionNameRequest request; + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env, + JniUtil::internal_exc_class(), nullptr); + CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr); + TGetNullPartitionNameResponse result; + Status status = catalog_op_executor.GetNullPartitionName(request, &result); + if (!status.ok()) { + LOG(ERROR) << status.GetDetail(); + status.ToThrift(&result.status); + } + jbyteArray result_bytes = nullptr; + THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env, + JniUtil::internal_exc_class(), result_bytes); + return result_bytes; +} + +// Native method to make a request to catalog server to get the latest compactions. +extern "C" JNIEXPORT jbyteArray JNICALL +Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions( + JNIEnv* env, jclass fe_support_class, jbyteArray thrift_struct) { + TGetLatestCompactionsRequest request; + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env, + JniUtil::internal_exc_class(), nullptr); + CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr); + TGetLatestCompactionsResponse result; + Status status = catalog_op_executor.GetLatestCompactions(request, &result); + if (!status.ok()) { + LOG(ERROR) << status.GetDetail(); + status.ToThrift(&result.status); + } + jbyteArray result_bytes = nullptr; + THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env, + JniUtil::internal_exc_class(), result_bytes); + return result_bytes; +} + namespace impala { static JNINativeMethod native_methods[] = { @@ -770,6 +810,14 @@ static JNINativeMethod native_methods[] = { const_cast<char*>("(Ljava/lang/String;)[B"), (void*)::Java_org_apache_impala_service_FeSupport_nativeParseDateString }, + { + 
const_cast<char*>("NativeGetNullPartitionName"), const_cast<char*>("([B)[B"), + (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetNullPartitionName + }, + { + const_cast<char*>("NativeGetLatestCompactions"), const_cast<char*>("([B)[B"), + (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions + }, }; void InitFeSupport(bool disable_codegen) { diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 99d210d06..701f10641 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -666,6 +666,35 @@ struct TGetPartitionStatsResponse { 2: optional map<string, binary> partition_stats } +// Request null partition name. +struct TGetNullPartitionNameRequest { + 1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2 +} + +// Response for null partition name request. +struct TGetNullPartitionNameResponse { + 1: required Status.TStatus status + // Null partition name. + 2: required string partition_value +} + +// Request latest compactions. +struct TGetLatestCompactionsRequest { + 1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2 + 2: required string db_name + 3: required string table_name + 4: required string non_parition_name + 5: optional list<string> partition_names + 6: required i64 last_compaction_id +} + +// Response for latest compactions request. +struct TGetLatestCompactionsResponse { + 1: required Status.TStatus status + // Map of partition name to the compaction id + 2: required map<string, i64> partition_to_compaction_id +} + // Instructs the Catalog Server to prioritizing loading of metadata for the specified // catalog objects. Currently only used for controlling the priority of loading // tables/views since Db/Function metadata is loaded on startup. @@ -736,4 +765,10 @@ service CatalogService { // Update recently used tables and their usage counts in an impalad since the last // report. 
TUpdateTableUsageResponse UpdateTableUsage(1: TUpdateTableUsageRequest req); + + // Gets the null partition name used at HMS. + TGetNullPartitionNameResponse GetNullPartitionName(1: TGetNullPartitionNameRequest req); + + // Gets the latest compactions. + TGetLatestCompactionsResponse GetLatestCompactions(1: TGetLatestCompactionsRequest req); } diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 038f6eb1e..21f0a7266 100644 --- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -589,6 +589,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase { "getPartitionsForRefreshingFileMetadata is not supported."); } + /** + * CDP Hive-3 only function. + */ + public static Map<String, Long> getLatestCompactions(MetaStoreClient client, + String dbName, String tableName, List<String> partitionNames, + String unPartitionedName, long lastCompactionId) throws TException { + throw new UnsupportedOperationException("getLatestCompactions is not supported."); + } + /** * CDP Hive-3 only function. */ diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index d3f33e8cb..a1ea42fe8 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -743,39 +743,19 @@ public class MetastoreShim extends Hive3MetastoreShimBase { } /** - * Fetches the latest compaction id from HMS and compares with partition metadata in - * cache. If a partition is stale due to compaction, removes it from metas. 
+ * Fetches the latest compactions from HMS */ - public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool, - String dbName, String tableName, TableMetaRef table, - Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName) - throws TException { - Preconditions.checkNotNull(table, "TableMetaRef must be non-null"); - Preconditions.checkNotNull(metas, "Partition map must be non-null"); - if (metas.isEmpty()) { - return Collections.emptyList(); - } - Stopwatch sw = Stopwatch.createStarted(); - List<PartitionRef> stalePartitions = new ArrayList<>(); - if (!table.isTransactional() || metas.isEmpty()) return stalePartitions; + public static Map<String, Long> getLatestCompactions(MetaStoreClient client, + String dbName, String tableName, List<String> partitionNames, + String unPartitionedName, long lastCompactionId) throws TException { GetLatestCommittedCompactionInfoRequest request = new GetLatestCommittedCompactionInfoRequest(dbName, tableName); - if (table.isPartitioned()) { - request.setPartitionnames(metas.keySet().stream() - .map(PartitionRef::getName).collect(Collectors.toList())); - } - long lastCompactionId = metas.values().stream() - .mapToLong(p -> p.getLastCompactionId()).max().orElse(-1); - if (lastCompactionId > 0) { - request.setLastCompactionId(lastCompactionId); - } - + request.setPartitionnames(partitionNames); + if (lastCompactionId > 0) request.setLastCompactionId(lastCompactionId); GetLatestCommittedCompactionInfoResponse response; - try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) { - response = CompactionInfoLoader.getLatestCompactionInfo(client, request); - } + response = CompactionInfoLoader.getLatestCompactionInfo(client, request); Map<String, Long> partNameToCompactionId = new HashMap<>(); - if (table.isPartitioned()) { + if (partitionNames != null) { for (CompactionInfoStruct ci : response.getCompactions()) { if (ci.getPartitionname() != null) { 
partNameToCompactionId.put(ci.getPartitionname(), ci.getId()); @@ -786,14 +766,41 @@ public class MetastoreShim extends Hive3MetastoreShimBase { } } } else { - CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(), - null); - if (ci != null) { - partNameToCompactionId.put(unPartitionedName, ci.getId()); - } + CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(), null); + if (ci != null) partNameToCompactionId.put(unPartitionedName, ci.getId()); + } + return partNameToCompactionId; + } + + /** + * Fetches the latest compaction id from HMS and compares with partition metadata in + * cache. If a partition is stale due to compaction, removes it from metas. + */ + public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool, + String dbName, String tableName, TableMetaRef table, + Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName) + throws TException { + Preconditions.checkNotNull(table, "TableMetaRef must be non-null"); + Preconditions.checkNotNull(metas, "Partition map must be non-null"); + if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList(); + Stopwatch sw = Stopwatch.createStarted(); + List<String> partitionNames = null; + if (table.isPartitioned()) { + partitionNames = + metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList()); } - Iterator<Entry<PartitionRef, PartitionMetadata>> iter = - metas.entrySet().iterator(); + long lastCompactionId = metas.values() + .stream() + .mapToLong(PartitionMetadata::getLastCompactionId) + .max() + .orElse(-1); + Map<String, Long> partNameToCompactionId = Collections.emptyMap(); + try (MetaStoreClientPool.MetaStoreClient client = msClientPool.getClient()) { + partNameToCompactionId = getLatestCompactions( + client, dbName, tableName, partitionNames, unPartitionedName, lastCompactionId); + } + List<PartitionRef> stalePartitions = new ArrayList<>(); + Iterator<Entry<PartitionRef, 
PartitionMetadata>> iter = metas.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next(); if (partNameToCompactionId.containsKey(entry.getKey().getName())) { diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index cbfbfbf96..2a7b611b1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -20,8 +20,10 @@ package org.apache.impala.catalog.local; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -63,6 +65,7 @@ import org.apache.impala.catalog.HdfsCachePool; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.HdfsPartitionLocationCompressor; import org.apache.impala.catalog.HdfsStorageDescriptor; +import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer; import org.apache.impala.catalog.Principal; import org.apache.impala.catalog.PrincipalPrivilege; @@ -71,6 +74,7 @@ import org.apache.impala.catalog.VirtualColumn; import org.apache.impala.catalog.local.LocalIcebergTable.TableParams; import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; +import org.apache.impala.common.PrintUtils; import org.apache.impala.service.BackendConfig; import org.apache.impala.service.FeSupport; import org.apache.impala.service.FrontendProfile; @@ -88,6 +92,8 @@ import org.apache.impala.thrift.TDbInfoSelector; import org.apache.impala.thrift.TErrorCode; import org.apache.impala.thrift.TFunction; import org.apache.impala.thrift.TFunctionName; +import 
org.apache.impala.thrift.TGetLatestCompactionsRequest; +import org.apache.impala.thrift.TGetLatestCompactionsResponse; import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; import org.apache.impala.thrift.TGetPartialCatalogObjectResponse; import org.apache.impala.thrift.THdfsFileDesc; @@ -117,6 +123,7 @@ import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -261,11 +268,6 @@ public class CatalogdMetaProvider implements MetaProvider { private final ListMap<TNetworkAddress> cacheHostIndex_ = new ListMap<TNetworkAddress>(); - // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces - // of metadata. In order to incrementally build this out, we delegate various calls - // to the "direct" provider for now and circumvent catalogd. - private DirectMetaProvider directProvider_ = new DirectMetaProvider(); - /** * Number of requests which piggy-backed on a concurrent request for the same key, * and resulted in success. Used only for test assertions. 
@@ -948,8 +950,8 @@ public class CatalogdMetaProvider implements MetaProvider { hostIndex, partitionRefs); if (BackendConfig.INSTANCE.isAutoCheckCompaction()) { // If any partitions are stale after compaction, they will be removed from refToMeta - List<PartitionRef> stalePartitions = directProvider_.checkLatestCompaction( - refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta); + List<PartitionRef> stalePartitions = + checkLatestCompaction(refImpl.dbName_, refImpl.tableName_, refImpl, refToMeta); cache_.invalidateAll(stalePartitions.stream() .map(PartitionRefImpl.class::cast) .map(PartitionRefImpl::getId) @@ -988,6 +990,55 @@ public class CatalogdMetaProvider implements MetaProvider { return nameToMeta; } + /** + * Fetches the latest compactions from catalogd and compares with partition metadata in + * cache. If a partition is stale due to compaction, removes it from metas. And return + * the stale partitions. + */ + private List<PartitionRef> checkLatestCompaction(String dbName, String tableName, + TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws TException { + Preconditions.checkNotNull(table, "TableMetaRef must be non-null"); + Preconditions.checkNotNull(metas, "Partition map must be non-null"); + if (!table.isTransactional() || metas.isEmpty()) return Collections.emptyList(); + Stopwatch sw = Stopwatch.createStarted(); + TGetLatestCompactionsRequest req = new TGetLatestCompactionsRequest(); + req.db_name = dbName; + req.table_name = tableName; + req.non_parition_name = HdfsTable.DEFAULT_PARTITION_NAME; + if (table.isPartitioned()) { + req.partition_names = + metas.keySet().stream().map(PartitionRef::getName).collect(Collectors.toList()); + } + req.last_compaction_id = metas.values() + .stream() + .mapToLong(PartitionMetadata::getLastCompactionId) + .max() + .orElse(-1); + byte[] ret = FeSupport.GetLatestCompactions( + new TSerializer(new TBinaryProtocol.Factory()).serialize(req)); + TGetLatestCompactionsResponse resp = new 
TGetLatestCompactionsResponse(); + new TDeserializer(new TBinaryProtocol.Factory()).deserialize(resp, ret); + if (resp.status.status_code != TErrorCode.OK) { + throw new TException(Joiner.on("\n").join(resp.status.getError_msgs())); + } + Map<String, Long> partNameToCompactionId = resp.partition_to_compaction_id; + Preconditions.checkNotNull( + partNameToCompactionId, "Partition name to compaction id map must be non-null"); + List<PartitionRef> stalePartitions = new ArrayList<>(); + Iterator<Map.Entry<PartitionRef, PartitionMetadata>> iter = + metas.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next(); + if (partNameToCompactionId.containsKey(entry.getKey().getName())) { + stalePartitions.add(entry.getKey()); + iter.remove(); + } + } + LOG.info("Checked the latest compaction info for {}.{}. Time taken: {}", dbName, + tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS))); + return stalePartitions; + } + /** * Load the specified partitions 'prefs' from catalogd. The partitions are made * relative to the given 'hostIndex' before being returned. 
@@ -1189,7 +1240,7 @@ public class CatalogdMetaProvider implements MetaProvider { /** Called to load cache for cache misses */ @Override public String call() throws Exception { - return directProvider_.loadNullPartitionKeyValue(); + return FeSupport.GetNullPartitionName(); } }); } diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java index e8ea8430a..4b56ca15c 100644 --- a/fe/src/main/java/org/apache/impala/service/FeSupport.java +++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java @@ -36,6 +36,8 @@ import org.apache.impala.thrift.TCatalogServiceRequestHeader; import org.apache.impala.thrift.TColumnValue; import org.apache.impala.thrift.TErrorCode; import org.apache.impala.thrift.TExprBatch; +import org.apache.impala.thrift.TGetNullPartitionNameRequest; +import org.apache.impala.thrift.TGetNullPartitionNameResponse; import org.apache.impala.thrift.TGetPartitionStatsRequest; import org.apache.impala.thrift.TGetPartitionStatsResponse; import org.apache.impala.thrift.TParseDateStringResult; @@ -140,6 +142,12 @@ public class FeSupport { // E.g.: '2011-01-01', '2011-01-1', '2011-1-01', '2011-01-01'. public native static byte[] nativeParseDateString(String date); + // Does an RPC to the Catalog Server to get the null partition name. + public native static byte[] NativeGetNullPartitionName(byte[] thriftReq); + + // Does an RPC to the Catalog Server to get the latest compactions. + public native static byte[] NativeGetLatestCompactions(byte[] thriftReq); + /** * Locally caches the jar at the specified HDFS location. 
* @@ -490,6 +498,40 @@ public class FeSupport { } } + private static byte[] GetNullPartitionName(byte[] thriftReq) { + try { + return NativeGetNullPartitionName(thriftReq); + } catch (UnsatisfiedLinkError e) { loadLibrary(); } + return NativeGetNullPartitionName(thriftReq); + } + + public static String GetNullPartitionName() throws InternalException { + TGetNullPartitionNameRequest request = new TGetNullPartitionNameRequest(); + TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse(); + try { + byte[] result = GetNullPartitionName( + new TSerializer(new TBinaryProtocol.Factory()).serialize(request)); + Preconditions.checkNotNull(result); + new TDeserializer(new TBinaryProtocol.Factory()).deserialize(response, result); + if (response.getStatus().getStatus_code() != TErrorCode.OK) { + throw new InternalException("Error requesting GetNullPartitionName: " + + Joiner.on("\n").join(response.getStatus().getError_msgs())); + } + Preconditions.checkNotNull(response.partition_value); + return response.partition_value; + } catch (TException e) { + // this should never happen + throw new InternalException("Error processing request: " + e.getMessage(), e); + } + } + + public static byte[] GetLatestCompactions(byte[] thriftReq) { + try { + return NativeGetLatestCompactions(thriftReq); + } catch (UnsatisfiedLinkError e) { loadLibrary(); } + return NativeGetLatestCompactions(thriftReq); + } + /** * Calling this function before loadLibrary() causes external frontend * initialization to be used during NativeFeInit() diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index 1adfd7efa..c25faabaa 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -68,6 +68,9 @@ import org.apache.impala.thrift.TGetDbsParams; import org.apache.impala.thrift.TGetDbsResult; import 
org.apache.impala.thrift.TGetFunctionsRequest; import org.apache.impala.thrift.TGetFunctionsResponse; +import org.apache.impala.thrift.TGetLatestCompactionsRequest; +import org.apache.impala.thrift.TGetLatestCompactionsResponse; +import org.apache.impala.thrift.TGetNullPartitionNameResponse; import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; import org.apache.impala.thrift.TGetPartitionStatsRequest; import org.apache.impala.thrift.TGetPartitionStatsResponse; @@ -85,6 +88,7 @@ import org.apache.impala.thrift.TUpdateTableUsageRequest; import org.apache.impala.util.AuthorizationUtil; import org.apache.impala.util.CatalogOpUtil; import org.apache.impala.util.GlogAppender; +import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.PatternMatcher; import org.apache.impala.util.ThreadNameAnnotator; import org.apache.impala.util.TUniqueIdUtil; @@ -562,4 +566,32 @@ public class JniCatalog { public void refreshDataSources() throws TException { catalog_.refreshDataSources(); } + + public byte[] getNullPartitionName() throws ImpalaException, TException { + return execAndSerialize("getNullPartitionName", "Getting null partition name", () -> { + TGetNullPartitionNameResponse response = new TGetNullPartitionNameResponse(); + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { + response.setPartition_value( + MetaStoreUtil.getNullPartitionKeyValue(msClient.getHiveClient())); + response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList())); + } + return response; + }); + } + + public byte[] getLatestCompactions(byte[] thriftParams) + throws ImpalaException, TException { + TGetLatestCompactionsRequest request = new TGetLatestCompactionsRequest(); + JniUtil.deserializeThrift(protocolFactory_, request, thriftParams); + return execAndSerialize("getLatestCompactions", "Getting latest compactions", () -> { + TGetLatestCompactionsResponse response = new TGetLatestCompactionsResponse(); + try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient()) { + response.setPartition_to_compaction_id(MetastoreShim.getLatestCompactions( + msClient, request.db_name, request.table_name, request.partition_names, + request.non_parition_name, request.last_compaction_id)); + response.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList())); + } + return response; + }); + } } diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java index d0cc54deb..57daa7765 100644 --- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java @@ -611,6 +611,21 @@ public class CatalogdMetaProviderTest { } } + @Test + public void testLoadNullPartitionKeyValue() throws Exception { + provider_.cache_.invalidateAll(); + CacheStats stats = diffStats(); + String nullPartitionName = provider_.loadNullPartitionKeyValue(); + assertNotNull(nullPartitionName); + stats = diffStats(); + assertEquals(1, stats.missCount()); + assertEquals(0, stats.hitCount()); + assertEquals(nullPartitionName, provider_.loadNullPartitionKeyValue()); + stats = diffStats(); + assertEquals(0, stats.missCount()); + assertEquals(1, stats.hitCount()); + } + private void testFileMetadataAfterCompaction(String dbName, String tableName, String partition, boolean isMajorCompaction) throws Exception { String tableOrPartition = dbName + "." + tableName + " " + partition;
