[ https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=845776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845776 ]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Feb/23 06:10
            Start Date: 16/Feb/23 06:10
    Worklog Time Spent: 10m
      Work Description: SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1108051902


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -141,49 +93,37 @@ public void run() {
             new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
       }
 
-      long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-
-      checkInterrupt();
-
-      List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
-
-      checkInterrupt();
-
-      if (!readyToClean.isEmpty()) {
-        long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
-        final long cleanerWaterMark =
-            minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
-
-        LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
-        List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
-        // For checking which compaction can be cleaned we can use the minOpenTxnId
-        // However findReadyToClean will return all records that were compacted with old version of HMS
-        // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
-        // to the clean method, to avoid cleaning up deltas needed for running queries
-        // when min_history_level is finally dropped, than every HMS will commit compaction the new way
-        // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
-        for (CompactionInfo compactionInfo : readyToClean) {
-
-          //Check for interruption before scheduling each compactionInfo and return if necessary
+      for (Handler handler : handlers) {
+        try {
+          List<CleaningRequest> readyToClean = handler.findReadyToClean();
           checkInterrupt();
-          CompletableFuture<Void> asyncJob =
-              CompletableFuture.runAsync(
-                      ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
-                      cleanerExecutor)
-                  .exceptionally(t -> {
-                    LOG.error("Error clearing {}", compactionInfo.getFullPartitionName(), t);
-                    return null;
-                  });
-          cleanerList.add(asyncJob);
+          if (!readyToClean.isEmpty()) {
+            List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
+            for (CleaningRequest cr : readyToClean) {
+
+              //Check for interruption before scheduling each cleaning request and return if necessary
+              checkInterrupt();
+
+              CompletableFuture<Void> asyncJob = CompletableFuture.runAsync(
+                      ThrowingRunnable.unchecked(new FSRemover(handler, cr)), cleanerExecutor)
+                  .exceptionally(t -> {
+                    LOG.error("Error clearing: {}", cr.getFullPartitionName(), t);
+                    return null;
+                  });

Review Comment:
   Implemented it in a similar way. Done.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 845776)
    Time Spent:     4h 40m  (was: 4.5h)

> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
>                 Key: HIVE-27019
>                 URL: https://issues.apache.org/jira/browse/HIVE-27019
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> As described by the parent task -
> Cleaner can be divided into separate entities like -
> *1) Handler* - This entity fetches the data from the metastore DB from relevant tables and converts it into a request entity called CleaningRequest. It would also do SQL operations post cleanup (postprocess).
> Every type of cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity fetches the cleaning requests from various handlers and deletes them according to the cleaning request.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
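The Handler/FSRemover split in the diff above can be sketched as a small standalone program. This is a simplified illustration, not Hive's actual API: `CleaningRequest`, `Handler`, and `FSRemover` mirror the names in the patch, but their members (e.g. `postprocess`) are hypothetical, and Hive-specific pieces such as `ThrowingRunnable`, `checkInterrupt`, and the SLF4J logger are replaced with plain Java equivalents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative stand-in for Hive's CleaningRequest: carries what to clean.
class CleaningRequest {
    private final String fullPartitionName;
    CleaningRequest(String fullPartitionName) { this.fullPartitionName = fullPartitionName; }
    String getFullPartitionName() { return fullPartitionName; }
}

// Illustrative Handler: fetches metadata and turns it into cleaning requests;
// the postprocess hook stands in for the "SQL operations post cleanup".
interface Handler {
    List<CleaningRequest> findReadyToClean();
    void postprocess(CleaningRequest cr);
}

// Illustrative FSRemover: consumes one request; a real one would delete files.
class FSRemover implements Runnable {
    private final Handler handler;
    private final CleaningRequest cr;
    FSRemover(Handler handler, CleaningRequest cr) { this.handler = handler; this.cr = cr; }
    @Override public void run() {
        // File deletion would happen here; afterwards the handler does its bookkeeping.
        handler.postprocess(cr);
    }
}

public class CleanerSketch {
    public static void main(String[] args) {
        List<String> cleaned = new CopyOnWriteArrayList<>();
        Handler handler = new Handler() {
            public List<CleaningRequest> findReadyToClean() {
                List<CleaningRequest> ready = new ArrayList<>();
                ready.add(new CleaningRequest("db.tbl/p=1"));
                ready.add(new CleaningRequest("db.tbl/p=2"));
                return ready;
            }
            public void postprocess(CleaningRequest cr) { cleaned.add(cr.getFullPartitionName()); }
        };

        ExecutorService cleanerExecutor = Executors.newFixedThreadPool(2);
        List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
        for (CleaningRequest cr : handler.findReadyToClean()) {
            // Schedule each request asynchronously; a failure in one request is
            // logged (here: printed) and does not abort the rest of the cycle.
            CompletableFuture<Void> asyncJob = CompletableFuture
                .runAsync(new FSRemover(handler, cr), cleanerExecutor)
                .exceptionally(t -> {
                    System.err.println("Error clearing: " + cr.getFullPartitionName());
                    return null;
                });
            cleanerList.add(asyncJob);
        }
        // Wait for the whole batch before ending the cleaner cycle.
        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
        cleanerExecutor.shutdown();

        if (cleaned.size() != 2) {
            throw new AssertionError("expected 2 cleaned partitions, got " + cleaned.size());
        }
        System.out.println("cleaned=" + cleaned.size()); // prints "cleaned=2"
    }
}
```

The key design point shown here is the same one the patch makes: the cleaner loop no longer knows how requests are produced (that is the handler's job) or how they are executed (that is the remover's job), so each piece can evolve or be tested on its own.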