[ https://issues.apache.org/jira/browse/HIVE-26718?focusedWorklogId=837476&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-837476 ]
ASF GitHub Bot logged work on HIVE-26718:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jan/23 13:21
            Start Date: 06/Jan/23 13:21
    Worklog Time Spent: 10m
      Work Description: InvisibleProgrammer commented on code in PR #3775:
URL: https://github.com/apache/hive/pull/3775#discussion_r1063420879


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########

@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }

+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");

Review Comment:
   Is that a leftover?

##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########

@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }

+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&

Review Comment:
   Major -> Rebalance -> Minor. As I see it, after this change that is the priority order of the different compaction types. Is that the right order? Does the order matter at all?
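A side note on the standard deviation computed in the hunk above: as written, the reduce() divides the accumulated sum by bucketSizes.size() at every step rather than once at the end, so for more than one bucket the result is not the population variance. Below is a minimal standalone sketch of the relative-standard-deviation check with the division applied once. The class and method names are illustrative, the bucketSizes map stands in for the one built in the PR, and the mean is taken directly over the bucket sizes (the PR derives it from totalSize / bucketSizes.size(), which matches only when every file is a bucket file):

import java.util.Map;

public class RsdSketch {

  // Returns true when the relative standard deviation of the bucket sizes
  // exceeds rsdThreshold. bucketSizes maps bucket id -> total bytes.
  static boolean rebalanceNeeded(Map<Integer, Long> bucketSizes, double rsdThreshold) {
    if (bucketSizes.isEmpty()) {
      return false;
    }
    double mean = bucketSizes.values().stream()
        .mapToLong(Long::longValue).average().orElse(0);
    // population variance: sum the squared deviations first, divide by N once
    double variance = bucketSizes.values().stream()
        .mapToDouble(size -> Math.pow(size - mean, 2))
        .sum() / bucketSizes.size();
    return Math.sqrt(variance) > mean * rsdThreshold;
  }

  public static void main(String[] args) {
    // two skewed buckets: mean 600, stddev 400, RSD ~0.67 -> above a 0.5 threshold
    System.out.println(rebalanceNeeded(Map.of(0, 1000L, 1, 200L), 0.5)); // true
  }
}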
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########

@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }

+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");

Review Comment:
   Should it contain log information stating that a rebalancing compaction is initiated?

##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########

@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }

+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&

Review Comment:
   Could you please extract it into a method to improve readability?

Issue Time Tracking
-------------------

    Worklog Id:     (was: 837476)
    Time Spent: 1h 10m  (was: 1h)

> Enable initiator to schedule rebalancing compactions
> ----------------------------------------------------
>
>                 Key: HIVE-26718
>                 URL: https://issues.apache.org/jira/browse/HIVE-26718
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Hive
>            Reporter: László Végh
>            Assignee: László Végh
>            Priority: Major
>              Labels: ACID, compaction, pull-request-available
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Initiator should be able to schedule rebalancing compactions based on a set
> of predefined and configurable thresholds.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
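To illustrate the last two review comments (extracting the check into a method, and giving the empty LOG.debug("") a message), one possible shape of the refactoring is sketched below. This is a sketch under stated assumptions, not the PR's actual change: the method name, parameter list, and log text are all hypothetical, and plain Java types stand in for the Initiator's fields. The logging call uses the standard SLF4J parameterized form:

import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InitiatorRefactorSketch {
  private static final Logger LOG = LoggerFactory.getLogger(InitiatorRefactorSketch.class);

  // Hypothetical extracted method: the whole "should we rebalance?" decision in
  // one place, so determineCompactionType() shrinks to a single readable call.
  static boolean shouldInitiateRebalance(Map<Integer, Long> bucketSizes, long totalSize,
      long minimumSize, double rsdThreshold, String tableName) {
    if (totalSize <= minimumSize || bucketSizes.isEmpty()) {
      return false;
    }
    double mean = (double) totalSize / bucketSizes.size();
    double stdDev = Math.sqrt(bucketSizes.values().stream()
        .mapToDouble(size -> Math.pow(size - mean, 2))
        .sum() / bucketSizes.size());
    if (stdDev > mean * rsdThreshold) {
      // one possible message for the empty LOG.debug("") flagged in the review
      LOG.debug("Initiating rebalance compaction for {}: bucket size stddev {} is above {} * mean {}",
          tableName, stdDev, rsdThreshold, mean);
      return true;
    }
    return false;
  }
}

Pulling the block out this way would also answer the earlier ordering question implicitly: the caller decides where the rebalance check sits relative to the major and minor checks, and that priority becomes visible in one short sequence of calls.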