[ 
https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=845128&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845128
 ]

ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Feb/23 13:23
            Start Date: 13/Feb/23 13:23
    Worklog Time Spent: 10m 
      Work Description: SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1104465217


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionHandler.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactionCleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+
+/**
+ * A compaction-based implementation of Handler.
+ * Provides implementations for finding ready-to-clean items, and for preprocessing,
+ * postprocessing and failure handling of cleaning requests.
+ */
+public class CompactionHandler extends Handler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHandler.class.getName());
+
+  public CompactionHandler(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
+    super(conf, txnHandler, metricsEnabled);
+  }
+
+  @Override
+  public List<CleaningRequest> findReadyToClean() throws MetaException {
+    List<CleaningRequest> cleaningRequests = new ArrayList<>();
+    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+            ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+            : 0;
+    long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+
+    if (!readyToClean.isEmpty()) {
+      long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      final long cleanerWaterMark =
+              minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+
+      LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+      // For checking which compaction can be cleaned we can use the minOpenTxnId.
+      // However, findReadyToClean will return all records that were compacted with an old version of HMS
+      // where CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed for running queries.
+      // Once min_history_level is finally dropped, every HMS will commit compactions the new way,
+      // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
+      for (CompactionInfo ci : readyToClean) {
+        LOG.info("Starting cleaning for {}", ci);
+        try {
+          final String location = ci.getProperty("location");
+
+          Table t = null;
+          Partition p = null;
+
+          if (location == null) {
+            t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+            if (t == null) {
+              // The table was dropped before we got around to cleaning it.
+              LOG.info("Unable to find table {}, assuming it was dropped. {}", 
ci.getFullTableName(),
+                      idWatermark(ci));
+              txnHandler.markCleaned(ci);
+              continue;
+            }
+            if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+              // The table was marked with NO_CLEANUP=true.
+              LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
+              txnHandler.markCleaned(ci);
+              continue;
+            }
+            if (ci.partName != null) {
+              p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+              if (p == null) {
+                // The partition was dropped before we got around to cleaning it.
+                LOG.info("Unable to find partition {}, assuming it was dropped. {}", ci.getFullTableName(), idWatermark(ci));
+                txnHandler.markCleaned(ci);
+                continue;
+              }
+              if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+                // The partition was marked with NO_CLEANUP=true.
+                LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
+                txnHandler.markCleaned(ci);
+                continue;
+              }
+            }
+          }
+
+          if (t != null || ci.partName != null) {
+            String path = location == null
+                    ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+                    : location;
+            boolean dropPartition = ci.partName != null && p == null;
+            if (dropPartition) {
+              //check if partition wasn't re-created
+              if (resolvePartition(ci.dbname, ci.tableName, ci.partName) == null) {
+                String strIfPurge = ci.getProperty("ifPurge");
+                boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
+
+                Path obsoletePath = new Path(path);
+                cleaningRequests.add(new CompactionCleaningRequest(path, ci, Collections.singletonList(obsoletePath), ifPurge,
+                        obsoletePath.getFileSystem(conf), null, true));
+                continue;
+              }
+            }
+
+            ValidTxnList validTxnList =
+                    TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), cleanerWaterMark);
+            //save it so that getAcidState() sees it
+            conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+            /*
+             * {@code validTxnList} is capped by minOpenTxnGLB so if
+             * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+             * produced by a compactor, that means every reader that could be active right now sees it
+             * as well.  That means if this base/delta shadows some earlier base/delta, then it will be
+             * used in favor of any files that it shadows.  Thus the shadowed files are safe to delete.
+             *
+             * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+             * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+             * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+             * For example, given partition P1, txnid:150 starts and sees txnid:149 as open.
+             * Say the compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+             * writeId:17.  The compactor will produce base_17_c160.
+             * Suppose txnid:149 writes delta_18_18 to P1 and aborts.  The compactor can only remove
+             * TXN_COMPONENTS entries up to (inclusive) writeId:17 since delta_18_18 may be on disk
+             * (and perhaps corrupted) but not visible based on 'validTxnList' capped at minOpenTxn,
+             * so it will not be cleaned by {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)}
+             * and so we must keep the metadata that says that 18 is aborted.
+             * In a slightly different case, whatever txn created delta_18 (and all other txns) may have
+             * committed by the time the cleaner runs, and so the cleaner will indeed see delta_18_18 and
+             * remove it (since it has nothing but aborted data).  But we can't tell which actually
+             * happened in markCleaned(), so make sure it doesn't delete metadata above CQ_HIGHEST_WRITE_ID.
+             *
+             * We could perhaps combine cleaning of aborted and obsolete dirs and remove all aborted files
+             * up to the current Min Open Write Id; this way aborted TXN_COMPONENTS metadata can be removed
+             * as well, up to that point, which may be higher than CQ_HIGHEST_WRITE_ID.  This could be
+             * useful if there is all of a sudden a flood of aborted txns.  (For another day.)
+             */
+
+            // Creating 'reader' list since we are interested in the set of 'obsolete' files
+            ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+            LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+            Path loc = new Path(path);
+            FileSystem fs = loc.getFileSystem(conf);
+
+            // Collect all the files/dirs
+            Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, loc);
+            AcidDirectory dir = AcidUtils.getAcidState(fs, loc, conf, validWriteIdList, Ref.from(false), false,
+                    dirSnapshots);
+            Table table = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+            boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+            List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
+            if (isDynPartAbort || dir.hasUncompactedAborts()) {
+              ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+            }
+
+            cleaningRequests.add(new CompactionCleaningRequest(path, ci, obsoleteDirs, true, fs, dirSnapshots, false));
+          } else {
+            String strIfPurge = ci.getProperty("ifPurge");
+            boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
+
+            Path obsoletePath = new Path(location);
+            cleaningRequests.add(new CompactionCleaningRequest(location, ci, Collections.singletonList(obsoletePath), ifPurge,
+                    obsoletePath.getFileSystem(conf), null, false));
+          }
+        } catch (Exception e) {
+          LOG.warn("Cleaning request was not successful generated for : {} due 
to {}", idWatermark(ci), e.getMessage());
+          ci.errorMessage = e.getMessage();
+          if (metricsEnabled) {
+            Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+          }
+          handleCleanerAttemptFailure(ci);
+        }
+      }
+    }
+    return cleaningRequests;
+  }
+
+  @Override
+  public void beforeExecutingCleaningRequest(CleaningRequest cleaningRequest) throws MetaException {
+    CompactionInfo ci = ((CompactionCleaningRequest) cleaningRequest).getCompactionInfo();
+    txnHandler.markCleanerStart(ci);
+  }
+
+  @Override
+  public void afterExecutingCleaningRequest(CleaningRequest cleaningRequest, List<Path> deletedFiles) throws MetaException {
+    CompactionInfo ci = ((CompactionCleaningRequest) cleaningRequest).getCompactionInfo();
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = ((CompactionCleaningRequest) cleaningRequest).getHdfsDirSnapshots();
+    // Make sure there are no leftovers below the compacted watermark
+    if (dirSnapshots != null) {
+      conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+      Path path = new Path(cleaningRequest.getLocation());
+      Table table;
+      boolean success = false;

Review Comment:
   Made afterExecutingCleaningRequest() return a boolean to notify FSRemover whether the cleanup was successful or not.
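
   For illustration only, a rough sketch (not the actual PR code) of what a boolean-returning variant could look like, reusing the types already imported in the class above; verifyNoLeftovers() is a hypothetical helper standing in for the leftover check below the compacted watermark:

       @Override
       public boolean afterExecutingCleaningRequest(CleaningRequest cleaningRequest, List<Path> deletedFiles)
               throws MetaException {
         CompactionInfo ci = ((CompactionCleaningRequest) cleaningRequest).getCompactionInfo();
         boolean success = false;
         // Only mark the compaction as cleaned when the verification passes (hypothetical helper).
         if (verifyNoLeftovers(cleaningRequest, deletedFiles)) {
           txnHandler.markCleaned(ci);
           success = true;
         }
         // FSRemover can inspect this flag to know whether the cleanup succeeded.
         return success;
       }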





Issue Time Tracking
-------------------

    Worklog Id:     (was: 845128)
    Time Spent: 1.5h  (was: 1h 20m)

> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
>                 Key: HIVE-27019
>                 URL: https://issues.apache.org/jira/browse/HIVE-27019
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> As described by the parent task - 
> Cleaner can be divided into separate entities like -
> *1) Handler* - This entity fetches data from the relevant metastore DB tables 
> and converts it into a request entity called CleaningRequest. 
> It also performs the SQL operations after cleanup (postprocessing). Every type of 
> cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity fetches the cleaning requests from 
> various handlers and deletes the requested files/directories accordingly (a rough sketch follows below).
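
A minimal sketch of the split described above, assuming hypothetical names and accessors (getObsoleteDirs(), getFs()) that only illustrate the flow and are not the actual Hive API:

    import java.util.List;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical shape of a cleaning request: just enough for the FS remover to act on.
    interface CleaningRequest {
      List<Path> getObsoleteDirs();   // directories/files to delete
      FileSystem getFs();             // filesystem they live on
    }

    // Handler: fetches data from the metastore tables, converts it into cleaning
    // requests, and runs the SQL post-processing after the cleanup.
    interface Handler {
      List<CleaningRequest> findReadyToClean() throws Exception;
      void afterExecutingCleaningRequest(CleaningRequest request) throws Exception;
    }

    // Filesystem remover: consumes requests from any handler and deletes what they reference.
    class FSRemover {
      void clean(Handler handler) throws Exception {
        for (CleaningRequest request : handler.findReadyToClean()) {
          for (Path obsolete : request.getObsoleteDirs()) {
            request.getFs().delete(obsolete, true);   // recursive delete of the obsolete dir
          }
          handler.afterExecutingCleaningRequest(request);
        }
      }
    }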



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
