[ https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=847501&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847501 ]

ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Feb/23 11:27
            Start Date: 24/Feb/23 11:27
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1116855904


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionHandler.java:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CacheContainer;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest.CleaningRequestBuilder;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static java.util.Objects.isNull;
+
+/**
+ * A compaction-based implementation of RequestHandler.
+ * Provides the implementation for finding ready-to-clean items, preprocessing of the cleaning request,
+ * postprocessing of the cleaning request, and failure handling of the cleaning request.
+ */
+class CompactionHandler extends RequestHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHandler.class.getName());
+
+  public CompactionHandler(HiveConf conf, TxnStore txnHandler,
+                                          CacheContainer cacheContainer, boolean metricsEnabled,
+                                          FSRemover fsRemover, ExecutorService cleanerExecutor) {
+    super(conf, txnHandler, cacheContainer, metricsEnabled, fsRemover, cleanerExecutor);
+  }
+
+  @Override
+  protected void processInternal() throws Exception {
+    long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+            ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+            : 0;
+    List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+    if (!readyToClean.isEmpty()) {
+      long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      final long cleanerWaterMark =
+              minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+
+      LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+      // For checking which compaction can be cleaned we can use the minOpenTxnId.
+      // However, findReadyToClean will return all records that were compacted with an old version of HMS
+      // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed for running queries.
+      // When min_history_level is finally dropped, every HMS will commit compactions the new way,
+      // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
+      List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
+      for (CompactionInfo ci : readyToClean) {
+        CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
+                CompactorUtil.ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled)), cleanerExecutor)
+                .exceptionally(t -> {
+                  LOG.error("Error clearing {} due to :", ci.getFullPartitionName(), t);
+                  return null;
+                });
+        asyncTasks.add(asyncTask);
+      }
+      // Use get instead of join, so we can receive InterruptedException and shut down gracefully
+      CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).get();
+    }
+  }
+
+  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+    LOG.info("Starting cleaning for {}", ci);
+    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
+            (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+    try {
+      if (metricsEnabled) {
+        perfLogger.perfLogBegin(CompactionHandler.class.getName(), cleanerMetric);
+      }
+      final String location = ci.getProperty("location");
+
+      Table t = null;
+      Partition p = null;
+
+      if (isNull(location)) {
+        t = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+        if (isNull(t)) {
+          // The table was dropped before we got around to cleaning it.
+          LOG.info("Unable to find table {}, assuming it was dropped. {}", 
ci.getFullTableName(),
+                  idWatermark(ci));
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+          // The table was marked no clean up true.
+          LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", 
ci.getFullTableName());
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (!isNull(ci.partName)) {
+          p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+          if (isNull(p)) {
+            // The partition was dropped before we got around to cleaning it.
+            LOG.info("Unable to find partition {}, assuming it was dropped. 
{}",
+                    ci.getFullPartitionName(), idWatermark(ci));
+            txnHandler.markCleaned(ci);
+            return;
+          }
+          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+            // The partition was marked no clean up true.
+            LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to 
true", ci.getFullPartitionName());
+            txnHandler.markCleaned(ci);
+            return;
+          }
+        }
+      }
+      txnHandler.markCleanerStart(ci);
+
+      if (!isNull(t) || !isNull(ci.partName)) {
+        String path = isNull(location)
+                ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+                : location;
+        boolean dropPartition = !isNull(ci.partName) && isNull(p);
+
+        //check if partition wasn't re-created
+        if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
+          cleanUsingLocation(ci, path, true);
+        } else {
+          cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+        }
+      } else {
+        cleanUsingLocation(ci, location, false);
+      }
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning 
of {} due to {}", ci,
+              e.getMessage());
+      ci.errorMessage = e.getMessage();
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+      }
+      handleCleanerAttemptFailure(ci);
+    }  finally {
+      if (metricsEnabled) {
+        perfLogger.perfLogEnd(CompactionHandler.class.getName(), cleanerMetric);
+      }
+    }
+  }
+
+  private void cleanUsingLocation(CompactionInfo ci, String path, boolean requiresLock) throws MetaException {
+    List<Path> deleted;
+    if (requiresLock) {
+      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+      LockResponse res = null;
+      try {
+        res = txnHandler.lock(lockRequest);
+        deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+      } catch (NoSuchTxnException | TxnAbortedException e) {
+        LOG.error("Error while trying to acquire exclusive write lock: {}", 
e.getMessage());
+        throw new MetaException(e.getMessage());
+      } finally {
+        if (res != null) {
+          try {
+            txnHandler.unlock(new UnlockRequest(res.getLockid()));
+          } catch (NoSuchLockException | TxnOpenException e) {
+            LOG.error("Error while trying to release exclusive write lock: 
{}", e.getMessage());
+          }
+        }
+      }
+    } else {
+      deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+    }
+    if (!deleted.isEmpty()) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+    }
+  }
+
+  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+    ValidTxnList validTxnList =
+            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+    //save it so that getAcidState() sees it
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    /*
+     * {@code validTxnList} is capped by minOpenTxnGLB so if
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+     * produced by a compactor, that means every reader that could be active right now sees it
+     * as well.  That means if this base/delta shadows some earlier base/delta, then it will be
+     * used in favor of any files that it shadows.  Thus the shadowed files are safe to delete.
+     *
+     *
+     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+     * For example, given partition P1, txnid:150 starts and sees txnid:149 as open.
+     * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+     * writeId:17.  Compactor will produce base_17_c160.
+     * Suppose txnid:149 writes delta_18_18
+     * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
+     * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
+     * not visible based on 'validTxnList' capped at minOpenTxn, so it will not be cleaned by
+     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
+     * metadata that says that 18 is aborted.
+     * In a slightly different case, whatever txn created delta_18 (and all other txns) may have
+     * committed by the time the cleaner runs and so the cleaner will indeed see delta_18_18 and remove
+     * it (since it has nothing but aborted data).  But we can't tell which actually happened
+     * in markCleaned(), so make sure it doesn't delete metadata above CQ_HIGHEST_WRITE_ID.
+     *
+     * We could perhaps combine the cleaning of aborted and obsolete files and remove all aborted files up
+     * to the current Min Open Write Id; this way aborted TXN_COMPONENTS metadata can be removed
+     * as well up to that point, which may be higher than CQ_HIGHEST_WRITE_ID.  This could be
+     * useful if there is all of a sudden a flood of aborted txns.  (For another day).
+     */
+
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+    Path path = new Path(location);
+    FileSystem fs = path.getFileSystem(conf);
+
+    // Collect all the files/dirs
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+            dirSnapshots);
+    Table table = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+    boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+    List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+    if (isDynPartAbort || dir.hasUncompactedAborts()) {
+      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+    }
+
+    List<Path> deleted = fsRemover.clean(new CleaningRequestBuilder().setLocation(location).setRunAs(ci.runAs)
+            .setObsoleteDirs(obsoleteDirs).setPurge(true).setFullPartitionName(ci.getFullPartitionName())
+            .build());
+
+    if (deleted.size() > 0) {
+      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+              txnHandler);
+    }
+
+    // Make sure there are no leftovers below the compacted watermark
+    boolean success = false;
+    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+                    ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+            Ref.from(false), false, dirSnapshots);
+
+    List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
+    if (!remained.isEmpty()) {
+      LOG.warn("{} Remained {} obsolete directories from {}. {}",
+              idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
+    } else {
+      LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
+      success = true;
+    }
+    if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+      LOG.warn("No files were removed. Leaving queue entry {} in ready for 
cleaning state.", ci);
+    }
+  }
+
+  protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) {

Review Comment:
   This method has a single usage; all parameters except `ci` can be removed.
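
   For illustration, a rough sketch of the simplification this suggests, assuming the single
   call site's arguments (txnId 0, LockType.EXCL_WRITE, DataOperationType.DELETE) are inlined;
   the body below is an approximation, not the actual code in the PR:

   // Hypothetical simplified form; the only caller always requests an exclusive
   // write lock for a DELETE operation outside of an open transaction (txnId 0).
   private LockRequest createLockRequest(CompactionInfo ci) {
     String agentInfo = Thread.currentThread().getName();
     LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
     requestBuilder.setUser(ci.runAs);
     requestBuilder.setTransactionId(0);

     LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
             .setLock(LockType.EXCL_WRITE)
             .setOperationType(DataOperationType.DELETE)
             .setDbName(ci.dbname)
             .setTableName(ci.tableName)
             .setIsTransactional(true);
     if (ci.partName != null) {
       lockCompBuilder.setPartitionName(ci.partName);
     }
     requestBuilder.addLockComponent(lockCompBuilder.build());
     return requestBuilder.build();
   }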





Issue Time Tracking
-------------------

    Worklog Id:     (was: 847501)
    Time Spent: 9.5h  (was: 9h 20m)

> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
>                 Key: HIVE-27019
>                 URL: https://issues.apache.org/jira/browse/HIVE-27019
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> As described by the parent task, the Cleaner can be divided into separate 
> entities:
> *1) Handler* - This entity fetches the data from the relevant metastore DB 
> tables and converts it into a request entity called CleaningRequest. 
> It would also do SQL operations post cleanup (postprocess). Every type of 
> cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity receives the cleaning requests from 
> the various handlers and deletes the corresponding files and directories as 
> specified by each cleaning request.
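
A rough sketch of what this split could look like; the interface and method names below are
illustrative only (hypothetical), not the actual API introduced by the PR:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;

// Hypothetical outline of the proposed split.
interface CleaningRequest { }  // request entity built by a handler from metastore metadata

interface Handler {
  // Fetch metadata from the relevant metastore DB tables and convert it
  // into cleaning requests.
  List<CleaningRequest> findReadyToClean() throws Exception;

  // SQL operations after the files have been removed (postprocess),
  // e.g. marking the compaction queue entry as cleaned.
  void postprocess(CleaningRequest request) throws Exception;
}

interface FilesystemRemover {
  // Delete the files/directories described by a cleaning request and
  // return the paths that were actually removed.
  List<Path> clean(CleaningRequest request) throws IOException;
}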



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
