[ https://issues.apache.org/jira/browse/HIVE-25429?focusedWorklogId=636341&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-636341 ]
ASF GitHub Bot logged work on HIVE-25429: ----------------------------------------- Author: ASF GitHub Bot Created on: 10/Aug/21 09:03 Start Date: 10/Aug/21 09:03 Worklog Time Spent: 10m Work Description: klcopp commented on a change in pull request #2563: URL: https://github.com/apache/hive/pull/2563#discussion_r685133970 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { Review comment: technically not, but since it's a singleton the instance is created no matter what, right? ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -219,8 +226,33 @@ public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdIn logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas); String path = getRelPath(dir); - newDeltaFilesStats(numObsoleteDeltas, numDeltas, numSmallDeltas) - .forEach((type, cnt) -> deltaFilesStats.computeIfAbsent(type, v -> new HashMap<>()).put(path, cnt)); + + filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, path, maxCacheSize); + filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats, + path, maxCacheSize); + filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats, + path, maxCacheSize); + } + + /** + * Add partition and delta count to deltaFilesStats if the delta count is over the recording threshold and it is in + * the top {@link HiveConf.ConfVars#HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE} deltas. 
+ */ + private static void filterAndAddToDeltaFilesStats(DeltaFilesMetricType type, int deltaCount, int deltasThreshold, + EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, String path, int maxCacheSize) { + if (deltaCount > deltasThreshold) { + Queue<Pair<String,Integer>> pairQueue = deltaFilesStats.get(type); + if (pairQueue != null && pairQueue.size() == maxCacheSize) { + Pair<String, Integer> lowest = pairQueue.peek(); + if (lowest != null && deltaCount > lowest.getValue()) { + pairQueue.poll(); + } + } + if (pairQueue == null || pairQueue.size() < maxCacheSize) { + deltaFilesStats.computeIfAbsent(type, + v -> (new PriorityBlockingQueue<>(maxCacheSize, getComparator()))).add(Pair.of(path, deltaCount)); Review comment: No, I can use a PriorityQueue instead ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { - deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); - obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD); - - initCachesForMetrics(conf); - initObjectsForMetrics(); + initCachesForMetrics(conf); + initObjectsForMetrics(); - long reportingInterval = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); + long reportingInterval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("DeltaFilesMetricReporter %d") - .build(); - executorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory); - executorService.scheduleAtFixedRate( - new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); - LOG.info("Started DeltaFilesMetricReporter thread"); + LOG.info("Started DeltaFilesMetricReporter thread"); + } } public void submit(TezCounters counters) { if (acidMetricsExtEnabled) { - updateMetrics(NUM_OBSOLETE_DELTAS, - obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold, - counters); - updateMetrics(NUM_DELTAS, - deltaCache, deltaTopN, deltasThreshold, - counters); - updateMetrics(NUM_SMALL_DELTAS, - smallDeltaCache, smallDeltaTopN, deltasThreshold, - counters); + updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters); + updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters); + updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters); } } + /** + * Copy counters to caches. 
+ */ private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN, - int threshold, TezCounters counters) { - counters.getGroup(metric.value).forEach(counter -> { - Integer prev = cache.getIfPresent(counter.getName()); - if (prev != null && prev != counter.getValue()) { - cache.invalidate(counter.getName()); + TezCounters counters) { + try { + CounterGroup group = counters.getGroup(metric.value); + // if the group is empty, clear the cache + if (group.size() == 0) { + cache.invalidateAll(); + } else { + // if there is no counter corresponding to a cache entry, remove from cache + ConcurrentMap<String, Integer> cacheMap = cache.asMap(); Review comment: I see what you mean, but, I'm not sure how to update the cache then. For example, my_table had 200 deltas, then compaction happened and now it has 0. - before this change: there would be a counter with my_table=200, then after compaction a counter with my_table/my_partition=0 and we would invalidate my_table from the cache and not re-add. 
- after this change: there would be a counter with my_table=200, then after compaction there would be no counter afterwards (because 0 < min threshold, and because my_table probably won't be in the topN tables) ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -136,67 +133,77 @@ public static synchronized void init(HiveConf conf) throws Exception { private void configure(HiveConf conf) throws Exception { acidMetricsExtEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + if (acidMetricsExtEnabled) { - deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD); - obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD); - - initCachesForMetrics(conf); - initObjectsForMetrics(); + initCachesForMetrics(conf); + initObjectsForMetrics(); - long reportingInterval = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); + long reportingInterval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS); - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("DeltaFilesMetricReporter %d") - .build(); - executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - executorService.scheduleAtFixedRate( - new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS); - LOG.info("Started DeltaFilesMetricReporter thread"); + LOG.info("Started DeltaFilesMetricReporter thread"); + } } public void 
submit(TezCounters counters) { if (acidMetricsExtEnabled) { - updateMetrics(NUM_OBSOLETE_DELTAS, - obsoleteDeltaCache, obsoleteDeltaTopN, obsoleteDeltasThreshold, - counters); - updateMetrics(NUM_DELTAS, - deltaCache, deltaTopN, deltasThreshold, - counters); - updateMetrics(NUM_SMALL_DELTAS, - smallDeltaCache, smallDeltaTopN, deltasThreshold, - counters); + updateMetrics(NUM_OBSOLETE_DELTAS, obsoleteDeltaCache, obsoleteDeltaTopN, counters); + updateMetrics(NUM_DELTAS, deltaCache, deltaTopN, counters); + updateMetrics(NUM_SMALL_DELTAS, smallDeltaCache, smallDeltaTopN, counters); } } + /** + * Copy counters to caches. + */ private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN, - int threshold, TezCounters counters) { - counters.getGroup(metric.value).forEach(counter -> { - Integer prev = cache.getIfPresent(counter.getName()); - if (prev != null && prev != counter.getValue()) { - cache.invalidate(counter.getName()); + TezCounters counters) { + try { + CounterGroup group = counters.getGroup(metric.value); + // if the group is empty, clear the cache + if (group.size() == 0) { + cache.invalidateAll(); + } else { + // if there is no counter corresponding to a cache entry, remove from cache + ConcurrentMap<String, Integer> cacheMap = cache.asMap(); Review comment: I guess the issue is: we don't know if a partition is missing from the counters because the query didn't touch it, or because the number of deltas has decreased ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java ########## @@ -267,32 +299,26 @@ private static String getRelPath(AcidUtils.Directory directory) { directory.getPath().getName(); } - private static EnumMap<DeltaFilesMetricType, Integer> newDeltaFilesStats(int numObsoleteDeltas, int numDeltas, int numSmallDeltas) { - return new EnumMap<DeltaFilesMetricType, Integer>(DeltaFilesMetricType.class) {{ - put(NUM_OBSOLETE_DELTAS, 
numObsoleteDeltas); - put(NUM_DELTAS, numDeltas); - put(NUM_SMALL_DELTAS, numSmallDeltas); - }}; - } - public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) { - if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && - MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) { + try { + if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf + .getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) { - Arrays.stream(DeltaFilesMetricType.values()) - .filter(type -> jobConf.get(type.name()) != null) - .forEach(type -> - Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name())).forEach( - (path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt)) - ) - ); + Arrays.stream(DeltaFilesMetricType.values()).filter(type -> jobConf.get(type.name()) != null).forEach( + type -> Splitter.on(',').withKeyValueSeparator("->").split(jobConf.get(type.name())) + .forEach((path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt)))); + } + } catch (Exception e) { Review comment: findCounter could throw a LimitExceededException -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 636341) Time Spent: 0.5h (was: 20m) > Delta metrics collection may cause number of tez counters to exceed > tez.counters.max limit > ------------------------------------------------------------------------------------------ > > Key: HIVE-25429 > URL: https://issues.apache.org/jira/browse/HIVE-25429 > Project: Hive > Issue Type: Sub-task > Components: Hive > Affects Versions: 4.0.0 > Reporter: Karen Coppage > Assignee: Karen Coppage > Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > There's a limit to the number of tez counters allowed (tez.counters.max). > Delta metrics collection (i.e. DeltaFilesMetricReporter) was creating 3 > counters for each partition touched by a given query, which can result in a > huge number of counters, which is unnecessary because we're only interested > in the n partitions with the most deltas. This change limits the number > of counters created to hive.txn.acid.metrics.max.cache.size*3. > Also, when tez.counters.max is reached, a LimitExceededException is thrown but > isn't caught on the Hive side and causes the query to fail. We should catch > this and skip delta metrics collection in this case. > Also, make sure that metrics are only collected if > hive.metastore.acidmetrics.ext.on=true -- This message was sent by Atlassian Jira (v8.3.4#803005)