This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 111497ddff5ed981e744dad3966c7d70ed7f837c
Author: Peter Rozsa <[email protected]>
AuthorDate: Thu Oct 6 12:18:57 2022 +0200

    IMPALA-11478: Cleanup JniCatalog
    
    This change adds a new class called JniCatalogOp which extracts
    the time calculation, error handling, thread annotation and logging
    from the operations in JniCatalog.
    
    Frontend test added as JniCatalogOpTest.
    Manually verified proper thread naming in a thread dump, as well as the
    OOM exception, runtime exception, and too slow / too large request
    warning paths.
    
    Change-Id: I1932172e2d13a7aab2336661c18befb4407ec9ab
    Reviewed-on: http://gerrit.cloudera.org:8080/19198
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/common/JniUtil.java     |  74 +++-
 .../java/org/apache/impala/service/JniCatalog.java | 464 +++++++++------------
 .../org/apache/impala/service/JniCatalogOp.java    |  94 +++++
 .../apache/impala/service/JniCatalogOpTest.java    |  67 +++
 4 files changed, 416 insertions(+), 283 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/common/JniUtil.java 
b/fe/src/main/java/org/apache/impala/common/JniUtil.java
index c87604ecc..43b09bc99 100644
--- a/fe/src/main/java/org/apache/impala/common/JniUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/JniUtil.java
@@ -143,32 +143,62 @@ public class JniUtil {
     }
   }
 
-  /**
-   * Warn if the result size or the response time exceeds thresholds.
-   */
-  public static void logResponse(long resultSize, long startTime, TBase<?, ?> 
thriftReq,
-      String methodName) {
-    long duration = System.currentTimeMillis() - startTime;
-    boolean tooLarge = (resultSize > 
BackendConfig.INSTANCE.getWarnCatalogResponseSize());
-    boolean tooSlow =
-        (duration > BackendConfig.INSTANCE.getWarnCatalogResponseDurationMs());
-    if (tooLarge || tooSlow) {
-      String header = (tooLarge && tooSlow) ? "Response too large and too 
slow" :
-          (tooLarge ? "Response too large" : "Response too slow");
-      String request = (thriftReq == null) ? "" :
-          ", request: " + StringUtils.abbreviate(thriftReq.toString(), 1000);
-      LOG.warn("{}: size={} ({}), duration={}ms ({}), method: {}{}",
-          header, resultSize, PrintUtils.printBytes(resultSize),
-          duration, PrintUtils.printTimeMs(duration), methodName, request);
+  public static class OperationLog {
+    private final long startTime;
+    private final String methodName;
+    private final String shortDescription;
+
+    public OperationLog(String methodName, String shortDescription) {
+      this.startTime = System.currentTimeMillis();
+      this.methodName = methodName;
+      this.shortDescription = shortDescription;
+    }
+
+    public void logStart() { LOG.info("{} request: {}", methodName, 
shortDescription); }
+
+    public void logFinish() {
+      long duration = getDurationFromStart();
+      LOG.info("Finished {} request: {}. Time spent: {}", methodName, 
shortDescription,
+          PrintUtils.printTimeMs(duration));
     }
-  }
 
-  public static void logResponse(long startTime, TBase<?, ?> thriftReq, String 
method) {
-    logResponse(0, startTime, thriftReq, method);
+    public void logError() {
+      long duration = getDurationFromStart();
+      LOG.error("Error in {}. Time spent: {}", shortDescription,
+          PrintUtils.printTimeMs(duration));
+    }
+
+    /**
+     * Warn if the result size or the response time exceeds thresholds.
+     */
+    public void logResponse(long resultSize, TBase<?, ?> thriftReq) {
+      long duration = getDurationFromStart();
+      boolean tooLarge =
+          (resultSize > BackendConfig.INSTANCE.getWarnCatalogResponseSize());
+      boolean tooSlow =
+          (duration > 
BackendConfig.INSTANCE.getWarnCatalogResponseDurationMs());
+      if (tooLarge || tooSlow) {
+        String header = (tooLarge && tooSlow) ?
+            "Response too large and too slow" :
+            (tooLarge ? "Response too large" : "Response too slow");
+        String request = (thriftReq == null) ?
+            "" :
+            ", request: " + StringUtils.abbreviate(thriftReq.toString(), 1000);
+        LOG.warn("{}: size={} ({}), duration={}ms ({}), method: {}{}", header, 
resultSize,
+            PrintUtils.printBytes(resultSize), duration, 
PrintUtils.printTimeMs(duration),
+            methodName, request);
+      }
+    }
+
+    private long getDurationFromStart() {
+      return System.currentTimeMillis() - this.startTime;
+    }
   }
 
-  public static void logResponse(long startTime, String method) {
-    logResponse(startTime, null, method);
+  public static OperationLog logOperation(String methodName, String 
shortDescription) {
+    OperationLog operationLog = new OperationLog(methodName, shortDescription);
+    operationLog.logStart();
+    return operationLog;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java 
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 6ff256e20..c8e74d63e 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -21,12 +21,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
+import java.util.stream.Collectors;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.AuthorizationManager;
@@ -47,19 +47,20 @@ import 
org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
-import org.apache.impala.common.PrintUtils;
-import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.JniUtil;
+import org.apache.impala.common.Pair;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.hive.executor.HiveJavaFunctionFactoryImpl;
+import org.apache.impala.service.JniCatalogOp.JniCatalogOpCallable;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TErrorCode;
-import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TGetCatalogDeltaRequest;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TGetCatalogServerMetricsResponse;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
@@ -68,22 +69,23 @@ import org.apache.impala.thrift.TGetFunctionsResponse;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartitionStatsRequest;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
-import org.apache.impala.thrift.TGetTablesParams;
 import org.apache.impala.thrift.TGetTableMetricsParams;
+import org.apache.impala.thrift.TGetTablesParams;
 import org.apache.impala.thrift.TGetTablesResult;
 import org.apache.impala.thrift.TLogLevel;
 import org.apache.impala.thrift.TPrioritizeLoadRequest;
 import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TStatus;
+import org.apache.impala.thrift.TTableUsage;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
-import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.AuthorizationUtil;
 import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.ThreadNameAnnotator;
+import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -92,12 +94,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * JNI-callable interface for the CatalogService. The main point is to 
serialize
- * and de-serialize thrift structures between C and Java parts of the 
CatalogService.
+ * JNI-callable interface for the CatalogService. The main point is to 
serialize and
+ * de-serialize thrift structures between C and Java parts of the 
CatalogService.
  */
 public class JniCatalog {
-  private final static Logger LOG = LoggerFactory.getLogger(JniCatalog.class);
-  private final static TBinaryProtocol.Factory protocolFactory_ =
+  private static final Logger LOG = LoggerFactory.getLogger(JniCatalog.class);
+  private static final TBinaryProtocol.Factory protocolFactory_ =
       new TBinaryProtocol.Factory();
   private final CatalogServiceCatalog catalog_;
   private final CatalogOpExecutor catalogOpExecutor_;
@@ -108,7 +110,7 @@ public class JniCatalog {
   private static final TUniqueId catalogServiceId_ = generateId();
 
   // A singleton monitoring class that keeps track of the catalog usage 
metrics.
-  private final CatalogOperationMetrics catalogOperationUsage =
+  private final CatalogOperationMetrics catalogOperationUsage_ =
       CatalogMonitor.INSTANCE.getCatalogOperationMetrics();
 
   private static TUniqueId generateId() {
@@ -116,8 +118,7 @@ public class JniCatalog {
     return new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
   }
 
-  public JniCatalog(byte[] thriftBackendConfig) throws InternalException,
-      ImpalaException, TException {
+  public JniCatalog(byte[] thriftBackendConfig) throws ImpalaException {
     TBackendGflags cfg = new TBackendGflags();
     JniUtil.deserializeThrift(protocolFactory_, cfg, thriftBackendConfig);
 
@@ -134,8 +135,8 @@ public class JniCatalog {
 
     // create the appropriate auth factory from backend config
     // this logic is shared with JniFrontend
-    final AuthorizationFactory authzFactory
-        = AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE);
+    final AuthorizationFactory authzFactory =
+        AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE);
 
     LOG.info(JniUtil.getJavaVersion());
 
@@ -145,12 +146,12 @@ public class JniCatalog {
       MetastoreShim.setHiveClientCapabilities();
     }
 
-    MetaStoreClientPool metaStoreClientPool = new MetaStoreClientPool(
-        CatalogServiceCatalog.INITIAL_META_STORE_CLIENT_POOL_SIZE,
-        cfg.initial_hms_cnxn_timeout_s);
+    MetaStoreClientPool metaStoreClientPool =
+        new 
MetaStoreClientPool(CatalogServiceCatalog.INITIAL_META_STORE_CLIENT_POOL_SIZE,
+            cfg.initial_hms_cnxn_timeout_s);
     catalog_ = new CatalogServiceCatalog(cfg.load_catalog_in_background,
-        cfg.num_metadata_loading_threads, getServiceId(),
-        cfg.local_library_path, metaStoreClientPool);
+        cfg.num_metadata_loading_threads, getServiceId(), 
cfg.local_library_path,
+        metaStoreClientPool);
     authzManager_ = authzFactory.newAuthorizationManager(catalog_);
     catalog_.setAuthzManager(authzManager_);
     catalogOpExecutor_ = new CatalogOpExecutor(catalog_, authzConfig, 
authzManager_,
@@ -158,8 +159,8 @@ public class JniCatalog {
     MetastoreEventFactory eventFactory =
         new EventFactoryForSyncToLatestEvent(catalogOpExecutor_);
     catalog_.setEventFactoryForSyncToLatestEvent(eventFactory);
-    ExternalEventsProcessor eventsProcessor = 
getEventsProcessor(metaStoreClientPool,
-        catalogOpExecutor_);
+    ExternalEventsProcessor eventsProcessor =
+        getEventsProcessor(metaStoreClientPool, catalogOpExecutor_);
     catalog_.setMetastoreEventProcessor(eventsProcessor);
     catalog_.startEventsProcessor();
     catalogMetastoreServer_ = getCatalogMetastoreServer(catalogOpExecutor_);
@@ -189,7 +190,7 @@ public class JniCatalog {
   /**
    * Returns a Metastore event processor object if
    * <code>BackendConfig#getHMSPollingIntervalInSeconds</code> returns a 
non-zero
-   *.value of polling interval. Otherwise, returns a no-op events processor. 
It is
+   * value of polling interval. Otherwise, returns a no-op events processor. 
It is
    * important to fetch the current notification event id at the Catalog 
service
    * initialization time so that event processor starts to sync at the event id
    * corresponding to the catalog start time.
@@ -199,9 +200,8 @@ public class JniCatalog {
       throws ImpalaException {
     long eventPollingInterval = 
BackendConfig.INSTANCE.getHMSPollingIntervalInSeconds();
     if (eventPollingInterval <= 0) {
-      LOG.info(String
-          .format("Metastore event processing is disabled. Event polling 
interval is %d",
-              eventPollingInterval));
+      LOG.info("Metastore event processing is disabled. Event polling interval 
is {}",
+          eventPollingInterval);
       return NoOpEventProcessor.getInstance();
     }
     try (MetaStoreClient metaStoreClient = metaStoreClientPool.getClient()) {
@@ -216,29 +216,31 @@ public class JniCatalog {
     }
   }
 
-  public static TUniqueId getServiceId() { return catalogServiceId_; }
+  private <RESULT, PARAMETER extends TBase<?, ?>> RESULT execOp(String 
methodName,
+      String shortDescription, JniCatalogOpCallable<Pair<RESULT, Long>> 
operand,
+      PARAMETER requestParameter) throws ImpalaException, TException {
+    return JniCatalogOp.execOp(methodName, shortDescription, operand, 
requestParameter);
+  }
 
-  public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq) throws
-      ImpalaException, TException {
-    long start = System.currentTimeMillis();
-    TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
-    JniUtil.deserializeThrift(protocolFactory_, params, 
thriftGetCatalogDeltaReq);
+  private byte[] execAndSerialize(String methodName, String shortDescription,
+      JniCatalogOpCallable<TBase<?, ?>> operand, Runnable finalClause)
+      throws ImpalaException, TException {
     TSerializer serializer = new TSerializer(protocolFactory_);
-    String shortDesc = "getting catalog delta from version " + 
params.getFrom_version();
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
-      byte[] res = serializer.serialize(new TGetCatalogDeltaResponse(
-          catalog_.getCatalogDelta(params.getNative_catalog_server_ptr(),
-              params.getFrom_version())));
-      JniUtil.logResponse(res.length, start, params, "getCatalogDelta");
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in {}. Time spent: {}.",
-          shortDesc, PrintUtils.printTimeMs(duration));
-      throw e;
-    }
+    return JniCatalogOp.execAndSerialize(
+        methodName, shortDescription, operand, serializer, finalClause);
+  }
+
+  private byte[] execAndSerialize(String methodName, String shortDescription,
+      JniCatalogOpCallable<TBase<?, ?>> operand) throws ImpalaException, 
TException {
+    return execAndSerialize(methodName, shortDescription, operand, () -> {});
   }
 
+  private String fullyQualifiedTableName(String databaseName, String 
tableName) {
+    return databaseName + "." + tableName;
+  }
+
+  public static TUniqueId getServiceId() { return catalogServiceId_; }
+
   /**
    * Gets the current catalog version.
    */
@@ -246,29 +248,29 @@ public class JniCatalog {
     return catalog_.getCatalogVersion();
   }
 
+  public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
+      throws ImpalaException, TException {
+    TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
+    JniUtil.deserializeThrift(protocolFactory_, params, 
thriftGetCatalogDeltaReq);
+    String shortDesc = "Getting catalog delta from version " + 
params.getFrom_version();
+
+    return execAndSerialize("getCatalogDelta", shortDesc, () -> {
+      long catalogDelta = catalog_.getCatalogDelta(
+          params.getNative_catalog_server_ptr(), params.getFrom_version());
+      return new TGetCatalogDeltaResponse(catalogDelta);
+    });
+  }
+
   /**
    * Executes the given DDL request and returns the result.
    */
   public byte[] execDdl(byte[] thriftDdlExecReq) throws ImpalaException, 
TException {
-    long start = System.currentTimeMillis();
     TDdlExecRequest params = new TDdlExecRequest();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDdlExecReq);
     String shortDesc = CatalogOpUtil.getShortDescForExecDdl(params);
-    LOG.info("execDdl request: " + shortDesc);
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
-      TSerializer serializer = new TSerializer(protocolFactory_);
-      byte[] res = 
serializer.serialize(catalogOpExecutor_.execDdlRequest(params));
-      JniUtil.logResponse(res.length, start, params, "execDdl");
-      long duration = System.currentTimeMillis() - start;
-      LOG.info("finished execDdl request: {}. Time spent: {}",
-          shortDesc, PrintUtils.printTimeMs(duration));
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in execDdl for {}. Time spent: {}.",
-          shortDesc, PrintUtils.printTimeMs(duration));
-      throw e;
-    }
+
+    return execAndSerialize(
+        "execDdl", shortDesc, () -> catalogOpExecutor_.execDdlRequest(params));
   }
 
   /**
@@ -276,297 +278,237 @@ public class JniCatalog {
    */
   public byte[] resetMetadata(byte[] thriftResetMetadataReq)
       throws ImpalaException, TException {
-    long start = System.currentTimeMillis();
     TResetMetadataRequest req = new TResetMetadataRequest();
     JniUtil.deserializeThrift(protocolFactory_, req, thriftResetMetadataReq);
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    catalogOperationUsage.increment(req);
+    catalogOperationUsage_.increment(req);
     String shortDesc = CatalogOpUtil.getShortDescForReset(req);
-    LOG.info("resetMetadata request: " + shortDesc);
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
-      byte[] res = 
serializer.serialize(catalogOpExecutor_.execResetMetadata(req));
-      JniUtil.logResponse(res.length, start, req, "resetMetadata");
-      long duration = System.currentTimeMillis() - start;
-      LOG.info("finished resetMetadata request: {}. Time spent: {}",
-          shortDesc, PrintUtils.printTimeMs(duration));
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in resetMetadata for {}. Time spent: {}.",
-          shortDesc, PrintUtils.printTimeMs(duration));
-      throw e;
-    } finally {
-      catalogOperationUsage.decrement(req);
-    }
+
+    return execAndSerialize("resetMetadata", shortDesc,
+        () -> catalogOpExecutor_.execResetMetadata(req),
+        () -> catalogOperationUsage_.decrement(req));
   }
 
   /**
-   * Returns a list of databases matching an optional pattern.
-   * The argument is a serialized TGetDbParams object.
-   * The return type is a serialized TGetDbResult object.
+   * Returns a list of databases matching an optional pattern. The argument is 
a
+   * serialized TGetDbParams object. The return type is a serialized 
TGetDbResult object.
    */
-  public byte[] getDbs(byte[] thriftGetTablesParams) throws ImpalaException,
-      TException {
-    long start = System.currentTimeMillis();
+  public byte[] getDbs(byte[] thriftGetTablesParams) throws ImpalaException, 
TException {
     TGetDbsParams params = new TGetDbsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
-    List<Db> dbs = catalog_.getDbs(PatternMatcher.MATCHER_MATCH_ALL);
     TGetDbsResult result = new TGetDbsResult();
-    List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
-    for (FeDb db: dbs) tDbs.add(db.toThrift());
-    result.setDbs(tDbs);
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(result);
-    JniUtil.logResponse(res.length, start, params, "getDbs");
-    return res;
+    String shortDesc = "Getting databases with pattern: " + 
params.getPattern();
+
+    return execAndSerialize("getDbs", shortDesc, () -> {
+      List<Db> dbs = catalog_.getDbs(PatternMatcher.MATCHER_MATCH_ALL);
+      List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
+      for (FeDb db : dbs) {
+        tDbs.add(db.toThrift());
+      }
+      result.setDbs(tDbs);
+      return result;
+    });
   }
 
   /**
-   * Returns a list of table names matching an optional pattern.
-   * The argument is a serialized TGetTablesParams object.
-   * The return type is a serialized TGetTablesResult object.
+   * Returns a list of table names matching an optional pattern. The argument 
is a
+   * serialized TGetTablesParams object. The return type is a serialized 
TGetTablesResult
+   * object.
    */
-  public byte[] getTableNames(byte[] thriftGetTablesParams) throws 
ImpalaException,
-      TException {
-    long start = System.currentTimeMillis();
+  public byte[] getTableNames(byte[] thriftGetTablesParams)
+      throws ImpalaException, TException {
     TGetTablesParams params = new TGetTablesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
-    List<String> tables = catalog_.getTableNames(params.db,
-        PatternMatcher.createHivePatternMatcher(params.pattern));
     TGetTablesResult result = new TGetTablesResult();
-    result.setTables(tables);
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(result);
-    JniUtil.logResponse(res.length, start, params, "getTableNames");
-    return res;
+    String shortDesc =
+        String.format("Getting table names with parameters: database: %s, 
pattern: %s ",
+            params.getDb(), params.getPattern());
+
+    return execAndSerialize("getTableNames", shortDesc, () -> {
+      List<String> tables = catalog_.getTableNames(
+          params.getDb(), 
PatternMatcher.createHivePatternMatcher(params.getPattern()));
+      result.setTables(tables);
+      return result;
+    });
   }
 
   /**
    * Returns the collected metrics of a table.
    */
-  public String getTableMetrics(byte[] getTableMetricsParams) throws 
ImpalaException {
-    long start = System.currentTimeMillis();
+  public String getTableMetrics(byte[] getTableMetricsParams)
+      throws ImpalaException, TException {
     TGetTableMetricsParams params = new TGetTableMetricsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, getTableMetricsParams);
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
-        "getTableMetrics " + params.table_name)) {
+    String shortDesc = "Getting table metrics for " + params.getTable_name();
+
+    return execOp("getTableMetrics", shortDesc, () -> {
       String res = catalog_.getTableMetrics(params.table_name);
-      JniUtil.logResponse(res.length(), start, params, "getTableMetrics");
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in getTableMetrics {}. Time spent: {}.", 
params.table_name,
-          PrintUtils.printTimeMs(duration));
-      throw e;
-    }
+      return Pair.create(res, (long) res.length());
+    }, params);
   }
 
   /**
    * Gets the thrift representation of a catalog object.
    */
-  public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
-      TException {
-    long start = System.currentTimeMillis();
+  public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException, 
TException {
     TCatalogObject objectDesc = new TCatalogObject();
     JniUtil.deserializeThrift(protocolFactory_, objectDesc, thriftParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    String shortDesc = "getting thrift catalog object of "
-        + Catalog.toCatalogObjectKey(objectDesc);
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
-      byte[] res = 
serializer.serialize(catalog_.getTCatalogObject(objectDesc));
-      JniUtil.logResponse(res.length, start, objectDesc, "getCatalogObject");
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in {}. Time spent: {}.", shortDesc,
-          PrintUtils.printTimeMs(duration));
-      throw e;
-    }
+    String shortDesc =
+        "Getting thrift catalog object of " + 
Catalog.toCatalogObjectKey(objectDesc);
+
+    return execAndSerialize(
+        "getCatalogObject", shortDesc, () -> 
catalog_.getTCatalogObject(objectDesc));
   }
 
   /**
    * Gets the json string of a catalog object. It can only be used in showing 
debug
    * messages and can't be deserialized to a thrift object.
    */
-  public String getJsonCatalogObject(byte[] thriftParams) throws 
ImpalaException,
-      TException {
-    long start = System.currentTimeMillis();
+  public String getJsonCatalogObject(byte[] thriftParams)
+      throws ImpalaException, TException {
     TCatalogObject objectDesc = new TCatalogObject();
     JniUtil.deserializeThrift(protocolFactory_, objectDesc, thriftParams);
     TSerializer jsonSerializer = new TSerializer(new 
TSimpleJSONProtocol.Factory());
-    String shortDesc = "getting json catalog object of "
-        + Catalog.toCatalogObjectKey(objectDesc);
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
+    String shortDesc =
+        "Getting JSON catalog object of " + 
Catalog.toCatalogObjectKey(objectDesc);
+
+    return execOp("getJsonCatalogObject", shortDesc, () -> {
       String res = 
jsonSerializer.toString(catalog_.getTCatalogObject(objectDesc));
-      JniUtil.logResponse(res.length(), start, objectDesc, 
"getJsonCatalogObject");
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in {}. Time spent: {}.", shortDesc,
-          PrintUtils.printTimeMs(duration));
-      throw e;
-    }
+      return Pair.create(res, (long) res.length());
+    }, objectDesc);
   }
 
-  public byte[] getPartialCatalogObject(byte[] thriftParams) throws 
ImpalaException,
-      TException {
-    long start = System.currentTimeMillis();
-    TGetPartialCatalogObjectRequest req =
-        new TGetPartialCatalogObjectRequest();
+  public byte[] getPartialCatalogObject(byte[] thriftParams)
+      throws ImpalaException, TException {
+    TGetPartialCatalogObjectRequest req = new 
TGetPartialCatalogObjectRequest();
     JniUtil.deserializeThrift(protocolFactory_, req, thriftParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    try {
-      byte[] res = serializer.serialize(catalog_.getPartialCatalogObject(req));
-      JniUtil.logResponse(res.length, start, req, "getPartialCatalogObject");
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in getting PartialCatalogObject of {}. Time spent: {}.",
-          Catalog.toCatalogObjectKey(req.object_desc), 
PrintUtils.printTimeMs(duration));
-      throw e;
-    }
+    String shortDesc = "Getting partial catalog object of "
+        + Catalog.toCatalogObjectKey(req.getObject_desc());
+
+    return execAndSerialize("getPartialCatalogObject", shortDesc,
+        () -> catalog_.getPartialCatalogObject(req));
   }
 
   /**
    * See comment in CatalogServiceCatalog.
    */
-  public byte[] getFunctions(byte[] thriftParams) throws ImpalaException,
-      TException {
-    long start = System.currentTimeMillis();
+  public byte[] getFunctions(byte[] thriftParams) throws ImpalaException, 
TException {
     TGetFunctionsRequest request = new TGetFunctionsRequest();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    if (!request.isSetDb_name()) {
-      throw new InternalException("Database name must be set in call to " +
-          "getFunctions()");
-    }
-
-    // Get all the functions and convert them to their Thrift representation.
-    List<Function> fns = catalog_.getFunctions(request.getDb_name());
     TGetFunctionsResponse response = new TGetFunctionsResponse();
-    response.setFunctions(new ArrayList<TFunction>(fns.size()));
-    for (Function fn: fns) {
-      response.addToFunctions(fn.toThrift());
+    if (!request.isSetDb_name()) {
+      throw new InternalException("Database name must be set in call to 
getFunctions()");
     }
 
-    byte[] res = serializer.serialize(response);
-    JniUtil.logResponse(res.length, start, request, "getFunctions");
-    return res;
+    String shortDesc = "Getting functions for " + request.getDb_name();
+    return execAndSerialize("getFunctions", shortDesc, () -> {
+      // Get all the functions and convert them to their Thrift representation.
+      List<Function> fns = catalog_.getFunctions(request.getDb_name());
+      response.setFunctions(new ArrayList<>(fns.size()));
+      for (Function fn : fns) {
+        response.addToFunctions(fn.toThrift());
+      }
+      return response;
+    });
   }
 
-  public void prioritizeLoad(byte[] thriftLoadReq) throws ImpalaException {
-    long start = System.currentTimeMillis();
+  public void prioritizeLoad(byte[] thriftLoadReq) throws ImpalaException, 
TException {
     TPrioritizeLoadRequest request = new TPrioritizeLoadRequest();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadReq);
-    catalog_.prioritizeLoad(request.getObject_descs());
-    JniUtil.logResponse(start, request, "prioritizeLoad");
+
+    String shortDesc = "Prioritize load on table(s): "
+        + request.getObject_descs()
+              .stream()
+              .map(TCatalogObject::getTable)
+              .map(t -> fullyQualifiedTableName(t.getDb_name(), 
t.getTbl_name()))
+              .collect(Collectors.joining(", "));
+    execOp("prioritizeLoad", shortDesc, () -> {
+      catalog_.prioritizeLoad(request.getObject_descs());
+      return Pair.create(null, null);
+    }, request);
   }
 
   public byte[] getPartitionStats(byte[] thriftParams)
       throws ImpalaException, TException {
-    long start = System.currentTimeMillis();
     TGetPartitionStatsRequest request = new TGetPartitionStatsRequest();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     TGetPartitionStatsResponse response = new TGetPartitionStatsResponse();
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
-        "Getting partition stats of " + request.table_name)) {
-      response.setPartition_stats(catalog_.getPartitionStats(request));
-    } catch (CatalogException e) {
-      response.setStatus(
-          new TStatus(TErrorCode.INTERNAL_ERROR, 
ImmutableList.of(e.getMessage())));
-    }
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
-        "Serializing partition stats of " + request.table_name)) {
-      byte[] res = serializer.serialize(response);
-      JniUtil.logResponse(res.length, start, request, "getPartitionStats");
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in serializing partition stats of {}. Time spent in 
method: {}.",
-          request.table_name, PrintUtils.printTimeMs(duration));
-      throw e;
-    }
+    String shortDescGet = "Getting partition stats of " + 
request.getTable_name();
+    String shortDescSer = "Serializing partition stats of " + 
request.getTable_name();
+
+    return execAndSerialize("getPartitionStats", shortDescSer, () -> {
+      try (ThreadNameAnnotator ignored = new 
ThreadNameAnnotator(shortDescGet)) {
+        response.setPartition_stats(catalog_.getPartitionStats(request));
+      } catch (CatalogException e) {
+        response.setStatus(
+            new TStatus(TErrorCode.INTERNAL_ERROR, 
ImmutableList.of(e.getMessage())));
+      }
+      return response;
+    });
   }
 
   /**
-   * Process any updates to the metastore required after a query executes.
-   * The argument is a serialized TCatalogUpdate.
+   * Process any updates to the metastore required after a query executes. The 
argument is
+   * a serialized TCatalogUpdate.
    */
-  public byte[] updateCatalog(byte[] thriftUpdateCatalog) throws 
ImpalaException,
-      TException  {
-    long start = System.currentTimeMillis();
+  public byte[] updateCatalog(byte[] thriftUpdateCatalog)
+      throws ImpalaException, TException {
     TUpdateCatalogRequest request = new TUpdateCatalogRequest();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftUpdateCatalog);
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    catalogOperationUsage.increment(request);
-    String shortDesc = String.format("updateCatalog for %s.%s",
-        request.db_name, request.target_table);
-    LOG.info(shortDesc);
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
-      byte[] res = 
serializer.serialize(catalogOpExecutor_.updateCatalog(request));
-      JniUtil.logResponse(res.length, start, request, "updateCatalog");
-      long duration = System.currentTimeMillis() - start;
-      LOG.info("finished {}. Time spent: {}", shortDesc,
-          PrintUtils.printTimeMs(duration));
-      return res;
-    } catch (Throwable e) {
-      long duration = System.currentTimeMillis() - start;
-      LOG.error("Error in {}. Time spent: {}.", shortDesc,
-          PrintUtils.printTimeMs(duration));
-      throw e;
-    } finally {
-      catalogOperationUsage.decrement(request);
-    }
+    catalogOperationUsage_.increment(request);
+    String shortDesc = "Update catalog for "
+        + fullyQualifiedTableName(request.getDb_name(), 
request.getTarget_table());
+
+    return execAndSerialize("updateCatalog", shortDesc,
+        () -> catalogOpExecutor_.updateCatalog(request),
+        () -> catalogOperationUsage_.decrement(request));
   }
 
   /**
    * Returns information about the current catalog usage.
    */
   public byte[] getCatalogUsage() throws ImpalaException, TException {
-    long start = System.currentTimeMillis();
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(catalog_.getCatalogUsage());
-    JniUtil.logResponse(res.length, start, /*thriftReq*/null, 
"getCatalogUsage");
-    return res;
+    String shortDesc = "Getting catalog usage";
+    return execAndSerialize("getCatalogUsage", shortDesc, 
catalog_::getCatalogUsage);
   }
 
   /**
    * Returns information about the current catalog operation metrics.
    */
   public byte[] getOperationUsage() throws ImpalaException, TException {
-    long start = System.currentTimeMillis();
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(catalog_.getOperationUsage());
-    JniUtil.logResponse(res.length, start, /*thriftReq*/null, 
"getOperationUsage");
-    return res;
+    String shortDesc = "Getting operation usage";
+    return execAndSerialize("getOperationUsage", shortDesc, 
catalog_::getOperationUsage);
   }
 
-  public byte[] getEventProcessorSummary() throws TException {
-    long start = System.currentTimeMillis();
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(catalog_.getEventProcessorSummary());
-    JniUtil.logResponse(res.length, start, /*thriftReq*/null, 
"getEventProcessorSummary");
-    return res;
+  public byte[] getEventProcessorSummary() throws ImpalaException, TException {
+    String shortDesc = "Getting event processor summary";
+    return execAndSerialize(
+        "getEventProcessorSummary", shortDesc, 
catalog_::getEventProcessorSummary);
   }
 
-  public void updateTableUsage(byte[] req) throws ImpalaException {
-    long start = System.currentTimeMillis();
+  public void updateTableUsage(byte[] req) throws ImpalaException, TException {
     TUpdateTableUsageRequest thriftReq = new TUpdateTableUsageRequest();
     JniUtil.deserializeThrift(protocolFactory_, thriftReq, req);
-    catalog_.updateTableUsage(thriftReq);
-    JniUtil.logResponse(start, thriftReq, "updateTableUsage");
+
+    String shortDesc = "Update table usage(s):"
+        + thriftReq.getUsages()
+              .stream()
+              .map(TTableUsage::getTable_name)
+              .map(TableName::thriftToString)
+              .collect(Collectors.joining(", "));
+    execOp("updateTableUsage", shortDesc, () -> {
+      catalog_.updateTableUsage(thriftReq);
+      return Pair.create(null, null);
+    }, thriftReq);
   }
 
   public byte[] getCatalogServerMetrics() throws ImpalaException, TException {
-    long start = System.currentTimeMillis();
     TGetCatalogServerMetricsResponse response = new 
TGetCatalogServerMetricsResponse();
-    response.setCatalog_partial_fetch_rpc_queue_len(
-        catalog_.getPartialFetchRpcQueueLength());
-    response.setEvent_metrics(catalog_.getEventProcessorMetrics());
-    TSerializer serializer = new TSerializer(protocolFactory_);
-    byte[] res = serializer.serialize(response);
-    JniUtil.logResponse(res.length, start, /*thriftReq*/null, 
"getCatalogServerMetrics");
-    return res;
+    String shortDesc = "Get catalog server metrics";
+    return execAndSerialize("getCatalogServerMetrics", shortDesc, () -> {
+      response.setCatalog_partial_fetch_rpc_queue_len(
+          catalog_.getPartialFetchRpcQueueLength());
+      response.setEvent_metrics(catalog_.getEventProcessorMetrics());
+      return response;
+    });
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalogOp.java 
b/fe/src/main/java/org/apache/impala/service/JniCatalogOp.java
new file mode 100644
index 000000000..6db6f8124
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalogOp.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.service;
+
+import com.google.common.base.Preconditions;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.JniUtil;
+import org.apache.impala.common.JniUtil.OperationLog;
+import org.apache.impala.common.Pair;
+import org.apache.impala.util.ThreadNameAnnotator;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+public class JniCatalogOp {
+  public interface JniCatalogOpCallable<V> {
+    V call() throws ImpalaException, TException;
+  }
+
+  /**
+   * Executes {@code operand} inside a ThreadNameAnnotator scope and logs the outcome.
+   *
+   * @param methodName       The name of the method which is wrapped in {@code operand}
+   * @param shortDescription Description given to logOperation() and ThreadNameAnnotator
+   * @param operand          The callable that does the actual operation. It returns a
+   *                         {@code Pair<RESULT, Long>}: the result plus its size for
+   *                         the response log; a null size is logged as 0.
+   * @param requestParameter Thrift request handed to logResponse(); execAndSerialize()
+   *                         passes null here
+   * @param finalClause      Runs in the finally block after {@code operand}, even when
+   *                         it throws; must not be null
+   * @param <RESULT>         Type parameter for the result
+   * @param <PARAMETER>      Type parameter for {@code requestParameter}
+   */
+  public static <RESULT, PARAMETER extends TBase<?, ?>> RESULT execOp(String 
methodName,
+      String shortDescription, JniCatalogOpCallable<Pair<RESULT, Long>> 
operand,
+      PARAMETER requestParameter, Runnable finalClause)
+      throws ImpalaException, TException {
+    Preconditions.checkNotNull(operand);
+    Preconditions.checkNotNull(finalClause);
+
+    OperationLog operationLog = JniUtil.logOperation(methodName, 
shortDescription);
+
+    try (ThreadNameAnnotator ignored = new 
ThreadNameAnnotator(shortDescription)) {
+      Pair<RESULT, Long> result = operand.call();
+
+      long size = 0; // stays 0 when the operand reports no size (null second element)
+      if (result.second != null) {
+        size = result.second;
+      }
+
+      operationLog.logResponse(size, requestParameter);
+      operationLog.logFinish();
+
+      return result.first;
+    } catch (Throwable e) {
+      operationLog.logError();
+      throw e;
+    } finally {
+      finalClause.run();
+    }
+  }
+
+  public static <RESULT, PARAMETER extends TBase<?, ?>> RESULT execOp(String 
methodName,
+      String shortDescription, JniCatalogOpCallable<Pair<RESULT, Long>> 
operand,
+      PARAMETER parameter) throws ImpalaException, TException {
+    return execOp(methodName, shortDescription, operand, parameter, () -> {}); // no-op final clause
+  }
+
+  public static byte[] execAndSerialize(String methodName, String 
shortDescription,
+      JniCatalogOpCallable<TBase<?, ?>> operand, TSerializer serializer,
+      Runnable finalClause) throws ImpalaException, TException {
+    return execOp(methodName, shortDescription, () -> {
+      TBase<?, ?> response = operand.call();
+      byte[] result = serializer.serialize(response);
+      return Pair.create(result, (long) result.length);
+    }, null, finalClause); // null: no request object is handed to the response log
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/service/JniCatalogOpTest.java 
b/fe/src/test/java/org/apache/impala/service/JniCatalogOpTest.java
new file mode 100644
index 000000000..76e9ae513
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/service/JniCatalogOpTest.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.service;
+
+import static org.apache.impala.service.JniCatalogOp.execOp;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.impala.common.FrontendTestBase;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+public class JniCatalogOpTest extends FrontendTestBase {
+  public static final String RESULT_STRING = "result"; // first element of the test pairs
+
+  @Test
+  public void testExecOpNullParameter() throws TException, ImpalaException {
+    String result;
+    result = execOp(
+        "methodName", "shortDescription", () -> new Pair<>(RESULT_STRING, 1L), 
null);
+
+    assertEquals(RESULT_STRING, result); // expects a null request parameter to be accepted
+  }
+
+  @Test
+  public void testExecOp() throws TException, ImpalaException {
+    String result;
+    result = execOp("methodName", "shortDescription",
+        ()
+            -> new Pair<>(RESULT_STRING, 1L),
+        new TCatalogObject(TCatalogObjectType.TABLE, 0L));
+    assertEquals(RESULT_STRING, result); // expects the pair's first element back
+  }
+
+  @Test
+  public void testExecOpNullPairAndParameter() throws TException, 
ImpalaException {
+    String result;
+    result = execOp("methodName", "shortDescription", () -> new Pair<>(null, 
null), null);
+    assertNull(result); // expects a pair of nulls to yield a null result without throwing
+  }
+
+  @Test
+  public void testExecOpNullStrings() throws TException, ImpalaException {
+    String result;
+    result = execOp(null, null, () -> new Pair<>(RESULT_STRING, 1L), null);
+    assertEquals(RESULT_STRING, result); // expects null method name/description to be accepted
+  }
+}
\ No newline at end of file


Reply via email to