[ https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=845128&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845128 ]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Feb/23 13:23
            Start Date: 13/Feb/23 13:23
    Worklog Time Spent: 10m
      Work Description: SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1104465217


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionHandler.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactionCleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+
+/**
+ * A compaction-based implementation of Handler.
+ * Provides the implementation for finding ready-to-clean items, preprocessing of cleaning requests,
+ * postprocessing of cleaning requests and failure handling of cleaning requests.
+ */
+public class CompactionHandler extends Handler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHandler.class.getName());
+
+  public CompactionHandler(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
+    super(conf, txnHandler, metricsEnabled);
+  }
+
+  @Override
+  public List<CleaningRequest> findReadyToClean() throws MetaException {
+    List<CleaningRequest> cleaningRequests = new ArrayList<>();
+    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+        ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+        : 0;
+    long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+
+    if (!readyToClean.isEmpty()) {
+      long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      final long cleanerWaterMark =
+          minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+
+      LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+      // For checking which compaction can be cleaned we can use minOpenTxnId.
+      // However, findReadyToClean will return all records that were compacted with an old version of HMS
+      // where CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed for running queries.
+      // When min_history_level is finally dropped, every HMS will commit compactions the new way,
+      // and minTxnIdSeenOpen can be removed and minOpenTxnId used instead.
+      for (CompactionInfo ci : readyToClean) {
+        LOG.info("Starting cleaning for {}", ci);
+        try {
+          final String location = ci.getProperty("location");
+
+          Table t = null;
+          Partition p = null;
+
+          if (location == null) {
+            t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+            if (t == null) {
+              // The table was dropped before we got around to cleaning it.
+              LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
+                  idWatermark(ci));
+              txnHandler.markCleaned(ci);
+              continue;
+            }
+            if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+              // The table was marked NO_CLEANUP=true.
+              LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
+              txnHandler.markCleaned(ci);
+              continue;
+            }
+            if (ci.partName != null) {
+              p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+              if (p == null) {
+                // The partition was dropped before we got around to cleaning it.
+                LOG.info("Unable to find partition {}, assuming it was dropped. {}", ci.getFullTableName(), idWatermark(ci));
+                txnHandler.markCleaned(ci);
+                continue;
+              }
+              if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+                // The partition was marked NO_CLEANUP=true.
+                LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
+                txnHandler.markCleaned(ci);
+                continue;
+              }
+            }
+          }
+
+          if (t != null || ci.partName != null) {
+            String path = location == null
+                ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+                : location;
+            boolean dropPartition = ci.partName != null && p == null;
+            if (dropPartition) {
+              // Check if the partition wasn't re-created.
+              if (resolvePartition(ci.dbname, ci.tableName, ci.partName) == null) {
+                String strIfPurge = ci.getProperty("ifPurge");
+                boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
+
+                Path obsoletePath = new Path(path);
+                cleaningRequests.add(new CompactionCleaningRequest(path, ci, Collections.singletonList(obsoletePath), ifPurge,
+                    obsoletePath.getFileSystem(conf), null, true));
+                continue;
+              }
+            }
+
+            ValidTxnList validTxnList =
+                TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), cleanerWaterMark);
+            // Save it so that getAcidState() sees it.
+            conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+            /*
+             * {@code validTxnList} is capped by minOpenTxnGLB, so if
+             * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+             * produced by a compactor, that means every reader that could be active right now sees it
+             * as well. That means if this base/delta shadows some earlier base/delta, then it will be
+             * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
+             *
+             * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+             * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+             * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+             * For example, given partition P1, txnid:150 starts and sees txnid:149 as open.
+             * Say the compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+             * writeId:17. The compactor will produce base_17_c160.
+             * Suppose txnid:149 writes delta_18_18 to P1 and aborts. The compactor can only remove
+             * TXN_COMPONENTS entries up to (inclusive) writeId:17, since delta_18_18 may be on disk
+             * (and perhaps corrupted) but not visible based on 'validTxnList' capped at minOpenTxn,
+             * so it will not be cleaned by
+             * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and we must keep the
+             * metadata that says that 18 is aborted.
+             * In a slightly different case, whatever txn created delta_18 (and all other txns) may have
+             * committed by the time the cleaner runs, in which case the cleaner will indeed see delta_18_18
+             * and remove it (since it has nothing but aborted data). But we can't tell which actually
+             * happened in markCleaned(), so make sure it doesn't delete metadata above CQ_HIGHEST_WRITE_ID.
+             *
+             * We could perhaps combine cleaning of aborted and obsolete directories and remove all aborted
+             * files up to the current Min Open Write Id; this way aborted TXN_COMPONENTS metadata can be
+             * removed as well up to that point, which may be higher than CQ_HIGHEST_WRITE_ID. This could be
+             * useful if there is all of a sudden a flood of aborted txns. (For another day.)
+             */
+
+            // Creating a 'reader' list since we are interested in the set of 'obsolete' files.
+            ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+            LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+            Path loc = new Path(path);
+            FileSystem fs = loc.getFileSystem(conf);
+
+            // Collect all the files/dirs.
+            Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, loc);
+            AcidDirectory dir = AcidUtils.getAcidState(fs, loc, conf, validWriteIdList, Ref.from(false), false,
+                dirSnapshots);
+            Table table = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+            boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+            List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
+            if (isDynPartAbort || dir.hasUncompactedAborts()) {
+              ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+            }
+
+            cleaningRequests.add(new CompactionCleaningRequest(path, ci, obsoleteDirs, true, fs, dirSnapshots, false));
+          } else {
+            String strIfPurge = ci.getProperty("ifPurge");
+            boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
+
+            Path obsoletePath = new Path(location);
+            cleaningRequests.add(new CompactionCleaningRequest(location, ci, Collections.singletonList(obsoletePath), ifPurge,
+                obsoletePath.getFileSystem(conf), null, false));
+          }
+        } catch (Exception e) {
+          LOG.warn("Cleaning request was not successfully generated for {} due to {}", idWatermark(ci), e.getMessage());
+          ci.errorMessage = e.getMessage();
+          if (metricsEnabled) {
+            Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+          }
+          handleCleanerAttemptFailure(ci);
+        }
+      }
+    }
+    return cleaningRequests;
+  }
+
+  @Override
+  public void beforeExecutingCleaningRequest(CleaningRequest cleaningRequest) throws MetaException {
+    CompactionInfo ci = ((CompactionCleaningRequest) cleaningRequest).getCompactionInfo();
+    txnHandler.markCleanerStart(ci);
+  }
+
+  @Override
+  public void afterExecutingCleaningRequest(CleaningRequest cleaningRequest, List<Path> deletedFiles) throws MetaException {
+    CompactionInfo ci = ((CompactionCleaningRequest) cleaningRequest).getCompactionInfo();
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = ((CompactionCleaningRequest) cleaningRequest).getHdfsDirSnapshots();
+    // Make sure there are no leftovers below the compacted watermark.
+    if (dirSnapshots != null) {
+      conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+      Path path = new Path(cleaningRequest.getLocation());
+      Table table;
+      boolean success = false;

Review Comment:
   Made afterExecutingCleaningRequest() return a boolean to notify FSRemover whether the cleanup was successful or not.
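A minimal sketch of what that revised contract could look like. The types below (SketchHandler, SketchFsRemover) and their signatures are simplified stand-ins for illustration, not the actual Handler/FSRemover classes in the PR; the only detail taken from the comment above is the boolean return value.

```java
import java.util.List;

// Simplified stand-in for the handler side (hypothetical, not the Hive API).
interface SketchHandler {
  // Instead of returning void, report whether the post-processing
  // (e.g. marking the compaction as cleaned) actually succeeded.
  boolean afterExecutingCleaningRequest(String location, List<String> deletedFiles);
}

// Simplified stand-in for the filesystem remover side (hypothetical).
class SketchFsRemover {
  void finishCleanup(SketchHandler handler, String location, List<String> deletedFiles) {
    // The remover can now distinguish a successful cleanup from a failed one
    // and react (log, retry, keep the queue entry) instead of assuming success.
    boolean success = handler.afterExecutingCleaningRequest(location, deletedFiles);
    if (!success) {
      System.err.println("Cleanup post-processing failed for " + location);
    }
  }
}
```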
Issue Time Tracking
-------------------

    Worklog Id:     (was: 845128)
    Time Spent: 1.5h  (was: 1h 20m)

> Split Cleaner into separate manageable modular entities
> --------------------------------------------------------
>
>                 Key: HIVE-27019
>                 URL: https://issues.apache.org/jira/browse/HIVE-27019
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> As described by the parent task, the Cleaner can be divided into separate entities:
> *1) Handler* - This entity fetches data from the relevant metastore DB tables and converts it into a request entity called CleaningRequest. It also performs the SQL operations required after cleanup (postprocessing). Every type of cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity takes the cleaning requests from the various handlers and deletes the files/directories referenced in each request.
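To make the intended split concrete, below is a minimal, self-contained sketch of the two roles. All type and method names are simplified illustrations inferred from the description above (and loosely from the PR's CleaningRequest/Handler naming); they are not the actual implementation.

```java
import java.util.List;

// What a handler hands to the filesystem remover: purely "what to delete, and where".
class CleaningRequestSketch {
  final String location;
  final List<String> obsoletePaths;

  CleaningRequestSketch(String location, List<String> obsoletePaths) {
    this.location = location;
    this.obsoletePaths = obsoletePaths;
  }
}

// Handler role: read compaction/abort metadata from the metastore DB, build requests,
// and run the SQL post-processing once the files are gone.
interface HandlerSketch {
  List<CleaningRequestSketch> findReadyToClean() throws Exception;
  void beforeExecutingCleaningRequest(CleaningRequestSketch request) throws Exception;
  void afterExecutingCleaningRequest(CleaningRequestSketch request, List<String> deletedFiles) throws Exception;
}

// Filesystem-remover role: no metastore knowledge; it only deletes what the handlers
// ask for and reports back what was actually removed.
class FsRemoverSketch {
  void clean(HandlerSketch handler) throws Exception {
    for (CleaningRequestSketch request : handler.findReadyToClean()) {
      handler.beforeExecutingCleaningRequest(request);
      List<String> deleted = delete(request);
      handler.afterExecutingCleaningRequest(request, deleted);
    }
  }

  private List<String> delete(CleaningRequestSketch request) {
    // Placeholder for the actual FileSystem.delete() calls.
    return request.obsoletePaths;
  }
}
```

The point of the split is that all metastore/SQL knowledge stays in the handlers, so new kinds of cleanup can be added as new handlers without touching the filesystem remover.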