[ https://issues.apache.org/jira/browse/HIVE-25429?focusedWorklogId=635811&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-635811 ]
ASF GitHub Bot logged work on HIVE-25429: ----------------------------------------- Author: ASF GitHub Bot Created on: 09/Aug/21 09:07 Start Date: 09/Aug/21 09:07 Worklog Time Spent: 10m Work Description: deniskuzZ commented on a change in pull request #2563: URL: https://github.com/apache/hive/pull/2563#discussion_r685006096 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { - deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); - obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD); - - initCachesForMetrics(conf); - initObjectsForMetrics(); + initCachesForMetrics(conf); + initObjectsForMetrics(); - long reportingInterval = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); + long reportingInterval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("DeltaFilesMetricReporter %d") - .build(); - executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - executorService.scheduleAtFixedRate( - new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); - 
LOG.info("Started DeltaFilesMetricReporter thread"); + LOG.info("Started DeltaFilesMetricReporter thread"); + } } public void submit(TezCounters counters) { if (acidMetricsExtEnabled) { - updateMetrics(NUM_OBSOLETE_DELTAS, - obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold, - counters); - updateMetrics(NUM_DELTAS, - deltaCache, deltaTopN, deltasThreshold, - counters); - updateMetrics(NUM_SMALL_DELTAS, - smallDeltaCache, smallDeltaTopN, deltasThreshold, - counters); + updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters); + updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters); + updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters); } } + /** + * Copy counters to caches. + */ private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN, - int threshold, TezCounters counters) { - counters.getGroup(metric.value).forEach(counter -> { - Integer prev = cache.getIfPresent(counter.getName()); - if (prev != null && prev != counter.getValue()) { - cache.invalidate(counter.getName()); + TezCounters counters) { + try { + CounterGroup group = counters.getGroup(metric.value); + // if the group is empty, clear the cache + if (group.size() == 0) { + cache.invalidateAll(); Review comment: Are you cleaning the whole cache when tez task doesn't have any deltas to report? What about data from other tasks that touched diff tables/partitions. 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { Review comment: should we even create an instance of DeltaFilesMetricReporter when MetastoreConf.ConfVars.METRICS_ENABLED || METASTORE_ACIDMETRICS_EXT_ON are set to false? ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { - deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); - obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD); - - initCachesForMetrics(conf); - initObjectsForMetrics(); + initCachesForMetrics(conf); + initObjectsForMetrics(); - long reportingInterval = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); + long reportingInterval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("DeltaFilesMetricReporter %d") - .build(); - executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - executorService.scheduleAtFixedRate( - new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); + ThreadFactory threadFactory = + new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); - LOG.info("Started DeltaFilesMetricReporter thread"); + LOG.info("Started DeltaFilesMetricReporter thread"); + } } public void submit(TezCounters counters) { if (acidMetricsExtEnabled) { - updateMetrics(NUM_OBSOLETE_DELTAS, - obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold, - counters); - updateMetrics(NUM_DELTAS, - deltaCache, deltaTopN, deltasThreshold, - counters); - updateMetrics(NUM_SMALL_DELTAS, - smallDeltaCache, smallDeltaTopN, deltasThreshold, - counters); + updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters); + updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters); + updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters); } } + /** + * Copy counters to caches. 
+ */ private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN, - int threshold, TezCounters counters) { - counters.getGroup(metric.value).forEach(counter -> { - Integer prev = cache.getIfPresent(counter.getName()); - if (prev != null && prev != counter.getValue()) { - cache.invalidate(counter.getName()); + TezCounters counters) { + try { + CounterGroup group = counters.getGroup(metric.value); + // if the group is empty, clear the cache + if (group.size() == 0) { + cache.invalidateAll(); + } else { + // if there is no counter corresponding to a cache entry, remove from cache + ConcurrentMap<String, Integer> cacheMap = cache.asMap(); + cacheMap.keySet().stream().filter(key -> counters.findCounter(group.getName(), key).getValue() == 0) + .forEach(cache::invalidate); } - if (counter.getValue() > threshold) { - if (topN.size() == maxCacheSize) { - Pair<String, Integer> lowest = topN.peek(); - if (lowest != null && counter.getValue() > lowest.getValue()) { - cache.invalidate(lowest.getKey()); - } - } - if (topN.size() < maxCacheSize) { - topN.add(Pair.of(counter.getName(), (int) counter.getValue())); - cache.put(counter.getName(), (int) counter.getValue()); + // update existing cache entries or add new entries + for (TezCounter counter : group) { + Integer prev = cache.getIfPresent(counter.getName()); + if (prev != null && prev != counter.getValue()) { + cache.invalidate(counter.getName()); } + topN.add(Pair.of(counter.getName(), (int) counter.getValue())); Review comment: Would it work as topN cache? 
It's gonna evict entry when the limit is exceeded, even if that entry has a higher value than inserted (least-recently-used eviction) ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { - deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); - obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD); - - initCachesForMetrics(conf); - initObjectsForMetrics(); + initCachesForMetrics(conf); + initObjectsForMetrics(); - long reportingInterval = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); + long reportingInterval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("DeltaFilesMetricReporter %d") - .build(); - executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - executorService.scheduleAtFixedRate( - new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); - LOG.info("Started DeltaFilesMetricReporter thread"); + LOG.info("Started DeltaFilesMetricReporter thread"); + } } public void submit(TezCounters counters) { if (acidMetricsExtEnabled) { - 
updateMetrics(NUM_OBSOLETE_DELTAS, - obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold, - counters); - updateMetrics(NUM_DELTAS, - deltaCache, deltaTopN, deltasThreshold, - counters); - updateMetrics(NUM_SMALL_DELTAS, - smallDeltaCache, smallDeltaTopN, deltasThreshold, - counters); + updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters); + updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters); + updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters); } } + /** + * Copy counters to caches. + */ private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN, - int threshold, TezCounters counters) { - counters.getGroup(metric.value).forEach(counter -> { - Integer prev = cache.getIfPresent(counter.getName()); - if (prev != null && prev != counter.getValue()) { - cache.invalidate(counter.getName()); + TezCounters counters) { + try { + CounterGroup group = counters.getGroup(metric.value); + // if the group is empty, clear the cache + if (group.size() == 0) { + cache.invalidateAll(); + } else { + // if there is no counter corresponding to a cache entry, remove from cache + ConcurrentMap<String, Integer> cacheMap = cache.asMap(); + cacheMap.keySet().stream().filter(key -> counters.findCounter(group.getName(), key).getValue() == 0) + .forEach(cache::invalidate); } - if (counter.getValue() > threshold) { - if (topN.size() == maxCacheSize) { - Pair<String, Integer> lowest = topN.peek(); - if (lowest != null && counter.getValue() > lowest.getValue()) { - cache.invalidate(lowest.getKey()); - } - } - if (topN.size() < maxCacheSize) { - topN.add(Pair.of(counter.getName(), (int) counter.getValue())); - cache.put(counter.getName(), (int) counter.getValue()); + // update existing cache entries or add new entries + for (TezCounter counter : group) { + Integer prev = cache.getIfPresent(counter.getName()); + if (prev != null && prev != counter.getValue()) { + 
cache.invalidate(counter.getName()); } + topN.add(Pair.of(counter.getName(), (int) counter.getValue())); + cache.put(counter.getName(), (int) counter.getValue()); } - }); + } catch (Exception e) { + LOG.warn("Caught exception while trying to update delta metrics cache. Invalidating cache", e); + try { + cache.invalidateAll(); + } catch (Exception x) { + LOG.warn("Caught exception while trying to invalidate cache", x); + } + } } - public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdInSec, - float deltaPctThreshold, EnumMap<DeltaFilesMetricType, Map<String, Integer>> deltaFilesStats, - Configuration conf) throws IOException { + /** + * Update EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats with {@link AcidDirectory} + * contents + */ + public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdInSec, float deltaPctThreshold, + EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) throws IOException { + + int deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); Review comment: it's inefficient to access conf values for every table/partitions ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -219,8 +226,33 @@ public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdIn logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas); String path = getRelPath(dir); - newDeltaFilesStats(numObsoleteDeltas, numDeltas, numSmallDeltas) - .forEach((type, cnt) -> deltaFilesStats.computeIfAbsent(type, v -> new HashMap<>()).put(path, cnt)); + + filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, path, maxCacheSize); + filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats, + path, maxCacheSize); + 
filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats, + path, maxCacheSize); + } + + /** + * Add partition and delta count to deltaFilesStats if the delta count is over the recording threshold and it is in + * the top {@link HiveConf.ConfVars#HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE} deltas. + */ + private static void filterAndAddToDeltaFilesStats(DeltaFilesMetricType type, int deltaCount, int deltasThreshold, + EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, String path, int maxCacheSize) { + if (deltaCount > deltasThreshold) { + Queue<Pair<String,Integer>> pairQueue = deltaFilesStats.get(type); + if (pairQueue != null && pairQueue.size() == maxCacheSize) { + Pair<String, Integer> lowest = pairQueue.peek(); + if (lowest != null && deltaCount > lowest.getValue()) { + pairQueue.poll(); + } + } + if (pairQueue == null || pairQueue.size() < maxCacheSize) { + deltaFilesStats.computeIfAbsent(type, + v -> (new PriorityBlockingQueue<>(maxCacheSize, getComparator()))).add(Pair.of(path, deltaCount)); Review comment: Why do you need a blocking queue here? Would it be accessed by multiple threads? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { - deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); - obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD); - - initCachesForMetrics(conf); - initObjectsForMetrics(); + initCachesForMetrics(conf); + initObjectsForMetrics(); - long reportingInterval = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); + long reportingInterval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("DeltaFilesMetricReporter %d") - .build(); - executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - executorService.scheduleAtFixedRate( - new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); - LOG.info("Started DeltaFilesMetricReporter thread"); + LOG.info("Started DeltaFilesMetricReporter thread"); + } } public void submit(TezCounters counters) { if (acidMetricsExtEnabled) { - updateMetrics(NUM_OBSOLETE_DELTAS, - obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold, - counters); - updateMetrics(NUM_DELTAS, - 
deltaCache, deltaTopN, deltasThreshold, - counters); - updateMetrics(NUM_SMALL_DELTAS, - smallDeltaCache, smallDeltaTopN, deltasThreshold, - counters); + updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters); + updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters); + updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters); } } + /** + * Copy counters to caches. + */ private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN, - int threshold, TezCounters counters) { - counters.getGroup(metric.value).forEach(counter -> { - Integer prev = cache.getIfPresent(counter.getName()); - if (prev != null && prev != counter.getValue()) { - cache.invalidate(counter.getName()); + TezCounters counters) { + try { + CounterGroup group = counters.getGroup(metric.value); + // if the group is empty, clear the cache + if (group.size() == 0) { + cache.invalidateAll(); + } else { + // if there is no counter corresponding to a cache entry, remove from cache + ConcurrentMap<String, Integer> cacheMap = cache.asMap(); Review comment: This is supposed to be a global cache. I am not sure I understand it right. But looks like if some part is not present in tez counters it would be removed from global cache. 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -267,32 +299,26 @@ private static String getRelPath(AcidUtils.Directory directory) { directory.getPath().getName(); } - private static EnumMap<DeltaFilesMetricType, Integer> newDeltaFilesStats(int numObsoleteDeltas, int numDeltas, int numSmallDeltas) { - return new EnumMap<DeltaFilesMetricType, Integer>(DeltaFilesMetricType.class) {{ - put(NUM_OBSOLETE_DELTAS, numObsoleteDeltas); - put(NUM_DELTAS, numDeltas); - put(NUM_SMALL_DELTAS, numSmallDeltas); - }}; - } - public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) { - if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && - MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) { + try { + if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf + .getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) { - Arrays.stream(DeltaFilesMetricType.values()) - .filter(type -> jobConf.get(type.name()) != null) - .forEach(type -> - Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name())).forEach( - (path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt)) - ) - ); + Arrays.stream(DeltaFilesMetricType.values()).filter(type -> jobConf.get(type.name()) != null).forEach( + type -> Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name())) + .forEach((path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt)))); + } + } catch (Exception e) { Review comment: I don't see where a checked exception could be thrown here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 635811) Time Spent: 20m (was: 10m) > Delta metrics collection may cause number of tez counters to exceed > tez.counters.max limit > ------------------------------------------------------------------------------------------ > > Key: HIVE-25429 > URL: https://issues.apache.org/jira/browse/HIVE-25429 > Project: Hive > Issue Type: Sub-task > Components: Hive > Affects Versions: 4.0.0 > Reporter: Karen Coppage > Assignee: Karen Coppage > Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > There's a limit to the number of tez counters allowed (tez.counters.max). > Delta metrics collection (i.e. DeltaFilesMetricReporter) was creating 3 > counters for each partition touched by a given query, which can result in a > huge number of counters, which is unnecessary because we're only interested > in the n partitions with the most deltas. This change limits the number > of counters created to hive.txn.acid.metrics.max.cache.size*3. > Also, when tez.counters.max is reached, a LimitExceededException is thrown but > isn't caught on the Hive side and causes the query to fail. We should catch > this and skip delta metrics collection in this case. > Also make sure that metrics are only collected if > hive.metastore.acidmetrics.ext.on=true -- This message was sent by Atlassian Jira (v8.3.4#803005)