[ https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=847501&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847501 ]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Feb/23 11:27
            Start Date: 24/Feb/23 11:27
    Worklog Time Spent: 10m
      Work Description: veghlaci05 commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1116855904


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionHandler.java:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CacheContainer;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest.CleaningRequestBuilder;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static java.util.Objects.isNull;
+
+/**
+ * A compaction-based implementation of RequestHandler.
+ * Provides implementations for finding ready-to-clean items, preprocessing and postprocessing
+ * of cleaning requests, and failure handling of cleaning requests.
+ */
+class CompactionHandler extends RequestHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHandler.class.getName());
+
+  public CompactionHandler(HiveConf conf, TxnStore txnHandler,
+                           CacheContainer cacheContainer, boolean metricsEnabled,
+                           FSRemover fsRemover, ExecutorService cleanerExecutor) {
+    super(conf, txnHandler, cacheContainer, metricsEnabled, fsRemover, cleanerExecutor);
+  }
+
+  @Override
+  protected void processInternal() throws Exception {
+    long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+        ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+        : 0;
+    List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+    if (!readyToClean.isEmpty()) {
+      long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      final long cleanerWaterMark =
+          minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+
+      LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+      // For checking which compaction can be cleaned we can use minOpenTxnId.
+      // However, findReadyToClean will return all records that were compacted with an old version of HMS
+      // where CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed for running queries.
+      // When min_history_level is finally dropped, every HMS will commit compactions the new way,
+      // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
+      List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
+      for (CompactionInfo ci : readyToClean) {
+        CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
+                CompactorUtil.ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled)), cleanerExecutor)
+            .exceptionally(t -> {
+              LOG.error("Error clearing {} due to :", ci.getFullPartitionName(), t);
+              return null;
+            });
+        asyncTasks.add(asyncTask);
+      }
+      //Use get instead of join, so we can receive InterruptedException and shutdown gracefully
+      CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).get();
+    }
+  }
+
+  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+    LOG.info("Starting cleaning for {}", ci);
+    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
+        (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+    try {
+      if (metricsEnabled) {
+        perfLogger.perfLogBegin(CompactionHandler.class.getName(), cleanerMetric);
+      }
+      final String location = ci.getProperty("location");
+
+      Table t = null;
+      Partition p = null;
+
+      if (isNull(location)) {
+        t = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+        if (isNull(t)) {
+          // The table was dropped before we got around to cleaning it.
+          LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
+              idWatermark(ci));
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+          // The table was marked no clean up true.
+          LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (!isNull(ci.partName)) {
+          p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+          if (isNull(p)) {
+            // The partition was dropped before we got around to cleaning it.
+            LOG.info("Unable to find partition {}, assuming it was dropped. {}",
+                ci.getFullPartitionName(), idWatermark(ci));
+            txnHandler.markCleaned(ci);
+            return;
+          }
+          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+            // The partition was marked no clean up true.
+            LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
+            txnHandler.markCleaned(ci);
+            return;
+          }
+        }
+      }
+      txnHandler.markCleanerStart(ci);
+
+      if (!isNull(t) || !isNull(ci.partName)) {
+        String path = isNull(location)
+            ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+            : location;
+        boolean dropPartition = !isNull(ci.partName) && isNull(p);
+
+        //check if partition wasn't re-created
+        if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
+          cleanUsingLocation(ci, path, true);
+        } else {
+          cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+        }
+      } else {
+        cleanUsingLocation(ci, location, false);
+      }
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci,
+          e.getMessage());
+      ci.errorMessage = e.getMessage();
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+      }
+      handleCleanerAttemptFailure(ci);
+    } finally {
+      if (metricsEnabled) {
+        perfLogger.perfLogEnd(CompactionHandler.class.getName(), cleanerMetric);
+      }
+    }
+  }
+
+  private void cleanUsingLocation(CompactionInfo ci, String path, boolean requiresLock) throws MetaException {
+    List<Path> deleted;
+    if (requiresLock) {
+      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+      LockResponse res = null;
+      try {
+        res = txnHandler.lock(lockRequest);
+        deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+      } catch (NoSuchTxnException | TxnAbortedException e) {
+        LOG.error("Error while trying to acquire exclusive write lock: {}", e.getMessage());
+        throw new MetaException(e.getMessage());
+      } finally {
+        if (res != null) {
+          try {
+            txnHandler.unlock(new UnlockRequest(res.getLockid()));
+          } catch (NoSuchLockException | TxnOpenException e) {
+            LOG.error("Error while trying to release exclusive write lock: {}", e.getMessage());
+          }
+        }
+      }
+    } else {
+      deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+    }
+    if (!deleted.isEmpty()) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+    }
+  }
+
+  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+    ValidTxnList validTxnList =
+        TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+    //save it so that getAcidState() sees it
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    /*
+     * {@code validTxnList} is capped by minOpenTxnGLB so if
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+     * produced by a compactor, that means every reader that could be active right now sees it
+     * as well. That means if this base/delta shadows some earlier base/delta, then it will be
+     * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
+     *
+     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+     * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
+     * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+     * writeId:17. Compactor will produce base_17_c160.
+     * Suppose txnid:149 writes delta_18_18
+     * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries
+     * up to (inclusive) writeId:17, since delta_18_18 may be on disk (and perhaps corrupted) but
+     * not visible based on 'validTxnList' capped at minOpenTxn, so it will not be cleaned by
+     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
+     * metadata that says that 18 is aborted.
+     * In a slightly different case, whatever txn created delta_18 (and all other txns) may have
+     * committed by the time the cleaner runs, so the cleaner will indeed see delta_18_18 and remove
+     * it (since it has nothing but aborted data). But we can't tell which actually happened
+     * in markCleaned(), so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID.
+     *
+     * We could perhaps combine cleaning of aborted and obsolete files and remove all aborted files
+     * up to the current Min Open Write Id; this way aborted TXN_COMPONENTS meta can be removed
+     * as well up to that point, which may be higher than CQ_HIGHEST_WRITE_ID. This could be
+     * useful if there is all of a sudden a flood of aborted txns. (For another day).
+     */
+
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+    Path path = new Path(location);
+    FileSystem fs = path.getFileSystem(conf);
+
+    // Collect all the files/dirs
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+        dirSnapshots);
+    Table table = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+    boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+    List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+    if (isDynPartAbort || dir.hasUncompactedAborts()) {
+      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+    }
+
+    List<Path> deleted = fsRemover.clean(new CleaningRequestBuilder().setLocation(location).setRunAs(ci.runAs)
+        .setObsoleteDirs(obsoleteDirs).setPurge(true).setFullPartitionName(ci.getFullPartitionName())
+        .build());
+
+    if (deleted.size() > 0) {
+      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+          txnHandler);
+    }
+
+    // Make sure there are no leftovers below the compacted watermark
+    boolean success = false;
+    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+        ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+        Ref.from(false), false, dirSnapshots);
+
+    List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
+    if (!remained.isEmpty()) {
+      LOG.warn("{} Remained {} obsolete directories from {}. {}",
+          idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
+    } else {
+      LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
+      success = true;
+    }
+    if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+      LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
+    }
+  }
+
+  protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) {

Review Comment:
   This method has a single usage; all parameters except ci can be removed.




Issue Time Tracking
-------------------

    Worklog Id:     (was: 847501)
    Time Spent: 9.5h  (was: 9h 20m)

> Split Cleaner into separate manageable modular entities
> --------------------------------------------------------
>
>                 Key: HIVE-27019
>                 URL: https://issues.apache.org/jira/browse/HIVE-27019
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> As described by the parent task, Cleaner can be divided into separate entities:
> *1) Handler* - This entity fetches the data from the relevant metastore DB tables and
> converts it into a request entity called CleaningRequest. It would also do SQL operations
> post cleanup (postprocess). Every type of cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity fetches the cleaning requests from the various handlers
> and deletes the corresponding obsolete files/directories according to each cleaning request.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
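
For illustration only, the change asked for in the review comment above could look roughly like the sketch below. This is a hypothetical rewrite, not the actual follow-up patch: it assumes the single call site shown in the diff (cleanUsingLocation, which always passes txnId 0, LockType.EXCL_WRITE and DataOperationType.DELETE) and reuses the LockRequestBuilder/LockComponentBuilder setters imported by this class; the method body shown is schematic rather than copied from the PR.

  // Hypothetical sketch of the reviewer's suggestion: fold the constant arguments from the
  // single caller into the method body, leaving CompactionInfo as the only parameter.
  private LockRequest createLockRequest(CompactionInfo ci) {
    LockRequestBuilder requestBuilder = new LockRequestBuilder(Thread.currentThread().getName());
    requestBuilder.setUser(ci.runAs);
    requestBuilder.setTransactionId(0);               // was the txnId parameter

    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
        .setLock(LockType.EXCL_WRITE)                 // was the lockType parameter
        .setOperationType(DataOperationType.DELETE)   // was the opType parameter
        .setDbName(ci.dbname)
        .setTableName(ci.tableName)
        .setIsTransactional(true);
    if (ci.partName != null) {
      lockCompBuilder.setPartitionName(ci.partName);  // lock the single partition being cleaned
    }
    requestBuilder.addLockComponent(lockCompBuilder.build());
    return requestBuilder.build();
  }

The call in cleanUsingLocation() would then shrink to createLockRequest(ci), and the lock type and operation type would be documented in one place instead of being passed in from the lone caller.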