[ https://issues.apache.org/jira/browse/HIVE-26718?focusedWorklogId=837476&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-837476 ]

ASF GitHub Bot logged work on HIVE-26718:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jan/23 13:21
            Start Date: 06/Jan/23 13:21
    Worklog Time Spent: 10m 
      Work Description: InvisibleProgrammer commented on code in PR #3775:
URL: https://github.com/apache/hive/pull/3775#discussion_r1063420879


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");

Review Comment:
   Is that a leftover?
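
   For reference, below is a minimal, self-contained sketch of the relative standard deviation check the hunk performs. The class name, bucket sizes, and threshold value are illustrative only, and the deviation is computed here with a plain sum of squared differences rather than the reduce() expression quoted above:

    import java.util.Map;

    public class BucketSkewSketch {
      public static void main(String[] args) {
        // Hypothetical per-bucket sizes in bytes (bucket id -> total size).
        Map<Integer, Long> bucketSizes = Map.of(0, 900L, 1, 950L, 2, 4000L);

        long totalSize = bucketSizes.values().stream().mapToLong(Long::longValue).sum();
        double mean = (double) totalSize / bucketSizes.size();

        // Standard deviation: sqrt(sum((size - mean)^2) / n).
        double variance = bucketSizes.values().stream()
            .mapToDouble(size -> Math.pow(size - mean, 2))
            .sum() / bucketSizes.size();
        double standardDeviation = Math.sqrt(variance);

        // Relative standard deviation check: rebalance when stddev > rsdThreshold * mean.
        double rsdThreshold = 0.1; // illustrative value, not necessarily the Hive default
        boolean initiateRebalance = standardDeviation > mean * rsdThreshold;

        System.out.printf("mean=%.1f stddev=%.1f rebalance=%b%n", mean, standardDeviation, initiateRebalance);
      }
    }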



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&

Review Comment:
   Major -> Rebalance -> Minor. As I see it, after this change that is the priority order of the different compaction types. Is that the right order? Does the order matter at all?



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");

Review Comment:
   Should it contain a log message saying that a rebalancing compaction is being initiated?
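
   For what it's worth, one possible shape for that message is sketched below; the wording and the CompactionInfo accessor are assumptions, not taken from the PR:

    // Hypothetical replacement for the empty LOG.debug(""); wording and accessor are illustrative.
    LOG.debug("Found candidate for rebalancing compaction: {} (stddev {} exceeds {} * mean bucket size {})",
        ci.getFullPartitionName(), standardDeviation, rsdThreshold, mean);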



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&

Review Comment:
   Could you please extract it into a method to improve readability?
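
   A rough sketch of what such an extraction could look like, reusing only calls that already appear in the hunk; the method name is hypothetical and the deviation is computed with a straightforward sum of squares:

    // Hypothetical helper: returns true when the per-bucket sizes are skewed enough to justify a rebalance.
    private boolean bucketSizesAreSkewed(AcidDirectory dir, long totalSize, double rsdThreshold) {
      Map<Integer, Long> bucketSizes = new HashMap<>();
      dir.getFiles().stream()
          .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
          .forEach(f -> bucketSizes.merge(
              AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
              f.getHdfsFileStatusWithId().getFileStatus().getLen(),
              Long::sum));
      double mean = (double) totalSize / bucketSizes.size();
      double standardDeviation = Math.sqrt(bucketSizes.values().stream()
          .mapToDouble(size -> Math.pow(size - mean, 2))
          .sum() / bucketSizes.size());
      return standardDeviation > mean * rsdThreshold;
    }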





Issue Time Tracking
-------------------

    Worklog Id:     (was: 837476)
    Time Spent: 1h 10m  (was: 1h)

> Enable initiator to schedule rebalancing compactions
> ----------------------------------------------------
>
>                 Key: HIVE-26718
>                 URL: https://issues.apache.org/jira/browse/HIVE-26718
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Hive
>            Reporter: László Végh
>            Assignee: László Végh
>            Priority: Major
>              Labels: ACID, compaction, pull-request-available
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Initiator should be able to schedule rebalancing compactions based on a set 
> of predefined and configurable thresholds.


