[ https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=844811&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-844811 ]

ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Feb/23 13:16
            Start Date: 10/Feb/23 13:16
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1102678735


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/Handler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * An abstract class which defines the list of utility methods for performing cleanup activities.
+ */
+public abstract class Handler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Handler.class.getName());
+  protected final TxnStore txnHandler;
+  protected final HiveConf conf;
+  protected final boolean metricsEnabled;
+  private Optional<Cache<String, TBase>> metaCache;
+
+  Handler(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
+    this.conf = conf;
+    this.txnHandler = txnHandler;
+    boolean tableCacheOn = MetastoreConf.getBoolVar(this.conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
+    this.metaCache = initializeCache(tableCacheOn);
+    this.metricsEnabled = metricsEnabled;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public TxnStore getTxnHandler() {
+    return txnHandler;
+  }
+
+  public boolean isMetricsEnabled() {
+    return metricsEnabled;
+  }
+
+  /**
+   * Find the list of objects which are ready for cleaning.
+   * @return Cleaning requests
+   */
+  public abstract List<CleaningRequest> findReadyToClean() throws MetaException;
+
+  /**
+   * Execute just before cleanup
+   * @param cleaningRequest - Cleaning request
+   */
+  public abstract void beforeExecutingCleaningRequest(CleaningRequest cleaningRequest) throws MetaException;
+
+  /**
+   * Execute just after cleanup
+   * @param cleaningRequest Cleaning request
+   * @param deletedFiles List of deleted files
+   * @throws MetaException
+   */
+  public abstract void afterExecutingCleaningRequest(CleaningRequest cleaningRequest, List<Path> deletedFiles) throws MetaException;
+
+  /**
+   * Execute in the event of failure
+   * @param cleaningRequest Cleaning request
+   * @param ex Failure exception
+   * @throws MetaException
+   */
+  public abstract void failureExecutingCleaningRequest(CleaningRequest cleaningRequest, Exception ex) throws MetaException;

Review Comment:
   Instead of this fixed method, you could implement an observer pattern: provide a method for registering exception handlers which will be executed in case of an exception.
   
   That would give better flexibility:
   1) The error handler can be completely independent of the cleaning request handler (for example, a common error handler shared by all handler implementations).
   2) It would be possible to register multiple error handlers; for example, Cleaner could register its own as well.
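   A minimal sketch of this registration idea (the method and field names here are hypothetical, not part of the PR):
   
       // Sketch only: Handler exposes a registration point instead of a fixed abstract method.
       // (uses java.util.ArrayList, java.util.List, java.util.function.BiConsumer)
       private final List<BiConsumer<CleaningRequest, Exception>> errorHandlers = new ArrayList<>();
   
       public void addErrorHandler(BiConsumer<CleaningRequest, Exception> errorHandler) {
         errorHandlers.add(errorHandler);
       }
   
       // invoked by the cleanup flow whenever a cleaning request fails
       protected void notifyError(CleaningRequest cleaningRequest, Exception ex) {
         errorHandlers.forEach(handler -> handler.accept(cleaningRequest, ex));
       }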



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionCleaningRequest.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A cleaning request class specific to compaction based cleanup.
+ */
+public class CompactionCleaningRequest extends CleaningRequest {
+
+  private final CompactionInfo compactionInfo;
+  private final Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots;
+
+  public CompactionCleaningRequest(String location, CompactionInfo info, List<Path> obsoleteDirs,
+                                   boolean purge, FileSystem fs, Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots,
+                                   boolean dropPartition) {
+    super(RequestType.COMPACTION, location, obsoleteDirs, purge, fs);
+    this.compactionInfo = info;
+    this.dbName = compactionInfo.dbname;
+    this.tableName = compactionInfo.tableName;
+    this.partitionName = compactionInfo.partName;
+    this.dirSnapshots = dirSnapshots;
+    this.cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
+            (compactionInfo.type != null ? compactionInfo.type.toString().toLowerCase() : null);
+    this.runAs = compactionInfo.runAs;
+    this.dropPartition = dropPartition;
+    this.fullPartitionName = compactionInfo.getFullPartitionName();
+  }
+
+  public CompactionInfo getCompactionInfo() {
+    return compactionInfo;
+  }
+
+  public Map<Path, AcidUtils.HdfsDirSnapshot> getHdfsDirSnapshots() {
+    return dirSnapshots;
+  }
+
+  @Override
+  public String toString() {

Review Comment:
   You could use org.apache.commons.lang3.builder.ToStringBuilder.
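   For reference, a toString() built that way could look roughly like this (sketch only, using the field names from the quoted class):
   
       @Override
       public String toString() {
         return new org.apache.commons.lang3.builder.ToStringBuilder(this)
             .append("compactionInfo", compactionInfo)
             .append("dirSnapshots", dirSnapshots)
             .toString();
       }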



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -212,323 +140,8 @@ public void run() {
     }
   }
 
-  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean 
metricsEnabled) throws MetaException {
-    LOG.info("Starting cleaning for " + ci);
-    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
-    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
-        (ci.type != null ? ci.type.toString().toLowerCase() : null);
-    try {
-      if (metricsEnabled) {
-        perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
-      }
-      final String location = ci.getProperty("location");
-
-      Callable<Boolean> cleanUpTask;
-      Table t = null;
-      Partition p = null;
-
-      if (location == null) {
-        t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-        if (t == null) {
-          // The table was dropped before we got around to cleaning it.
-          LOG.info("Unable to find table " + ci.getFullTableName() + ", 
assuming it was dropped." +
-            idWatermark(ci));
-          txnHandler.markCleaned(ci);
-          return;
-        }
-        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
-          // The table was marked no clean up true.
-          LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as 
NO_CLEANUP set to true");
-          txnHandler.markCleaned(ci);
-          return;
-        }
-        if (ci.partName != null) {
-          p = resolvePartition(ci);
-          if (p == null) {
-            // The partition was dropped before we got around to cleaning it.
-            LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-              ", assuming it was dropped." + idWatermark(ci));
-            txnHandler.markCleaned(ci);
-            return;
-          }
-          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
-            // The partition was marked no clean up true.
-            LOG.info("Skipping partition " + ci.getFullPartitionName() + " 
clean up, as NO_CLEANUP set to true");
-            txnHandler.markCleaned(ci);
-            return;
-          }
-        }
-      }
-      txnHandler.markCleanerStart(ci);
-
-      if (t != null || ci.partName != null) {
-        String path = location == null
-            ? resolveStorageDescriptor(t, p).getLocation()
-            : location;
-        boolean dropPartition = ci.partName != null && p == null;
-        cleanUpTask = () -> removeFiles(path, minOpenTxnGLB, ci, 
dropPartition);
-      } else {
-        cleanUpTask = () -> removeFiles(location, ci);
-      }
-
-      Ref<Boolean> removedFiles = Ref.from(false);
-      if (runJobAsSelf(ci.runAs)) {
-        removedFiles.value = cleanUpTask.call();
-      } else {
-        LOG.info("Cleaning as user " + ci.runAs + " for " + 
ci.getFullPartitionName());
-        UserGroupInformation ugi = 
UserGroupInformation.createProxyUser(ci.runAs,
-            UserGroupInformation.getLoginUser());
-        try {
-          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-            removedFiles.value = cleanUpTask.call();
-            return null;
-          });
-        } finally {
-          try {
-            FileSystem.closeAllForUGI(ugi);
-          } catch (IOException exception) {
-            LOG.error("Could not clean up file-system handles for UGI: " + ugi 
+ " for " +
-                ci.getFullPartitionName() + idWatermark(ci), exception);
-          }
-        }
-      }
-      if (removedFiles.value || isDynPartAbort(t, ci)) {
-        txnHandler.markCleaned(ci);
-      } else {
-        txnHandler.clearCleanerStart(ci);
-        LOG.warn("No files were removed. Leaving queue entry " + ci + " in 
ready for cleaning state.");
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception when cleaning, unable to complete cleaning 
of " + ci + " " +
-          StringUtils.stringifyException(e));
-      ci.errorMessage = e.getMessage();
-      if (metricsEnabled) {
-        
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
-      }
-      handleCleanerAttemptFailure(ci);
-    }  finally {
-      if (metricsEnabled) {
-        perfLogger.perfLogEnd(CLASS_NAME, cleanerMetric);
-      }
-    }
-  }
-
-  private void handleCleanerAttemptFailure(CompactionInfo ci) throws 
MetaException {
-    long defaultRetention = getTimeVar(conf, 
HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
-    int cleanAttempts = 0;
-    if (ci.retryRetention > 0) {
-      cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / 
Math.log(2)) + 1;
-    }
-    if (cleanAttempts >= getIntVar(conf, 
HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
-      //Mark it as failed if the max attempt threshold is reached.
-      txnHandler.markFailed(ci);
-    } else {
-      //Calculate retry retention time and update record.
-      ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
-      txnHandler.setCleanerRetryRetentionTimeOnError(ci);
-    }
-  }
-
-  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, 
ValidTxnList validTxnList)
-      throws NoSuchTxnException, MetaException {
-    List<String> tblNames = 
Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
-    GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
-    request.setValidTxnList(validTxnList.writeToString());
-    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
-    // we could have no write IDs for a table if it was never written to but
-    // since we are in the Cleaner phase of compactions, there must have
-    // been some delta/base dirs
-    assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
-    ValidReaderWriteIdList validWriteIdList =
-        
TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
-    /*
-     * We need to filter the obsoletes dir list, to only remove directories 
that were made obsolete by this compaction
-     * If we have a higher retentionTime it is possible for a second 
compaction to run on the same partition. Cleaning up the first compaction
-     * should not touch the newer obsolete directories to not to violate the 
retentionTime for those.
-     */
-    if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
-      validWriteIdList = 
validWriteIdList.updateHighWatermark(ci.highestWriteId);
-    }
-    return validWriteIdList;
-  }
-
-  private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
-    return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> 
pk.size() > 0).isPresent()
-        && ci.partName == null;
-  }
-
-  private static String idWatermark(CompactionInfo ci) {
-    return " id=" + ci.id;
-  }
-
-  private boolean removeFiles(String location, long minOpenTxnGLB, 
CompactionInfo ci, boolean dropPartition)
-      throws Exception {
-
-    if (dropPartition) {
-      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, 
DataOperationType.DELETE);
-      LockResponse res = null;
-
-      try {
-        res = txnHandler.lock(lockRequest);
-        if (res.getState() == LockState.ACQUIRED) {
-          //check if partition wasn't re-created
-          if (resolvePartition(ci) == null) {
-            return removeFiles(location, ci);
-          }
-        }
-      } catch (NoSuchTxnException | TxnAbortedException e) {
-        LOG.error(e.getMessage());
-      } finally {
-        if (res != null) {
-          try {
-            txnHandler.unlock(new UnlockRequest(res.getLockid()));
-          } catch (NoSuchLockException | TxnOpenException e) {
-            LOG.error(e.getMessage());
-          }
-        }
-      }
-    }
-
-    ValidTxnList validTxnList =
-      TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), 
minOpenTxnGLB);
-    //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-    /**
-     * {@code validTxnList} is capped by minOpenTxnGLB so if
-     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} 
sees a base/delta
-     * produced by a compactor, that means every reader that could be active 
right now see it
-     * as well.  That means if this base/delta shadows some earlier 
base/delta, the it will be
-     * used in favor of any files that it shadows.  Thus the shadowed files 
are safe to delete.
-     *
-     *
-     * The metadata about aborted writeIds (and consequently aborted txn IDs) 
cannot be deleted
-     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
-     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
-     * For example given partition P1, txnid:150 starts and sees txnid:149 as 
open.
-     * Say compactor runs in txnid:160, but 149 is still open and P1 has the 
largest resolved
-     * writeId:17.  Compactor will produce base_17_c160.
-     * Suppose txnid:149 writes delta_18_18
-     * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
-     * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and 
perhaps corrupted) but
-     * not visible based on 'validTxnList' capped at minOpenTxn so it will not 
not be cleaned by
-     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so 
we must keep the
-     * metadata that says that 18 is aborted.
-     * In a slightly different case, whatever txn created delta_18 (and all 
other txn) may have
-     * committed by the time cleaner runs and so cleaner will indeed see 
delta_18_18 and remove
-     * it (since it has nothing but aborted data).  But we can't tell which 
actually happened
-     * in markCleaned() so make sure it doesn't delete meta above 
CG_CQ_HIGHEST_WRITE_ID.
-     *
-     * We could perhaps make cleaning of aborted and obsolete and remove all 
aborted files up
-     * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta 
can be removed
-     * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID.  
This could be
-     * useful if there is all of a sudden a flood of aborted txns.  (For 
another day).
-     */
-
-    // Creating 'reader' list since we are interested in the set of 'obsolete' 
files
-    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, 
validTxnList);
-    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
-    return removeFiles(location, validWriteIdList, ci);
-  }
-  /**
-   * @return true if any files were removed
-   */
-  private boolean removeFiles(String location, ValidWriteIdList writeIdList, 
CompactionInfo ci)
-      throws Exception {
-    Path path = new Path(location);
-    FileSystem fs = path.getFileSystem(conf);
-    
-    // Collect all of the files/dirs
-    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = 
AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
-    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, 
Ref.from(false), false, 
-        dirSnapshots);
-    Table table = computeIfAbsent(ci.getFullTableName(), () -> 
resolveTable(ci));
-    boolean isDynPartAbort = isDynPartAbort(table, ci);
-    
-    List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
-    if (isDynPartAbort || dir.hasUncompactedAborts()) {
-      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
-    }
-    List<Path> deleted = remove(location, ci, obsoleteDirs, true, fs);
-    if (dir.getObsolete().size() > 0) {
-      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, 
ci.partName, dir.getObsolete(), conf,
-          txnHandler);
-    }
-    // Make sure there are no leftovers below the compacted watermark
-    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
-    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
-        ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, 
Long.MAX_VALUE),
-      Ref.from(false), false, dirSnapshots);
-    
-    List<Path> remained = subtract(getObsoleteDirs(dir, isDynPartAbort), 
deleted);
-    if (!remained.isEmpty()) {
-      LOG.warn(idWatermark(ci) + " Remained " + remained.size() +
-        " obsolete directories from " + location + ". " + 
getDebugInfo(remained));
-      return false;
-    }
-    LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + 
ci.highestWriteId + " from " + location);
-    return true;
-  }
-  
-  private List<Path> getObsoleteDirs(AcidDirectory dir, boolean 
isDynPartAbort) {
-    List<Path> obsoleteDirs = dir.getObsolete();
-    /**
-     * add anything in 'dir'  that only has data from aborted transactions - 
no one should be
-     * trying to read anything in that dir (except getAcidState() that only 
reads the name of
-     * this dir itself)
-     * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's 
ok (suppose there
-     * are no active txns when cleaner runs).  The key is to not delete 
metadata about aborted
-     * txns with write IDs > {@link CompactionInfo#highestWriteId}.
-     * See {@link TxnStore#markCleaned(CompactionInfo)}
-     */
-    obsoleteDirs.addAll(dir.getAbortedDirectories());
-    if (isDynPartAbort) {
-      // In the event of an aborted DP operation, we should only consider the 
aborted directories for cleanup.
-      // Including obsolete directories for partitioned tables can result in 
data loss.
-      obsoleteDirs = dir.getAbortedDirectories();
-    }
-    return obsoleteDirs;
-  }
-
-  private boolean removeFiles(String location, CompactionInfo ci) throws 
IOException, MetaException {
-    String strIfPurge = ci.getProperty("ifPurge");
-    boolean ifPurge = strIfPurge != null || 
Boolean.parseBoolean(ci.getProperty("ifPurge"));
-    
-    Path path = new Path(location);
-    return !remove(location, ci, Collections.singletonList(path), ifPurge,
-      path.getFileSystem(conf)).isEmpty();
-  }
-
-  private List<Path> remove(String location, CompactionInfo ci, List<Path> 
paths, boolean ifPurge, FileSystem fs)
-      throws MetaException, IOException {
-    List<Path> deleted = new ArrayList<>();
-    if (paths.size() < 1) {
-      return deleted;
-    }
-    LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
-      " obsolete directories from " + location + ". " + getDebugInfo(paths));
-    boolean needCmRecycle;
-    try {
-      Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), 
ci.dbname);
-      needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-    } catch (NoSuchObjectException ex) {
-      // can not drop a database which is a source of replication
-      needCmRecycle = false;
-    }
-    for (Path dead : paths) {
-      LOG.debug("Going to delete path " + dead.toString());
-      if (needCmRecycle) {
-        replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, 
ifPurge);
-      }
-      if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
-        deleted.add(dead);
-      }
-    }
-    return deleted;
-  }
-  
-  private String getDebugInfo(List<Path> paths) {
-    return "[" + 
paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
+  public void setHandlers(List<Handler> handlers) {

Review Comment:
   Instead of introducing this method for tests, you should have two constructors and a HandlerFactory instance field, like Worker does with CompactorFactory.
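   A rough sketch of that arrangement (names assumed, only loosely mirroring the Worker/CompactorFactory pattern):
   
       // Sketch only: public constructor delegates to the factory, package-private one is for tests.
       private final HandlerFactory handlerFactory;
   
       public Cleaner() {
         this(new HandlerFactory());
       }
   
       Cleaner(HandlerFactory handlerFactory) {   // visible for testing
         this.handlerFactory = handlerFactory;
       }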



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/Handler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * An abstract class which defines the list of utility methods for performing cleanup activities.
+ */
+public abstract class Handler {

Review Comment:
   What about introducing a generic type here, like Handler\<T extends CleaningRequest\>, and using T in the methods, e.g. public abstract List\<T\> findReadyToClean() throws MetaException;?
   
   As a result, CleaningRequest could contain only the fields necessary for the common part in Handler and FSRemover. Everything else could be moved completely to the descendant classes (if not moved yet), and the Handler implementations could be paired with them, like CompactionHandler extends Handler\<CompactionCleaningRequest\>. This would also eliminate the need for casting in CompactionHandler.
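   Roughly, the suggested shape (sketch, reusing the existing class names):
   
       // Sketch only: each handler is bound to its own request type.
       public abstract class Handler<T extends CleaningRequest> {
         public abstract List<T> findReadyToClean() throws MetaException;
         public abstract void beforeExecutingCleaningRequest(T cleaningRequest) throws MetaException;
         public abstract void afterExecutingCleaningRequest(T cleaningRequest, List<Path> deletedFiles) throws MetaException;
         public abstract void failureExecutingCleaningRequest(T cleaningRequest, Exception ex) throws MetaException;
       }
   
       // CompactionHandler would then extend Handler<CompactionCleaningRequest> and use
       // CompactionCleaningRequest directly in its overrides, without any casts.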



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -141,45 +85,29 @@ public void run() {
                    new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
           }
 
-          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-
-          checkInterrupt();
-
-          List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+          for (Handler handler : handlers) {
+            List<CleaningRequest> readyToClean = handler.findReadyToClean();

Review Comment:
   If the first handler is broken for whatever reason, the remaining handlers won't get any chance to provide cleaning requests. This method call should be handled in a way that continues the handler iteration, so subsequent handlers can still provide CleaningRequests.
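   One way to keep the loop going (sketch only; logging and metrics handling simplified):
   
       // Sketch: isolate each handler so one failure does not starve the others.
       for (Handler handler : handlers) {
         List<CleaningRequest> readyToClean;
         try {
           readyToClean = handler.findReadyToClean();
         } catch (Exception e) {
           LOG.error("Handler {} failed to produce cleaning requests", handler.getClass().getName(), e);
           continue;   // move on to the next handler
         }
         // ... hand the requests from this handler over to the FSRemover ...
       }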



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HandlerFactory.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.CompactionHandler;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.Handler;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A factory class to fetch handlers.
+ */
+public class HandlerFactory {

Review Comment:
   This could be moved inside the handler package, so the concrete Handler implementations could be package-private, preventing client code from obtaining them in any way other than through the factory.
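   In other words (sketch; the factory method signature is assumed and may differ from the PR):
   
       // Sketch only: factory lives in ...txn.compactor.handler, implementations lose their public modifier.
       package org.apache.hadoop.hive.ql.txn.compactor.handler;
   
       public class HandlerFactory {
         public List<Handler> getHandlers(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
           return Arrays.asList(new CompactionHandler(conf, txnHandler, metricsEnabled));
         }
       }
   
       class CompactionHandler extends Handler {   // package-private, reachable only via the factory
         CompactionHandler(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
           super(conf, txnHandler, metricsEnabled);
         }
         // abstract method overrides omitted
       }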



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleaningRequest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * A class which specifies the required information for cleanup.
+ * Objects from this class are passed to FSRemover for cleanup.
+ */
+public class CleaningRequest {
+  public enum RequestType {
+    COMPACTION,
+  }
+  private final RequestType type;
+  private final String location;
+  private final List<Path> obsoleteDirs;
+  private final boolean purge;
+  private final FileSystem fs;
+  protected String runAs;

Review Comment:
   If you want to restrict access to these fields, they should either be private or the class should be moved into another package. With protected access they are currently visible everywhere in the org.apache.hadoop.hive.ql.txn.compactor package, which I guess is not the desired visibility level.
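   For instance, a field such as runAs could stay private and be populated through the constructor instead of from the subclasses (sketch only, not the PR's actual constructor signature):
   
       // Sketch: private field, set via the constructor, read-only to everyone else.
       private final String runAs;
   
       protected CleaningRequest(RequestType type, String location, List<Path> obsoleteDirs,
                                 boolean purge, FileSystem fs, String runAs) {
         // existing assignments for type/location/obsoleteDirs/purge/fs omitted
         this.runAs = runAs;
       }
   
       public String getRunAs() {
         return runAs;
       }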



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionHandler.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactionCleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+
+/**
+ * A compaction based implementation of Handler.
+ * Provides implementation of finding ready to clean items, preprocessing of cleaning request,
+ * postprocessing of cleaning request and failure handling of cleaning request.
+ */
+public class CompactionHandler extends Handler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHandler.class.getName());
+
+  public CompactionHandler(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
+    super(conf, txnHandler, metricsEnabled);
+  }
+
+  @Override
+  public List<CleaningRequest> findReadyToClean() throws MetaException {
+    List<CleaningRequest> cleaningRequests = new ArrayList<>();
+    long retentionTime = HiveConf.getBoolVar(conf, 
HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+            ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 
TimeUnit.MILLISECONDS)
+            : 0;
+    long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    List<CompactionInfo> readyToClean = 
txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+
+    if (!readyToClean.isEmpty()) {
+      long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      final long cleanerWaterMark =
+              minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, 
minTxnIdSeenOpen);
+
+      LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+      // For checking which compaction can be cleaned we can use the 
minOpenTxnId
+      // However findReadyToClean will return all records that were compacted 
with old version of HMS
+      // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to 
provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed for running 
queries
+      // when min_history_level is finally dropped, than every HMS will commit 
compaction the new way
+      // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used 
instead.
+      for (CompactionInfo ci : readyToClean) {
+        LOG.info("Starting cleaning for {}", ci);
+        try {
+          final String location = ci.getProperty("location");
+
+          Table t = null;
+          Partition p = null;
+
+          if (location == null) {
+            t = computeIfAbsent(ci.getFullTableName(), () -> 
resolveTable(ci.dbname, ci.tableName));
+            if (t == null) {
+              // The table was dropped before we got around to cleaning it.
+              LOG.info("Unable to find table {}, assuming it was dropped. {}", 
ci.getFullTableName(),
+                      idWatermark(ci));
+              txnHandler.markCleaned(ci);
+              continue;
+            }
+            if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+              // The table was marked no clean up true.
+              LOG.info("Skipping table {} clean up, as NO_CLEANUP set to 
true", ci.getFullTableName());
+              txnHandler.markCleaned(ci);
+              continue;
+            }
+            if (ci.partName != null) {
+              p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+              if (p == null) {
+                // The partition was dropped before we got around to cleaning 
it.
+                LOG.info("Unable to find partition {}, assuming it was 
dropped. {}", ci.getFullTableName(), idWatermark(ci));
+                txnHandler.markCleaned(ci);
+                continue;
+              }
+              if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+                // The partition was marked no clean up true.
+                LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to 
true", ci.getFullPartitionName());
+                txnHandler.markCleaned(ci);
+                continue;
+              }
+            }
+          }
+
+          if (t != null || ci.partName != null) {
+            String path = location == null
+                    ? CompactorUtil.resolveStorageDescriptor(t, 
p).getLocation()
+                    : location;
+            boolean dropPartition = ci.partName != null && p == null;
+            if (dropPartition) {
+              //check if partition wasn't re-created
+              if (resolvePartition(ci.dbname, ci.tableName, ci.partName) == 
null) {
+                String strIfPurge = ci.getProperty("ifPurge");
+                boolean ifPurge = strIfPurge != null || 
Boolean.parseBoolean(ci.getProperty("ifPurge"));
+
+                Path obsoletePath = new Path(path);
+                cleaningRequests.add(new CompactionCleaningRequest(path, ci, 
Collections.singletonList(obsoletePath), ifPurge,
+                        obsoletePath.getFileSystem(conf), null, true));
+                continue;
+              }
+            }
+
+            ValidTxnList validTxnList =
+                    
TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), 
cleanerWaterMark);
+            //save it so that getAcidState() sees it
+            conf.set(ValidTxnList.VALID_TXNS_KEY, 
validTxnList.writeToString());
+            /*
+             * {@code validTxnList} is capped by minOpenTxnGLB so if
+             * {@link AcidUtils#getAcidState(Path, Configuration, 
ValidWriteIdList)} sees a base/delta
+             * produced by a compactor, that means every reader that could be 
active right now see it
+             * as well.  That means if this base/delta shadows some earlier 
base/delta, the it will be
+             * used in favor of any files that it shadows.  Thus the shadowed 
files are safe to delete.
+             *
+             *
+             * The metadata about aborted writeIds (and consequently aborted 
txn IDs) cannot be deleted
+             * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+             * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+             * For example given partition P1, txnid:150 starts and sees 
txnid:149 as open.
+             * Say compactor runs in txnid:160, but 149 is still open and P1 
has the largest resolved
+             * writeId:17.  Compactor will produce base_17_c160.
+             * Suppose txnid:149 writes delta_18_18
+             * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS 
entries
+             * up to (inclusive) writeId:17 since delta_18_18 may be on disk 
(and perhaps corrupted) but
+             * not visible based on 'validTxnList' capped at minOpenTxn so it 
will not not be cleaned by
+             * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} 
and so we must keep the
+             * metadata that says that 18 is aborted.
+             * In a slightly different case, whatever txn created delta_18 
(and all other txn) may have
+             * committed by the time cleaner runs and so cleaner will indeed 
see delta_18_18 and remove
+             * it (since it has nothing but aborted data).  But we can't tell 
which actually happened
+             * in markCleaned() so make sure it doesn't delete meta above 
CG_CQ_HIGHEST_WRITE_ID.
+             *
+             * We could perhaps make cleaning of aborted and obsolete and 
remove all aborted files up
+             * to the current Min Open Write Id, this way aborted 
TXN_COMPONENTS meta can be removed
+             * as well up to that point which may be higher than 
CQ_HIGHEST_WRITE_ID.  This could be
+             * useful if there is all of a sudden a flood of aborted txns.  
(For another day).
+             */
+
+            // Creating 'reader' list since we are interested in the set of 
'obsolete' files
+            ValidReaderWriteIdList validWriteIdList = 
getValidCleanerWriteIdList(ci, validTxnList);
+            LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+            Path loc = new Path(path);
+            FileSystem fs = loc.getFileSystem(conf);
+
+            // Collect all the files/dirs
+            Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = 
AcidUtils.getHdfsDirSnapshotsForCleaner(fs, loc);
+            AcidDirectory dir = AcidUtils.getAcidState(fs, loc, conf, 
validWriteIdList, Ref.from(false), false,
+                    dirSnapshots);
+            Table table = computeIfAbsent(ci.getFullTableName(), () -> 
resolveTable(ci.dbname, ci.tableName));
+            boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, 
ci.partName);
+
+            List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
+            if (isDynPartAbort || dir.hasUncompactedAborts()) {
+              ci.setWriteIds(dir.hasUncompactedAborts(), 
dir.getAbortedWriteIds());
+            }
+
+            cleaningRequests.add(new CompactionCleaningRequest(path, ci, 
obsoleteDirs, true, fs, dirSnapshots, false));
+          } else {
+            String strIfPurge = ci.getProperty("ifPurge");
+            boolean ifPurge = strIfPurge != null || 
Boolean.parseBoolean(ci.getProperty("ifPurge"));
+
+            Path obsoletePath = new Path(location);
+            cleaningRequests.add(new CompactionCleaningRequest(location, ci, 
Collections.singletonList(obsoletePath), ifPurge,
+                    obsoletePath.getFileSystem(conf), null, false));
+          }
+        } catch (Exception e) {
+          LOG.warn("Cleaning request was not successful generated for : {} due 
to {}", idWatermark(ci), e.getMessage());
+          ci.errorMessage = e.getMessage();
+          if (metricsEnabled) {
+            
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+          }
+          handleCleanerAttemptFailure(ci);
+        }
+      }
+    }
+    return cleaningRequests;
+  }
+
+  @Override
+  public void beforeExecutingCleaningRequest(CleaningRequest cleaningRequest) 
throws MetaException {
+    CompactionInfo ci = ((CompactionCleaningRequest) 
cleaningRequest).getCompactionInfo();
+    txnHandler.markCleanerStart(ci);
+  }
+
+  @Override
+  public void afterExecutingCleaningRequest(CleaningRequest cleaningRequest, 
List<Path> deletedFiles) throws MetaException {
+    CompactionInfo ci = ((CompactionCleaningRequest) 
cleaningRequest).getCompactionInfo();
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = 
((CompactionCleaningRequest) cleaningRequest).getHdfsDirSnapshots();
+    // Make sure there are no leftovers below the compacted watermark
+    if (dirSnapshots != null) {
+      conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+      Path path = new Path(cleaningRequest.getLocation());
+      Table table;
+      boolean success = false;

Review Comment:
   Is it possible for FSRemover to share some information about the result of the file removal? Ideally the FileSystem-related tasks should be extracted from the handlers.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 844811)
    Time Spent: 1h 10m  (was: 1h)

> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
>                 Key: HIVE-27019
>                 URL: https://issues.apache.org/jira/browse/HIVE-27019
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> As described by the parent task - 
> Cleaner can be divided into separate entities like -
> *1) Handler* - This entity fetches the data from the metastore DB from 
> relevant tables and converts it into a request entity called CleaningRequest. 
> It would also do SQL operations post cleanup (postprocess). Every type of 
> cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity fetches the cleaning requests from 
> various handlers and deletes them according to the cleaning request.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
