[ https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=847743&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847743 ]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 27/Feb/23 08:00
Start Date: 27/Feb/23 08:00
Worklog Time Spent: 10m
Work Description: deniskuzZ commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1118391763


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -212,323 +138,9 @@ public void run() {
       }
     }
 
-  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
-    LOG.info("Starting cleaning for " + ci);
-    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
-    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
-        (ci.type != null ? ci.type.toString().toLowerCase() : null);
-    try {
-      if (metricsEnabled) {
-        perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
-      }
-      final String location = ci.getProperty("location");
-
-      Callable<Boolean> cleanUpTask;
-      Table t = null;
-      Partition p = null;
-
-      if (location == null) {
-        t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-        if (t == null) {
-          // The table was dropped before we got around to cleaning it.
-          LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
-              idWatermark(ci));
-          txnHandler.markCleaned(ci);
-          return;
-        }
-        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
-          // The table was marked no clean up true.
-          LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as NO_CLEANUP set to true");
-          txnHandler.markCleaned(ci);
-          return;
-        }
-        if (ci.partName != null) {
-          p = resolvePartition(ci);
-          if (p == null) {
-            // The partition was dropped before we got around to cleaning it.
-            LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-                ", assuming it was dropped." + idWatermark(ci));
-            txnHandler.markCleaned(ci);
-            return;
-          }
-          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
-            // The partition was marked no clean up true.
-            LOG.info("Skipping partition " + ci.getFullPartitionName() + " clean up, as NO_CLEANUP set to true");
-            txnHandler.markCleaned(ci);
-            return;
-          }
-        }
-      }
-      txnHandler.markCleanerStart(ci);
-
-      if (t != null || ci.partName != null) {
-        String path = location == null
-            ? resolveStorageDescriptor(t, p).getLocation()
-            : location;
-        boolean dropPartition = ci.partName != null && p == null;
-        cleanUpTask = () -> removeFiles(path, minOpenTxnGLB, ci, dropPartition);
-      } else {
-        cleanUpTask = () -> removeFiles(location, ci);
-      }
-
-      Ref<Boolean> removedFiles = Ref.from(false);
-      if (runJobAsSelf(ci.runAs)) {
-        removedFiles.value = cleanUpTask.call();
-      } else {
-        LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
-        UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
-            UserGroupInformation.getLoginUser());
-        try {
-          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-            removedFiles.value = cleanUpTask.call();
-            return null;
-          });
-        } finally {
-          try {
-            FileSystem.closeAllForUGI(ugi);
-          } catch (IOException exception) {
-            LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-                ci.getFullPartitionName() + idWatermark(ci), exception);
-          }
-        }
-      }
-      if (removedFiles.value || isDynPartAbort(t, ci)) {
-        txnHandler.markCleaned(ci);
-      } else {
-        txnHandler.clearCleanerStart(ci);
-        LOG.warn("No files were removed. Leaving queue entry " + ci + " in ready for cleaning state.");
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
-          StringUtils.stringifyException(e));
-      ci.errorMessage = e.getMessage();
-      if (metricsEnabled) {
-        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
-      }
-      handleCleanerAttemptFailure(ci);
-    } finally {
-      if (metricsEnabled) {
-        perfLogger.perfLogEnd(CLASS_NAME, cleanerMetric);
-      }
-    }
-  }
-
-  private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
-    long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
-    int cleanAttempts = 0;
-    if (ci.retryRetention > 0) {
-      cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
-    }
-    if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
-      //Mark it as failed if the max attempt threshold is reached.
-      txnHandler.markFailed(ci);
-    } else {
-      //Calculate retry retention time and update record.
-      ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
-      txnHandler.setCleanerRetryRetentionTimeOnError(ci);
-    }
-  }
-
-  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
-      throws NoSuchTxnException, MetaException {
-    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
-    GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
-    request.setValidTxnList(validTxnList.writeToString());
-    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
-    // we could have no write IDs for a table if it was never written to but
-    // since we are in the Cleaner phase of compactions, there must have
-    // been some delta/base dirs
-    assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
-    ValidReaderWriteIdList validWriteIdList =
-        TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
-    /*
-     * We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
-     * If we have a higher retentionTime it is possible for a second compaction to run on the same partition. Cleaning up the first compaction
-     * should not touch the newer obsolete directories to not to violate the retentionTime for those.
-     */
-    if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
-      validWriteIdList = validWriteIdList.updateHighWatermark(ci.highestWriteId);
-    }
-    return validWriteIdList;
-  }
-
-  private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
-    return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> pk.size() > 0).isPresent()
-        && ci.partName == null;
-  }
-
-  private static String idWatermark(CompactionInfo ci) {
-    return " id=" + ci.id;
-  }
-
-  private boolean removeFiles(String location, long minOpenTxnGLB, CompactionInfo ci, boolean dropPartition)
-      throws Exception {
-
-    if (dropPartition) {
-      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
-      LockResponse res = null;
-
-      try {
-        res = txnHandler.lock(lockRequest);
-        if (res.getState() == LockState.ACQUIRED) {
-          //check if partition wasn't re-created
-          if (resolvePartition(ci) == null) {
-            return removeFiles(location, ci);
-          }
-        }
-      } catch (NoSuchTxnException | TxnAbortedException e) {
-        LOG.error(e.getMessage());
-      } finally {
-        if (res != null) {
-          try {
-            txnHandler.unlock(new UnlockRequest(res.getLockid()));
-          } catch (NoSuchLockException | TxnOpenException e) {
-            LOG.error(e.getMessage());
-          }
-        }
-      }
-    }
-
-    ValidTxnList validTxnList =
-        TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
-    //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-    /**
-     * {@code validTxnList} is capped by minOpenTxnGLB so if
-     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
-     * produced by a compactor, that means every reader that could be active right now see it
-     * as well. That means if this base/delta shadows some earlier base/delta, the it will be
-     * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
-     *
-     *
-     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
-     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
-     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
-     * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
-     * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
-     * writeId:17. Compactor will produce base_17_c160.
-     * Suppose txnid:149 writes delta_18_18
-     * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries
-     * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
-     * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by
-     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
-     * metadata that says that 18 is aborted.
-     * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
-     * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
-     * it (since it has nothing but aborted data). But we can't tell which actually happened
-     * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID.
-     *
-     * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
-     * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
-     * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be
-     * useful if there is all of a sudden a flood of aborted txns. (For another day).
-     */
-
-    // Creating 'reader' list since we are interested in the set of 'obsolete' files
-    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
-    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
-    return removeFiles(location, validWriteIdList, ci);
-  }
-  /**
-   * @return true if any files were removed
-   */
-  private boolean removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
-      throws Exception {
-    Path path = new Path(location);
-    FileSystem fs = path.getFileSystem(conf);
-
-    // Collect all of the files/dirs
-    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
-    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false,
-        dirSnapshots);
-    Table table = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-    boolean isDynPartAbort = isDynPartAbort(table, ci);
-
-    List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
-    if (isDynPartAbort || dir.hasUncompactedAborts()) {
-      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
-    }
-    List<Path> deleted = remove(location, ci, obsoleteDirs, true, fs);
-    if (dir.getObsolete().size() > 0) {
-      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
-          txnHandler);
-    }
-    // Make sure there are no leftovers below the compacted watermark
-    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
-    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
-        ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
-        Ref.from(false), false, dirSnapshots);
-
-    List<Path> remained = subtract(getObsoleteDirs(dir, isDynPartAbort), deleted);
-    if (!remained.isEmpty()) {
-      LOG.warn(idWatermark(ci) + " Remained " + remained.size() +
-          " obsolete directories from " + location + ". " + getDebugInfo(remained));
-      return false;
-    }
-    LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
-    return true;
-  }
-
-  private List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
-    List<Path> obsoleteDirs = dir.getObsolete();
-    /**
-     * add anything in 'dir' that only has data from aborted transactions - no one should be
-     * trying to read anything in that dir (except getAcidState() that only reads the name of
-     * this dir itself)
-     * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there
-     * are no active txns when cleaner runs). The key is to not delete metadata about aborted
-     * txns with write IDs > {@link CompactionInfo#highestWriteId}.
-     * See {@link TxnStore#markCleaned(CompactionInfo)}
-     */
-    obsoleteDirs.addAll(dir.getAbortedDirectories());
-    if (isDynPartAbort) {
-      // In the event of an aborted DP operation, we should only consider the aborted directories for cleanup.
-      // Including obsolete directories for partitioned tables can result in data loss.
-      obsoleteDirs = dir.getAbortedDirectories();
-    }
-    return obsoleteDirs;
-  }
-
-  private boolean removeFiles(String location, CompactionInfo ci) throws IOException, MetaException {
-    String strIfPurge = ci.getProperty("ifPurge");
-    boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
-    Path path = new Path(location);
-    return !remove(location, ci, Collections.singletonList(path), ifPurge,
-        path.getFileSystem(conf)).isEmpty();
-  }
-
-  private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs)
-      throws MetaException, IOException {
-    List<Path> deleted = new ArrayList<>();
-    if (paths.size() < 1) {
-      return deleted;
-    }
-    LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
-        " obsolete directories from " + location + ". " + getDebugInfo(paths));
-    boolean needCmRecycle;
-    try {
-      Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
-      needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-    } catch (NoSuchObjectException ex) {
-      // can not drop a database which is a source of replication
-      needCmRecycle = false;
-    }
-    for (Path dead : paths) {
-      LOG.debug("Going to delete path " + dead.toString());
-      if (needCmRecycle) {
-        replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
-      }
-      if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
-        deleted.add(dead);
-      }
-    }
-    return deleted;
-  }
-
-  private String getDebugInfo(List<Path> paths) {
-    return "[" + paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
+  @VisibleForTesting
+  public void setCleaningRequestHandlers(List<RequestHandler> requestHandlers) {

Review Comment:
   simply `setRequestHandlers`


Issue Time Tracking
-------------------

    Worklog Id:     (was: 847743)
    Time Spent: 11h 20m  (was: 11h 10m)

> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
>                 Key: HIVE-27019
>                 URL: https://issues.apache.org/jira/browse/HIVE-27019
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 11h 20m
>  Remaining Estimate: 0h
>
> As described by the parent task -
> Cleaner can be divided into separate entities like -
> *1) Handler* - This entity fetches data from the relevant metastore DB tables and converts it into a request entity called CleaningRequest. It would also do SQL operations post cleanup (postprocess). Every type of cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity fetches the cleaning requests from the various handlers and deletes the corresponding files from the filesystem according to each cleaning request.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)