[ 
https://issues.apache.org/jira/browse/HIVE-25429?focusedWorklogId=635811&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-635811
 ]

ASF GitHub Bot logged work on HIVE-25429:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Aug/21 09:07
            Start Date: 09/Aug/21 09:07
    Worklog Time Spent: 10m 
      Work Description: deniskuzZ commented on a change in pull request #2563:
URL: https://github.com/apache/hive/pull/2563#discussion_r685006096



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) 
throws Exception {
 
   private void configure(HiveConf conf) throws Exception {
     acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+    if (acidMetricsExtEnabled) {
 
-    deltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
-    obsoleteDeltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
-    initCachesForMetrics(conf);
-    initObjectsForMetrics();
+      initCachesForMetrics(conf);
+      initObjectsForMetrics();
 
-    long reportingInterval = HiveConf.getTimeVar(conf,
-      HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 
TimeUnit.SECONDS);
+      long reportingInterval =
+          HiveConf.getTimeVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
 
-    ThreadFactory threadFactory =
-      new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("DeltaFilesMetricReporter %d")
-        .build();
-    executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(
-      new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+      ThreadFactory threadFactory =
+          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter 
%d").build();
+      executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+      executorService.scheduleAtFixedRate(new ReportingTask(), 0, 
reportingInterval, TimeUnit.SECONDS);
 
-    LOG.info("Started DeltaFilesMetricReporter thread");
+      LOG.info("Started DeltaFilesMetricReporter thread");
+    }
   }
 
   public void submit(TezCounters counters) {
     if (acidMetricsExtEnabled) {
-      updateMetrics(NUM_OBSOLETE_DELTAS,
-        obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
-        counters);
-      updateMetrics(NUM_DELTAS,
-        deltaCache, deltaTopN, deltasThreshold,
-        counters);
-      updateMetrics(NUM_SMALL_DELTAS,
-        smallDeltaCache, smallDeltaTopN, deltasThreshold,
-        counters);
+      updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, 
obsoleteDeltaTopN, counters);
+      updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+      updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, 
counters);
     }
   }
 
+  /**
+   * Copy counters to caches.
+   */
   private void updateMetrics(DeltaFilesMetricType metric, Cache<String, 
Integer> cache, Queue<Pair<String, Integer>> topN,
-        int threshold, TezCounters counters) {
-    counters.getGroup(metric.value).forEach(counter -> {
-      Integer prev = cache.getIfPresent(counter.getName());
-      if (prev != null && prev != counter.getValue()) {
-        cache.invalidate(counter.getName());
+      TezCounters counters) {
+    try {
+      CounterGroup group = counters.getGroup(metric.value);
+      // if the group is empty, clear the cache
+      if (group.size() == 0) {
+        cache.invalidateAll();

Review comment:
       Are you clearing the whole cache when a Tez task doesn't have any deltas 
to report? What about data from other tasks that touched different 
tables/partitions?
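
To make the concern concrete, here is a minimal sketch. It is not the PR's code: a plain ConcurrentHashMap stands in for the shared Guava cache, and the class and method names are hypothetical. It contrasts clearing the shared cache on an empty report with merging only the keys a task actually reported.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the shared delta cache; keys are partition paths.
class SharedDeltaCacheSketch {
  private final Map<String, Integer> cache = new ConcurrentHashMap<>();

  // Behavior questioned in the review: a task with nothing to report
  // wipes entries that other tasks contributed.
  void submitClearingOnEmpty(Map<String, Integer> reported) {
    if (reported.isEmpty()) {
      cache.clear();
      return;
    }
    cache.putAll(reported);
  }

  // Alternative: only touch the keys this task reported on.
  void submitMergeOnly(Map<String, Integer> reported) {
    cache.putAll(reported);
  }

  // Copy of the current contents, for inspection.
  Map<String, Integer> snapshot() {
    return new HashMap<>(cache);
  }
}
```

With the first method, an empty report from one task drops the entry another task recorded earlier; with the second, that entry survives until something else expires or replaces it.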

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) 
throws Exception {
 
   private void configure(HiveConf conf) throws Exception {
     acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+    if (acidMetricsExtEnabled) {

Review comment:
       Should we even create an instance of DeltaFilesMetricReporter when 
either MetastoreConf.ConfVars.METRICS_ENABLED or METASTORE_ACIDMETRICS_EXT_ON 
is set to false?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) 
throws Exception {
 
   private void configure(HiveConf conf) throws Exception {
     acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+    if (acidMetricsExtEnabled) {
 
-    deltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
-    obsoleteDeltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
-    initCachesForMetrics(conf);
-    initObjectsForMetrics();
+      initCachesForMetrics(conf);
+      initObjectsForMetrics();
 
-    long reportingInterval = HiveConf.getTimeVar(conf,
-      HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 
TimeUnit.SECONDS);
+      long reportingInterval =
+          HiveConf.getTimeVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
 
-    ThreadFactory threadFactory =
-      new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("DeltaFilesMetricReporter %d")
-        .build();
-    executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(
-      new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+      ThreadFactory threadFactory =
+          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter 
%d").build();
+      executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+      executorService.scheduleAtFixedRate(new ReportingTask(), 0, 
reportingInterval, TimeUnit.SECONDS);
 
-    LOG.info("Started DeltaFilesMetricReporter thread");
+      LOG.info("Started DeltaFilesMetricReporter thread");
+    }
   }
 
   public void submit(TezCounters counters) {
     if (acidMetricsExtEnabled) {
-      updateMetrics(NUM_OBSOLETE_DELTAS,
-        obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
-        counters);
-      updateMetrics(NUM_DELTAS,
-        deltaCache, deltaTopN, deltasThreshold,
-        counters);
-      updateMetrics(NUM_SMALL_DELTAS,
-        smallDeltaCache, smallDeltaTopN, deltasThreshold,
-        counters);
+      updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, 
obsoleteDeltaTopN, counters);
+      updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+      updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, 
counters);
     }
   }
 
+  /**
+   * Copy counters to caches.
+   */
   private void updateMetrics(DeltaFilesMetricType metric, Cache<String, 
Integer> cache, Queue<Pair<String, Integer>> topN,
-        int threshold, TezCounters counters) {
-    counters.getGroup(metric.value).forEach(counter -> {
-      Integer prev = cache.getIfPresent(counter.getName());
-      if (prev != null && prev != counter.getValue()) {
-        cache.invalidate(counter.getName());
+      TezCounters counters) {
+    try {
+      CounterGroup group = counters.getGroup(metric.value);
+      // if the group is empty, clear the cache
+      if (group.size() == 0) {
+        cache.invalidateAll();
+      } else {
+        // if there is no counter corresponding to a cache entry, remove from 
cache
+        ConcurrentMap<String, Integer> cacheMap = cache.asMap();
+        cacheMap.keySet().stream().filter(key -> 
counters.findCounter(group.getName(), key).getValue() == 0)
+            .forEach(cache::invalidate);
       }
-      if (counter.getValue() > threshold) {
-        if (topN.size() == maxCacheSize) {
-          Pair<String, Integer> lowest = topN.peek();
-          if (lowest != null && counter.getValue() > lowest.getValue()) {
-            cache.invalidate(lowest.getKey());
-          }
-        }
-        if (topN.size() < maxCacheSize) {
-          topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
-          cache.put(counter.getName(), (int) counter.getValue());
+      // update existing cache entries or add new entries
+      for (TezCounter counter : group) {
+        Integer prev = cache.getIfPresent(counter.getName());
+        if (prev != null && prev != counter.getValue()) {
+          cache.invalidate(counter.getName());
         }
+        topN.add(Pair.of(counter.getName(), (int) counter.getValue()));

Review comment:
       Would this work as a top-N cache? It will evict an entry when the limit 
is exceeded, even if that entry has a higher value than the one being inserted 
(least-recently-used eviction).
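
For comparison, a value-ordered top-N structure could look roughly like the sketch below. It is only an illustration: Map.Entry stands in for Pair, the class name TopNSketch is hypothetical, and the capacity is assumed to be at least 1. The lowest entry is evicted only when the incoming count is higher, which is the property an LRU-style cache does not give you.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical bounded top-N keyed by delta count (min-heap: lowest count on top).
class TopNSketch {
  private final int capacity; // assumed >= 1
  private final PriorityQueue<Map.Entry<String, Integer>> heap;

  TopNSketch(int capacity) {
    this.capacity = capacity;
    this.heap = new PriorityQueue<Map.Entry<String, Integer>>(
        capacity, (a, b) -> Integer.compare(a.getValue(), b.getValue()));
  }

  // Returns true if the entry was kept, false if it was too small to make the top N.
  boolean offer(String path, int count) {
    if (heap.size() < capacity) {
      heap.add(new SimpleImmutableEntry<>(path, count));
      return true;
    }
    Map.Entry<String, Integer> lowest = heap.peek();
    if (lowest != null && count > lowest.getValue()) {
      heap.poll(); // drop the current minimum, never a larger entry
      heap.add(new SimpleImmutableEntry<>(path, count));
      return true;
    }
    return false;
  }

  int size() {
    return heap.size();
  }

  // Smallest count currently retained; only valid when size() > 0.
  int lowestCount() {
    return heap.peek().getValue();
  }
}
```

With a capacity of 2, offering counts 5, 9, 3, 7 in that order keeps 9 and 7: the 3 is rejected and the 5 is evicted by the 7.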

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) 
throws Exception {
 
   private void configure(HiveConf conf) throws Exception {
     acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+    if (acidMetricsExtEnabled) {
 
-    deltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
-    obsoleteDeltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
-    initCachesForMetrics(conf);
-    initObjectsForMetrics();
+      initCachesForMetrics(conf);
+      initObjectsForMetrics();
 
-    long reportingInterval = HiveConf.getTimeVar(conf,
-      HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 
TimeUnit.SECONDS);
+      long reportingInterval =
+          HiveConf.getTimeVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
 
-    ThreadFactory threadFactory =
-      new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("DeltaFilesMetricReporter %d")
-        .build();
-    executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(
-      new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+      ThreadFactory threadFactory =
+          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter 
%d").build();
+      executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+      executorService.scheduleAtFixedRate(new ReportingTask(), 0, 
reportingInterval, TimeUnit.SECONDS);
 
-    LOG.info("Started DeltaFilesMetricReporter thread");
+      LOG.info("Started DeltaFilesMetricReporter thread");
+    }
   }
 
   public void submit(TezCounters counters) {
     if (acidMetricsExtEnabled) {
-      updateMetrics(NUM_OBSOLETE_DELTAS,
-        obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
-        counters);
-      updateMetrics(NUM_DELTAS,
-        deltaCache, deltaTopN, deltasThreshold,
-        counters);
-      updateMetrics(NUM_SMALL_DELTAS,
-        smallDeltaCache, smallDeltaTopN, deltasThreshold,
-        counters);
+      updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, 
obsoleteDeltaTopN, counters);
+      updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+      updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, 
counters);
     }
   }
 
+  /**
+   * Copy counters to caches.
+   */
   private void updateMetrics(DeltaFilesMetricType metric, Cache<String, 
Integer> cache, Queue<Pair<String, Integer>> topN,
-        int threshold, TezCounters counters) {
-    counters.getGroup(metric.value).forEach(counter -> {
-      Integer prev = cache.getIfPresent(counter.getName());
-      if (prev != null && prev != counter.getValue()) {
-        cache.invalidate(counter.getName());
+      TezCounters counters) {
+    try {
+      CounterGroup group = counters.getGroup(metric.value);
+      // if the group is empty, clear the cache
+      if (group.size() == 0) {
+        cache.invalidateAll();
+      } else {
+        // if there is no counter corresponding to a cache entry, remove from 
cache
+        ConcurrentMap<String, Integer> cacheMap = cache.asMap();
+        cacheMap.keySet().stream().filter(key -> 
counters.findCounter(group.getName(), key).getValue() == 0)
+            .forEach(cache::invalidate);
       }
-      if (counter.getValue() > threshold) {
-        if (topN.size() == maxCacheSize) {
-          Pair<String, Integer> lowest = topN.peek();
-          if (lowest != null && counter.getValue() > lowest.getValue()) {
-            cache.invalidate(lowest.getKey());
-          }
-        }
-        if (topN.size() < maxCacheSize) {
-          topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
-          cache.put(counter.getName(), (int) counter.getValue());
+      // update existing cache entries or add new entries
+      for (TezCounter counter : group) {
+        Integer prev = cache.getIfPresent(counter.getName());
+        if (prev != null && prev != counter.getValue()) {
+          cache.invalidate(counter.getName());
         }
+        topN.add(Pair.of(counter.getName(), (int) counter.getValue()));
+        cache.put(counter.getName(), (int) counter.getValue());
       }
-    });
+    } catch (Exception e) {
+      LOG.warn("Caught exception while trying to update delta metrics cache. 
Invalidating cache", e);
+      try {
+        cache.invalidateAll();
+      } catch (Exception x) {
+        LOG.warn("Caught exception while trying to invalidate cache", x);
+      }
+    }
   }
 
-  public static void mergeDeltaFilesStats(AcidDirectory dir, long 
checkThresholdInSec,
-        float deltaPctThreshold, EnumMap<DeltaFilesMetricType, Map<String, 
Integer>> deltaFilesStats,
-      Configuration conf) throws IOException {
+  /**
+   * Update EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> 
deltaFilesStats with {@link AcidDirectory}
+   * contents
+   */
+  public static void mergeDeltaFilesStats(AcidDirectory dir, long 
checkThresholdInSec, float deltaPctThreshold,
+      EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> 
deltaFilesStats, Configuration conf) throws IOException {
+
+    int deltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);

Review comment:
       It's inefficient to read the conf values for every table/partition.
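
One way to avoid the repeated lookups, sketched under assumptions: java.util.Properties stands in for HiveConf, the property keys and the default of 100 are placeholders rather than real Hive settings, and the caller is assumed to be able to hold on to one such object across calls. The thresholds are parsed once and then reused for each table/partition.

```java
import java.util.Properties;

// Hypothetical holder: thresholds are read from the configuration once.
class DeltaThresholdsSketch {
  // Placeholder keys, not actual Hive configuration names.
  static final String DELTAS_KEY = "example.delta.num.threshold";
  static final String OBSOLETE_KEY = "example.obsolete.delta.num.threshold";

  final int deltasThreshold;
  final int obsoleteDeltasThreshold;

  DeltaThresholdsSketch(Properties conf) {
    // The default of 100 is arbitrary and only for illustration.
    this.deltasThreshold = Integer.parseInt(conf.getProperty(DELTAS_KEY, "100"));
    this.obsoleteDeltasThreshold = Integer.parseInt(conf.getProperty(OBSOLETE_KEY, "100"));
  }

  // Per-partition checks use the cached fields; no configuration access here.
  boolean exceedsDeltas(int numDeltas) {
    return numDeltas > deltasThreshold;
  }

  boolean exceedsObsoleteDeltas(int numObsoleteDeltas) {
    return numObsoleteDeltas > obsoleteDeltasThreshold;
  }
}
```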

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -219,8 +226,33 @@ public static void mergeDeltaFilesStats(AcidDirectory dir, 
long checkThresholdIn
     logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, 
numSmallDeltas);
 
     String path = getRelPath(dir);
-    newDeltaFilesStats(numObsoleteDeltas, numDeltas, numSmallDeltas)
-      .forEach((type, cnt) -> deltaFilesStats.computeIfAbsent(type, v -> new 
HashMap<>()).put(path, cnt));
+
+    filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, 
deltaFilesStats, path, maxCacheSize);
+    filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, 
obsoleteDeltasThreshold, deltaFilesStats,
+        path, maxCacheSize);
+    filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, 
deltasThreshold, deltaFilesStats,
+        path, maxCacheSize);
+  }
+
+  /**
+   * Add partition and delta count to deltaFilesStats if the delta count is 
over the recording threshold and it is in
+   * the top {@link HiveConf.ConfVars#HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE} 
deltas.
+   */
+  private static void filterAndAddToDeltaFilesStats(DeltaFilesMetricType type, 
int deltaCount, int deltasThreshold,
+      EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> 
deltaFilesStats, String path, int maxCacheSize) {
+    if (deltaCount > deltasThreshold) {
+      Queue<Pair<String,Integer>> pairQueue = deltaFilesStats.get(type);
+      if (pairQueue != null && pairQueue.size() == maxCacheSize) {
+        Pair<String, Integer> lowest = pairQueue.peek();
+        if (lowest != null && deltaCount > lowest.getValue()) {
+          pairQueue.poll();
+        }
+      }
+      if (pairQueue == null || pairQueue.size() < maxCacheSize) {
+        deltaFilesStats.computeIfAbsent(type,
+            v -> (new PriorityBlockingQueue<>(maxCacheSize, 
getComparator()))).add(Pair.of(path, deltaCount));

Review comment:
       Why do you need a blocking queue here? Would it be accessed by multiple 
threads?
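
As a small illustration of why the question matters: both queue types implement Queue and accept the same comparator, so if only one thread touches the queue, a plain PriorityQueue would be a drop-in replacement. The sketch below uses hypothetical names and Integer elements instead of Pair.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical comparison of the two queue types behind the same Queue interface.
class QueueChoiceSketch {
  // No locking; suitable when a single thread owns the queue.
  static Queue<Integer> plain(int capacity) {
    return new PriorityQueue<>(capacity, Comparator.<Integer>naturalOrder());
  }

  // Thread-safe; needed only if several threads add or poll concurrently.
  static Queue<Integer> blocking(int capacity) {
    return new PriorityBlockingQueue<>(capacity, Comparator.<Integer>naturalOrder());
  }

  // Adds all values, then polls until the queue is empty.
  static List<Integer> drain(Queue<Integer> queue, int... values) {
    for (int v : values) {
      queue.add(v);
    }
    List<Integer> out = new ArrayList<>();
    Integer next;
    while ((next = queue.poll()) != null) {
      out.add(next);
    }
    return out;
  }
}
```

Draining either queue after adding the same values yields the same ascending order; the difference is only in the synchronization cost and guarantees.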

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) 
throws Exception {
 
   private void configure(HiveConf conf) throws Exception {
     acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+    if (acidMetricsExtEnabled) {
 
-    deltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
-    obsoleteDeltasThreshold = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
-
-    initCachesForMetrics(conf);
-    initObjectsForMetrics();
+      initCachesForMetrics(conf);
+      initObjectsForMetrics();
 
-    long reportingInterval = HiveConf.getTimeVar(conf,
-      HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 
TimeUnit.SECONDS);
+      long reportingInterval =
+          HiveConf.getTimeVar(conf, 
HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
 
-    ThreadFactory threadFactory =
-      new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("DeltaFilesMetricReporter %d")
-        .build();
-    executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(
-      new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+      ThreadFactory threadFactory =
+          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter 
%d").build();
+      executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+      executorService.scheduleAtFixedRate(new ReportingTask(), 0, 
reportingInterval, TimeUnit.SECONDS);
 
-    LOG.info("Started DeltaFilesMetricReporter thread");
+      LOG.info("Started DeltaFilesMetricReporter thread");
+    }
   }
 
   public void submit(TezCounters counters) {
     if (acidMetricsExtEnabled) {
-      updateMetrics(NUM_OBSOLETE_DELTAS,
-        obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold,
-        counters);
-      updateMetrics(NUM_DELTAS,
-        deltaCache, deltaTopN, deltasThreshold,
-        counters);
-      updateMetrics(NUM_SMALL_DELTAS,
-        smallDeltaCache, smallDeltaTopN, deltasThreshold,
-        counters);
+      updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, 
obsoleteDeltaTopN, counters);
+      updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters);
+      updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, 
counters);
     }
   }
 
+  /**
+   * Copy counters to caches.
+   */
   private void updateMetrics(DeltaFilesMetricType metric, Cache<String, 
Integer> cache, Queue<Pair<String, Integer>> topN,
-        int threshold, TezCounters counters) {
-    counters.getGroup(metric.value).forEach(counter -> {
-      Integer prev = cache.getIfPresent(counter.getName());
-      if (prev != null && prev != counter.getValue()) {
-        cache.invalidate(counter.getName());
+      TezCounters counters) {
+    try {
+      CounterGroup group = counters.getGroup(metric.value);
+      // if the group is empty, clear the cache
+      if (group.size() == 0) {
+        cache.invalidateAll();
+      } else {
+        // if there is no counter corresponding to a cache entry, remove from 
cache
+        ConcurrentMap<String, Integer> cacheMap = cache.asMap();

Review comment:
       This is supposed to be a global cache. I'm not sure I understand it 
correctly, but it looks like if an entry is not present in the Tez counters, it 
would be removed from the global cache.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -267,32 +299,26 @@ private static String getRelPath(AcidUtils.Directory 
directory) {
       directory.getPath().getName();
   }
 
-  private static EnumMap<DeltaFilesMetricType, Integer> newDeltaFilesStats(int 
numObsoleteDeltas, int numDeltas, int numSmallDeltas) {
-    return new EnumMap<DeltaFilesMetricType, 
Integer>(DeltaFilesMetricType.class) {{
-      put(NUM_OBSOLETE_DELTAS, numObsoleteDeltas);
-      put(NUM_DELTAS, numDeltas);
-      put(NUM_SMALL_DELTAS, numSmallDeltas);
-    }};
-  }
-
   public static void createCountersForAcidMetrics(TezCounters tezCounters, 
JobConf jobConf) {
-    if (HiveConf.getBoolVar(jobConf, 
HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
+    try {
+      if (HiveConf.getBoolVar(jobConf, 
HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf
+          .getBoolVar(jobConf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
 
-      Arrays.stream(DeltaFilesMetricType.values())
-        .filter(type -> jobConf.get(type.name()) != null)
-        .forEach(type ->
-            
Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name())).forEach(
-              (path, cnt) -> tezCounters.findCounter(type.value, 
path).setValue(Long.parseLong(cnt))
-            )
-        );
+        Arrays.stream(DeltaFilesMetricType.values()).filter(type -> 
jobConf.get(type.name()) != null).forEach(
+            type -> 
Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name()))
+                .forEach((path, cnt) -> tezCounters.findCounter(type.value, 
path).setValue(Long.parseLong(cnt))));
+      }
+    } catch (Exception e) {

Review comment:
       I don't see where a checked exception could be thrown here. 
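
For context, the issue description below says a LimitExceededException is thrown when tez.counters.max is reached and is not caught on the Hive side. A catch block can still be useful without any checked exception in play, since it also covers unchecked ones. The sketch below is a self-contained illustration with hypothetical classes (Counters, CounterLimitException); it does not use the Tez API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a counter registry with a hard limit.
class CounterLimitSketch {
  // Stand-in for a limit-exceeded error; unchecked, so no 'throws' clause is needed.
  static class CounterLimitException extends RuntimeException {
    CounterLimitException(String message) {
      super(message);
    }
  }

  static class Counters {
    private final int max;
    private final Map<String, Long> values = new HashMap<>();

    Counters(int max) {
      this.max = max;
    }

    // Creating a new counter beyond the limit fails at runtime.
    void set(String name, long value) {
      if (!values.containsKey(name) && values.size() >= max) {
        throw new CounterLimitException("too many counters: " + name);
      }
      values.put(name, value);
    }

    int size() {
      return values.size();
    }
  }

  // Publishes stats; on a runtime failure, metrics are skipped instead of failing the caller.
  static boolean tryPublish(Counters counters, Map<String, Long> stats) {
    try {
      stats.forEach(counters::set);
      return true;
    } catch (RuntimeException e) {
      return false;
    }
  }
}
```

Catching RuntimeException (or the specific exception type) rather than Exception would state the intent more precisely when no checked exception can occur.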




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 635811)
    Time Spent: 20m  (was: 10m)

> Delta metrics collection may cause number of tez counters to exceed 
> tez.counters.max limit
> ------------------------------------------------------------------------------------------
>
>                 Key: HIVE-25429
>                 URL: https://issues.apache.org/jira/browse/HIVE-25429
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Hive
>    Affects Versions: 4.0.0
>            Reporter: Karen Coppage
>            Assignee: Karen Coppage
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> There's a limit to the number of Tez counters allowed (tez.counters.max). 
> Delta metrics collection (i.e. DeltaFilesMetricReporter) was creating 3 
> counters for each partition touched by a given query. This can result in a 
> huge number of counters, which is unnecessary because we're only interested 
> in the n partitions with the most deltas. This change limits the number 
> of counters created to hive.txn.acid.metrics.max.cache.size*3.
> Also, when tez.counters.max is reached, a LimitExceededException is thrown but 
> isn't caught on the Hive side, which causes the query to fail. We should catch 
> this and skip delta metrics collection in this case.
> Also make sure that metrics are only collected if 
> hive.metastore.acidmetrics.ext.on=true.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
